Common client operation

All clients (Source, Sink, HistoryClient, etc.) inherit from the common base class below. Use its methods to run or stop a client, or connect to the MetricQ network if you derive a custom client.

class metricq.Client(*args, client_version=None, **kwargs)
await connect()

Connect to the MetricQ network

Return type:

None

await get_metrics(selector=None, metadata=True, historic=None, timeout=None, prefix=None, infix=None, limit=None, hidden=None)

Retrieve information for metrics matching a selector pattern.

Parameters:
selector : str | Sequence[str] | None (default: None)

Either:

  • a regex matching parts of the metric name

  • a sequence of metric names

historic : bool | None (default: None)

Only include metrics with the historic flag set.

metadata : bool (default: True)

If true, include metric metadata in the response.

timeout : float | None (default: None)

Operation timeout in seconds.

prefix : str | None (default: None)

Filter results by prefix on the key.

infix : str | None (default: None)

Filter results by infix on the key.

limit : int | None (default: None)

Maximum number of matches to return.

hidden : bool | None (default: None)

Only include metrics whose hidden flag matches the given value (True or False). If not set, return all matching metrics.

Return type:

dict[str, dict[str, Any]]

Returns:

  • a dictionary mapping matching metric names to their metadata (if metadata==True)

  • otherwise, a dictionary mapping metric names to empty dicts
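A minimal sketch of how the returned dictionary might be consumed. StubClient and units_by_metric are hypothetical names introduced for illustration; the stub returns canned data so the snippet runs without a MetricQ network, and the "unit" metadata key is an assumption about what a metric's metadata may contain.

```python
import asyncio
import re


class StubClient:
    """Hypothetical stand-in for a connected metricq.Client (no network needed)."""

    async def get_metrics(self, selector=None, metadata=True, limit=None, **kwargs):
        # Canned data; a real client would ask the MetricQ network instead.
        known = {
            "room.a.power": {"unit": "W"},
            "room.a.temp": {"unit": "C"},
            "room.b.power": {"unit": "W"},
        }
        if isinstance(selector, str):
            # A string selector is a regex matching parts of the metric name.
            names = [n for n in known if re.search(selector, n)]
        elif selector is not None:
            # Otherwise the selector is a sequence of metric names.
            names = [n for n in known if n in selector]
        else:
            names = list(known)
        names = names[:limit]
        # Without metadata, metric names map to empty dicts.
        return {n: (known[n] if metadata else {}) for n in names}


async def units_by_metric(client, selector):
    """Map each matching metric name to its unit ("?" if none is recorded)."""
    metrics = await client.get_metrics(selector=selector, metadata=True)
    return {name: meta.get("unit", "?") for name, meta in metrics.items()}
```

With a real client, the same helper would be awaited after connect(), for example inside an async with block.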

on_signal(signal)

Callback invoked when a signal is received.

Override this method for custom signal handling. By default, it schedules the Client to stop by calling stop().

Parameters:
signal : str

Name of the signal that occurred, e.g. "SIGTERM", "SIGINT", etc.

Return type:

None
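As an illustration of overriding on_signal(), the sketch below ignores the first SIGINT and falls back to the default behavior otherwise. StubClient is a hypothetical stand-in that only models the documented default (scheduling a stop); in real code the base class would be metricq.Client or one of its subclasses.

```python
class StubClient:
    """Hypothetical stand-in for metricq.Client; only models on_signal()."""

    def __init__(self):
        self.stop_scheduled = False

    def on_signal(self, signal: str) -> None:
        # Documented default: schedule the client to stop.
        self.stop_scheduled = True


class PatientClient(StubClient):
    """Requires two SIGINTs before stopping; other signals stop immediately."""

    def __init__(self):
        super().__init__()
        self.interrupts = 0

    def on_signal(self, signal: str) -> None:
        if signal == "SIGINT":
            self.interrupts += 1
            if self.interrupts < 2:
                return  # swallow the first Ctrl+C
        # Defer to the default handling for everything else.
        super().on_signal(signal)
```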

await rpc(function, *args, **kwargs)

Invoke an RPC on the management exchange

Parameters:
function : str

Name of the RPC to invoke

kwargs : Any

Additional arguments are forwarded to Agent.rpc().

exchange, routing_key, and cleanup_on_response are not allowed in kwargs.

Note

Argument names are required to be in "javaScriptSnakeCase".

Raises:
Return type:

Optional[dict[str, Any]]
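The restriction on kwargs can be pictured with a small guard like the one below. This is a sketch, not the library's implementation: check_rpc_kwargs is a hypothetical helper, and raising TypeError is an assumption, since the exception type is not stated here.

```python
# Names that Client.rpc() sets itself and therefore must not appear in kwargs.
RESERVED_RPC_KWARGS = frozenset({"exchange", "routing_key", "cleanup_on_response"})


def check_rpc_kwargs(**kwargs):
    """Return kwargs unchanged, or raise if a reserved name was passed."""
    forbidden = RESERVED_RPC_KWARGS.intersection(kwargs)
    if forbidden:
        # Assumption: the real client may signal this differently.
        raise TypeError(f"not allowed in rpc() kwargs: {sorted(forbidden)}")
    return kwargs
```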

run(catch_signals=('SIGINT', 'SIGTERM'), cancel_on_exception=False, use_uvloop=None)

Run an Agent by calling connect() and waiting for it to be stopped via stop().

If connect() raises an exception, ConnectFailed is raised, with the offending exception attached as a cause. Any exception passed to stop() is reraised.

Parameters:
catch_signals : Iterable[str] (default: ('SIGINT', 'SIGTERM'))

Call on_signal() if any of these signals were raised.

cancel_on_exception : bool (default: False)

Stop the running Agent when an unhandled exception occurs. The exception is reraised from this method.

use_uvloop : bool | None (default: None)

Use uvloop as the asyncio event loop. If None, uvloop is used if available. If True, uvloop is used and an ImportError is raised if it is not available. If False, uvloop is not used even if it is available.

Raises:
Return type:

None
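The three-way behavior of use_uvloop can be summarized as a small decision function. This is a sketch of the documented rules only; resolve_use_uvloop and its available parameter are hypothetical and exist so the logic can be exercised without uvloop installed.

```python
import importlib.util
from typing import Optional


def resolve_use_uvloop(use_uvloop: Optional[bool], available: Optional[bool] = None) -> bool:
    """Return True if uvloop should be used as the asyncio event loop."""
    if available is None:
        # Probe for the package without importing it.
        available = importlib.util.find_spec("uvloop") is not None
    if use_uvloop is None:
        # None: use uvloop only if it is available.
        return available
    if use_uvloop and not available:
        # True: uvloop is required, so its absence is an error.
        raise ImportError("use_uvloop=True, but uvloop is not installed")
    # False: never use uvloop. True with uvloop available: use it.
    return use_uvloop
```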

await stop(exception=None, silent=False)

Stop a running Agent. When calling stop multiple times, all but the first call will be ignored. It will inform anyone waiting for stopped() about the completion or the given exception.

Parameters:
exception : BaseException | None (default: None)

An optional exception that will be raised by run() if given. If the Agent was not started from run(), see stopped() for how to retrieve this exception.

silent : bool (default: False)

If set to True, a passed exception will not be raised.

Raises:

AgentStopped – If an exception is given or an exception occurred while closing the connection(s) and silent==False.

Return type:

None

await stopped()

Wait for this Agent to stop.

If the agent stopped unexpectedly, this method raises an exception.

Raises:
  • AgentStopped – The Agent was stopped via stop() and an exception was passed.

  • Exception – The Agent encountered any other unhandled exception.

Return type:

None
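The interplay between stop() and stopped() can be modeled with a single future. The following is a simplified sketch, not the library's code: MiniAgent and the local AgentStopped class are hypothetical stand-ins, and the silent flag and connection teardown are omitted.

```python
import asyncio


class AgentStopped(Exception):
    """Hypothetical stand-in for metricq's AgentStopped exception."""


class MiniAgent:
    def __init__(self):
        self._stop_future = None
        self.effective_stops = 0

    def _future(self):
        # Create the future lazily so it belongs to the running loop.
        if self._stop_future is None:
            self._stop_future = asyncio.get_running_loop().create_future()
        return self._stop_future

    async def stop(self, exception=None):
        future = self._future()
        if future.done():
            return  # all but the first call are ignored
        self.effective_stops += 1
        if exception is None:
            future.set_result(None)
        else:
            error = AgentStopped("agent stopped with an exception")
            error.__cause__ = exception
            future.set_exception(error)

    async def stopped(self):
        # Returns on a clean stop, raises if an exception was passed to stop().
        await self._future()
```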

await teardown()

Important

Do not call this function; it is called indirectly by Agent.stop().

Close all connections and channels. Child classes should implement this method to close their own connections and channels and call super().teardown().

Return type:

None

property url: str

The management URL with the password replaced by ‘**’, if user present.


Sink and Source implementations further inherit from DataClient.

class metricq.DataClient(*args, **kwargs)
await data_config(dataServerAddress, **kwargs)

This method is a registered RPC handler; do not call it from child classes.

Return type:

None

await teardown()

Important

Do not call this function; it is called indirectly by Agent.stop().

Closes the data connection and the data channel in addition to Agent.teardown().

Return type:

None


The Agent base class is not used directly, but referenced in the documentation of other classes.

class metricq.Agent(token, url=None, *, connection_timeout=600, add_uuid=False, management_url=None)

Base class for all MetricQ agents - i.e. clients that are connected to the RabbitMQ MetricQ network.

The lifetime of an agent works as follows:

stop() is automatically invoked if the connection is lost and the reconnect timeout is exceeded. It may also be called manually to stop the agent.

To indefinitely wait for the agent to stop, use stopped().

Usually, connect() and stop() are not called directly, but instead:

  • The class (via its child Client) is used as an asynchronous context manager, e.g. async with Client(...) as agent:. In that case, connect() and stop() are called as part of the context.

  • The synchronous run() method is called, which:

    • sets up a signal handler for SIGINT and SIGTERM that calls stop().

    • sets up a loop exception handler that calls stop().

    • calls connect() and stopped() to run indefinitely until stop() is called.
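The context-manager usage described above amounts to calling connect() on entry and stop() on exit. The sketch below shows that shape with a hypothetical StubAgent that records calls instead of talking to a network; how the real class treats an exception raised inside the block is not modeled here.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def stop(self, exception=None):
        self.events.append("stop")

    async def __aenter__(self):
        # Entering the context connects the agent.
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Leaving the context stops it again.
        await self.stop()


async def main():
    async with StubAgent() as agent:
        agent.events.append("work")
    return agent.events
```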

Within stop(), the Agent will invoke teardown() to allow all child classes to perform any necessary cleanup. Implementations of teardown() should call super().teardown(), possibly in an asyncio.gather. stop() wraps the invocation of teardown() in a timeout (_close_timeout) and logs any errors during cleanup, possibly passing them to anyone waiting for stopped().
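A sketch of the teardown pattern, assuming a simplified agent: a child class closes its own resource concurrently with the parent's cleanup via asyncio.gather, and stop() bounds the whole teardown with a timeout. BaseAgent, DataAgent, and _close_data are hypothetical names; only teardown(), stop(), and _close_timeout come from the description above, and errors are collected in a list here rather than logged.

```python
import asyncio


class BaseAgent:
    _close_timeout = 1.0  # seconds allowed for teardown

    def __init__(self):
        self.closed = []
        self.errors = []

    async def teardown(self):
        self.closed.append("management")

    async def stop(self):
        try:
            # Bound the cleanup so a hanging connection cannot block forever.
            await asyncio.wait_for(self.teardown(), timeout=self._close_timeout)
        except Exception as error:
            self.errors.append(error)


class DataAgent(BaseAgent):
    async def _close_data(self):
        self.closed.append("data")

    async def teardown(self):
        # Close our own resource and the parent's resources concurrently.
        await asyncio.gather(self._close_data(), super().teardown())
```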

LOG_MAX_WIDTH = 200
_abc_impl = <_abc._abc_data object>
property _event_loop: AbstractEventLoop
_make_correlation_id()
Return type:

str

_on_close(sender, exception)
Return type:

None

_on_management_connection_close(sender, _exception)
Return type:

None

_on_management_connection_reconnect(sender)
Return type:

None

await _on_management_message(message)

Callback invoked when a message is received.

Parameters:
message : AbstractIncomingMessage

Either an RPC request or an RPC response.

Raises:

PublishFailed – The reply could not be published.

Return type:

None

_on_reconnect(sender)
Return type:

None

_rpc_handlers: defaultdict[str, list[RPCHandlerType]] = {}
_schedule_stop(exception=None)
Return type:

None

property _stop_future: Future[None]
await _wait_for_stop(catch_signals)
Return type:

None

await connect()

Connect to the MetricQ network

Return type:

None

derive_address(address)

Add the credentials from the management connection to the provided address

Return type:

str
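To illustrate what adding credentials to an address means, here is a standalone sketch using only the standard library. It is not the method's implementation: the free function below takes the management URL as an explicit argument, the example hosts are made up, and replacing any credentials already present in the address is an assumption.

```python
from urllib.parse import urlsplit, urlunsplit


def derive_address(management_url: str, address: str) -> str:
    """Copy user and password from management_url into address."""
    credentials = urlsplit(management_url)
    target = urlsplit(address)

    userinfo = credentials.username or ""
    if credentials.password is not None:
        userinfo += ":" + credentials.password

    # Rebuild host[:port] without any user info the address carried.
    # Note: IPv6 literals would need their brackets restored; not handled here.
    host = target.hostname or ""
    if target.port is not None:
        host += f":{target.port}"

    netloc = f"{userinfo}@{host}" if userinfo else host
    return urlunsplit(target._replace(netloc=netloc))
```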

await make_connection(url, connection_name)
Return type:

AbstractRobustConnection

on_exception(loop, context)
Return type:

None

on_signal(signal)

Callback invoked when a signal is received.

Override this method for custom signal handling. By default, it schedules the Client to stop by calling stop().

Parameters:
signal : str

Name of the signal that occurred, e.g. "SIGTERM", "SIGINT", etc.

Return type:

None

await rpc(exchange, routing_key, function, response_callback=None, timeout=60, cleanup_on_response=True, **kwargs)

Invoke an RPC over the network.

Warning

This function is not part of the public API, but is included for reference. Use Client.rpc() instead.

Parameters:
function : str

Name of the RPC to invoke

exchange : AbstractExchange

RabbitMQ exchange on which the request is published

routing_key : str

Routing key must be at most 255 bytes (UTF-8)

response_callback : Callable[..., None] | None (default: None)

If given, this callable will be invoked with any response once it arrives. In this case, this function immediately returns None.

If omitted (or None), this function will wait for and return the first response instead.

timeout : float (default: 60)

After the timeout, a response will not be dispatched to the handler.

cleanup_on_response : bool (default: True)

If set, only the first response will be dispatched. Must be True when no response_callback is given.

kwargs : Any

Any additional arguments that are forwarded as arguments to the RPC itself.

Note

Argument names are required to be in "javaScriptSnakeCase".

Return type:

Optional[dict[str, Any]]

Returns:

None if response_callback is given, otherwise a dict containing the RPC response.

Raises:
  • PublishFailed – Failed to publish this RPC to the network.

  • RPCError – The remote returned an error.

  • TypeError – The function keyword-only argument is missing.

  • TypeError – response_callback is None but cleanup_on_response=False

  • ValueError – The routing key is longer than 255 bytes
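The routing key limit is measured in UTF-8 bytes, not characters, so a key with non-ASCII characters can exceed it well before reaching 255 characters. A minimal sketch of such a check (check_routing_key is a hypothetical helper, not part of the library):

```python
MAX_ROUTING_KEY_BYTES = 255


def check_routing_key(routing_key: str) -> str:
    """Return the key unchanged, or raise ValueError if it is too long."""
    size = len(routing_key.encode("utf-8"))
    if size > MAX_ROUTING_KEY_BYTES:
        raise ValueError(f"routing key is {size} bytes, limit is {MAX_ROUTING_KEY_BYTES}")
    return routing_key
```

For example, 128 repetitions of "ä" are only 128 characters but 256 bytes, which is over the limit.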

await rpc_consume(extra_queues=[])

Start consuming RPCs

Typically, this is called at the end of Client.connect() once the Agent is prepared to handle RPCs.

Parameters:
extra_queues : Iterable[Queue] (default: [])

Additional queues on which to receive RPCs.

Return type:

None

run(catch_signals=('SIGINT', 'SIGTERM'), cancel_on_exception=False, use_uvloop=None)

Run an Agent by calling connect() and waiting for it to be stopped via stop().

If connect() raises an exception, ConnectFailed is raised, with the offending exception attached as a cause. Any exception passed to stop() is reraised.

Parameters:
catch_signals : Iterable[str] (default: ('SIGINT', 'SIGTERM'))

Call on_signal() if any of these signals were raised.

cancel_on_exception : bool (default: False)

Stop the running Agent when an unhandled exception occurs. The exception is reraised from this method.

use_uvloop : bool | None (default: None)

Use uvloop as the asyncio event loop. If None, uvloop is used if available. If True, uvloop is used and an ImportError is raised if it is not available. If False, uvloop is not used even if it is available.

Raises:
Return type:

None

await stop(exception=None, silent=False)

Stop a running Agent. When calling stop multiple times, all but the first call will be ignored. It will inform anyone waiting for stopped() about the completion or the given exception.

Parameters:
exception : BaseException | None (default: None)

An optional exception that will be raised by run() if given. If the Agent was not started from run(), see stopped() for how to retrieve this exception.

silent : bool (default: False)

If set to True, a passed exception will not be raised.

Raises:

AgentStopped – If an exception is given or an exception occurred while closing the connection(s) and silent==False.

Return type:

None

await stopped()

Wait for this Agent to stop.

If the agent stopped unexpectedly, this method raises an exception.

Raises:
  • AgentStopped – The Agent was stopped via stop() and an exception was passed.

  • Exception – The Agent encountered any other unhandled exception.

Return type:

None

await teardown()

Important

Do not call this function; it is called indirectly by Agent.stop().

Close all connections and channels. Child classes should implement this method to close their own connections and channels and call super().teardown().

Return type:

None

property url: str

The management URL with the password replaced by ‘**’, if user present.