Common client operation
All clients (Source
, Sink
, HistoryClient
, etc.) inherit from the common base class below.
Use its methods to run
or stop
a client,
or connect
to the MetricQ network if you derive a custom client.
- class metricq.Client(*args, client_version=None, **kwargs)
-
- await get_metrics(selector=None, metadata=True, historic=None, timeout=None, prefix=None, infix=None, limit=None, hidden=None)
Retrieve information for metrics matching a selector pattern.
- Parameters:
- selector :
str
|Sequence
[str
] |None
Union
[str
,Sequence
[str
],None
] (default:None
) Either:
a regex matching parts of the metric name
a sequence of metric names
- historic :
bool
|None
Optional
[bool
] (default:None
) Only include metrics with the
historic
flag set.- metadata :
bool
(default:True
) If true, include metric metadata in the response.
- timeout :
float
|None
Optional
[float
] (default:None
) Operation timeout in seconds.
- prefix :
str
|None
Optional
[str
] (default:None
) Filter results by prefix on the key.
- infix :
str
|None
Optional
[str
] (default:None
) Filter results by infix on the key.
- limit :
int
|None
Optional
[int
] (default:None
) Maximum number of matches to return.
- hidden :
bool
|None
Optional
[bool
] (default:None
) Only include metrics where
hidden
isTrue
/False
. If not set return all matching metrics.
- selector :
- Return type:
- Returns:
a dictionary mapping matching metric names to their metadata (
if metadata==True
)otherwise, a dictionary mapping metric names to empty dicts
- on_signal(signal)
Callback invoked when a signal is received.
Override this method for custom signal handling. By default, it schedules the Client to stop by calling
stop()
.
- await rpc(function, *args, **kwargs)
Invoke an RPC on the management exchange
- Parameters:
- function :
str
Name of the RPC to invoke
- kwargs :
Any
Additional arguments are forwarded to
Agent.rpc()
.exchange
,routing_key
, andcleanup_on_response
are not allowed inkwargs
.Note
Argument names are required to be in
"javaScriptSnakeCase"
.
- function :
- Raises:
PublishFailed – if the RPC could not be published
RPCError – if the remote returns an error
- Return type:
- run(catch_signals=('SIGINT', 'SIGTERM'), cancel_on_exception=False, use_uvloop=None)
Run an Agent by calling
connect()
and waiting for it to be stopped viastop()
.If
connect()
raises an exception,ConnectFailed
is raised, with the offending exception attached as a cause. Any exception passed tostop()
is reraised.- Parameters:
- catch_signals :
Iterable
[str
] (default:('SIGINT', 'SIGTERM')
) Call
on_signal()
if any of these signals were raised.- cancel_on_exception :
bool
(default:False
) Stop the running Agent when an unhandled exception occurs. The exception is reraised from this method.
- use_uvloop :
bool
|None
Optional
[bool
] (default:None
) Use uvloop as the asyncio event loop. If
None
, uvloop is used if available. IfTrue
, uvloop is used and an ImportError is raised if it is not available. IfFalse
, uvloop is not used even if it is available.
- catch_signals :
- Raises:
ConnectFailed – Failed to
connect()
to the MetricQ network. The source exception is attached as a cause.
- Return type:
- await stop(exception=None, silent=False)
Stop a running Agent. When calling stop multiple times, all but the first call will be ignored. It will inform anyone waiting for
stopped()
about the completion or the given exception.- Parameters:
- exception :
BaseException
|None
Optional
[BaseException
] (default:None
) An optional exception that will be raised by
run()
if given. If the Agent was not started fromrun()
, seestopped()
how to retrieve this exception.- silent :
bool
(default:False
) If set to
True
, a passed exception will not be raised.
- exception :
- Raises:
AgentStopped – If an
exception
is given or an exception occurred while closing the connection(s) andsilent==False
.- Return type:
- await stopped()
Wait for this Agent to stop.
If the agent stopped unexpectedly, this method raises an exception.
- Raises:
AgentStopped – The Agent was stopped via
stop()
and an exception was passed.Exception – The Agent encountered any other unhandled exception.
- Return type:
- await teardown()
Important
Do not call this function, it is called indirectly by
Agent.stop()
.Close all connections and channels. Child classes should implement this method to close their own connections and channels and call
super().teardown()
. :rtype:None
Sink
and Source
implementations further inherit from DataClient
.
- class metricq.DataClient(*args, **kwargs)
- await data_config(dataServerAddress, **kwargs)
This method is a registered RPC handler, do not call this in child classes.
- Return type:
- await teardown()
Important
Do not call this function, it is called indirectly by
Agent.stop()
.Closes the data connection and the data channel in addition to
Agent.teardown()
. :rtype:None
The Agent base class is not used directly, but referenced in the documentation of other classes.
- class metricq.Agent(token, url=None, *, connection_timeout=600, add_uuid=False, management_url=None)
Base class for all MetricQ agents - i.e. clients that are connected to the RabbitMQ MetricQ network.
The lifetime of an agent works as follows:
stop()
is automatically invoked if the connection is lost and the reconnect timeout is exceeded. It may also be called manually to stop the agent.To indefinitely wait for the agent to stop, use
stopped()
.Usually,
connect()
andstop()
are not called directly, but instead:The class (via its child
Client
) is used as an asynchronous context manager, e.g.async with Client(...) as agent:
. In that case,connect()
andstop()
are called as part of the context.The synchronous
run()
method is called, which:
Within
stop()
, the Agent will invoketeardown()
to allow the all child classes to perform any necessary cleanup. Implementations ofteardown()
should callsuper().teardown()
, possibly in anasyncio.gather
.stop()
wraps the invocation ofteardown()
in a timeout (_close_timeout
) and logs any errors during cleanup, possibly passing it to anyone waiting forstopped()
.- LOG_MAX_WIDTH = 200
- _abc_impl = <_abc._abc_data object>
- property _event_loop: AbstractEventLoop
- await _on_management_message(message)
Callback invoked when a message is received.
- Parameters:
- message :
AbstractIncomingMessage
Either an RPC request or an RPC response.
- message :
- Raises:
PublishFailed – The reply could not be published.
- Return type:
- _rpc_handlers: defaultdict[str, list[RPCHandlerType]] = {}
- derive_address(address)
Add the credentials from the management connection to the provided address
- Return type:
- await make_connection(url, connection_name)
- Return type:
AbstractRobustConnection
- on_signal(signal)
Callback invoked when a signal is received.
Override this method for custom signal handling. By default, it schedules the Client to stop by calling
stop()
.
- await rpc(exchange, routing_key, function, response_callback=None, timeout=60, cleanup_on_response=True, **kwargs)
Invoke an RPC over the network.
Warning
This function is not part of the public API, but is included for reference. Use
Client.rpc()
instead.- Parameters:
- function :
str
Name of the RPC to invoke
- exchange :
AbstractExchange
RabbitMQ exchange on which the request is published
- routing_key :
str
Routing key must be at most 255 bytes (UTF-8)
- response_callback : (…) →
None
|None
Optional
[Callable
[...
,None
]] (default:None
) If given, this callable will be invoked with any response once it arrives. In this case, this function immediately returns
None
.If omitted (or
None
), this function will wait for and return the first response instead.- timeout :
float
(default:60
) After the timeout, a response will not be dispatched to the handler.
- cleanup_on_response :
bool
(default:True
) If set, only the first response will be dispatched. Must be
True
when noresponse_callback
is given.- kwargs :
Any
Any additional arguments that are forwarded as arguments to the RPC itself.
Note
Argument names are required to be in
"javaScriptSnakeCase"
.
- function :
- Return type:
- Returns:
None
ifresponse_callback
is given, otherwise adict
containing the RPC response.- Raises:
PublishFailed – Failed to publish this RPC to the network.
RPCError – The remote returned an error.
TypeError – The
function
keyword-only argument is missing.TypeError –
response_callback
is None butcleanup_on_response=True
ValueError – The routing key is longer than 255 bytes
- await rpc_consume(extra_queues=[])
Start consuming RPCs
Typically, this is called at the end of
Client.connect()
once the Agent is prepared to handle RPCs.
- run(catch_signals=('SIGINT', 'SIGTERM'), cancel_on_exception=False, use_uvloop=None)
Run an Agent by calling
connect()
and waiting for it to be stopped viastop()
.If
connect()
raises an exception,ConnectFailed
is raised, with the offending exception attached as a cause. Any exception passed tostop()
is reraised.- Parameters:
- catch_signals :
Iterable
[str
] (default:('SIGINT', 'SIGTERM')
) Call
on_signal()
if any of these signals were raised.- cancel_on_exception :
bool
(default:False
) Stop the running Agent when an unhandled exception occurs. The exception is reraised from this method.
- use_uvloop :
bool
|None
Optional
[bool
] (default:None
) Use uvloop as the asyncio event loop. If
None
, uvloop is used if available. IfTrue
, uvloop is used and an ImportError is raised if it is not available. IfFalse
, uvloop is not used even if it is available.
- catch_signals :
- Raises:
ConnectFailed – Failed to
connect()
to the MetricQ network. The source exception is attached as a cause.
- Return type:
- await stop(exception=None, silent=False)
Stop a running Agent. When calling stop multiple times, all but the first call will be ignored. It will inform anyone waiting for
stopped()
about the completion or the given exception.- Parameters:
- exception :
BaseException
|None
Optional
[BaseException
] (default:None
) An optional exception that will be raised by
run()
if given. If the Agent was not started fromrun()
, seestopped()
how to retrieve this exception.- silent :
bool
(default:False
) If set to
True
, a passed exception will not be raised.
- exception :
- Raises:
AgentStopped – If an
exception
is given or an exception occurred while closing the connection(s) andsilent==False
.- Return type:
- await stopped()
Wait for this Agent to stop.
If the agent stopped unexpectedly, this method raises an exception.
- Raises:
AgentStopped – The Agent was stopped via
stop()
and an exception was passed.Exception – The Agent encountered any other unhandled exception.
- Return type:
- await teardown()
Important
Do not call this function, it is called indirectly by
Agent.stop()
.Close all connections and channels. Child classes should implement this method to close their own connections and channels and call
super().teardown()
. :rtype:None