Building a MetricQ Drain
What is a Drain?
A Drain is a special case of a MetricQ Sink. Both are used to collect the live metric data stream from MetricQ. In contrast to a Sink, which is continuously connected to MetricQ, a Drain is connected to MetricQ only briefly. After the initial connection, it subscribes to all required metrics and then closes the connection. While the connection is closed, MetricQ buffers the data of the subscribed metrics, and the client can perform other tasks without any perturbation caused by the connection. Once the data is needed, a new connection is established, and the buffered data can be received and processed as in a normal Sink.
This behavior is particularly useful for measurements in which perturbation must be kept as low as possible.
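The following toy model, written with plain asyncio, may help to picture this behavior. It is a sketch under simplifying assumptions, not how MetricQ is implemented: an asyncio.Queue stands in for the server-side buffer of a subscription, and all names (DataPoint, server_buffer, measured_task) are hypothetical. Data keeps accumulating while the client works on something else and is drained in one go afterwards.

```python
import asyncio
from typing import List, Tuple

# Toy model only: (metric, timestamp, value). None of these names are MetricQ API.
DataPoint = Tuple[str, float, float]


async def server_buffer(queue: "asyncio.Queue[DataPoint]", n: int) -> None:
    """Stand-in for the server: keep buffering data points for a subscription."""
    for i in range(n):
        await queue.put(("example.metric", float(i), 2.0 * i))
        await asyncio.sleep(0)  # let other tasks run, as real I/O would


async def measured_task() -> None:
    """Stand-in for the work done while the client is disconnected."""
    await asyncio.sleep(0.01)


async def main() -> List[DataPoint]:
    # "Subscribe": from now on the queue collects data for this client.
    queue: "asyncio.Queue[DataPoint]" = asyncio.Queue()
    # The client is disconnected; buffering and the measured task
    # proceed independently of each other.
    await asyncio.gather(server_buffer(queue, 5), measured_task())
    # "Reconnect": drain everything that was buffered in the meantime.
    received: List[DataPoint] = []
    while not queue.empty():
        received.append(queue.get_nowait())
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```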
Subscribe to metrics
The easiest way to implement this approach is to use the classes Subscriber and Drain provided by MetricQ. The first connection is handled by the Subscriber.
Provide the URL of the MetricQ server, a token that identifies the client, and the list of metrics to subscribe to:
from typing import List

token: str = "drain-example"
server: str = "amqps://user:pass@metricq.example.org/"
metrics: List[str] = [
    # ...
]
The Subscriber performs the initial connection and sends the subscription request. For that, use the Subscriber as an asynchronous context manager:
from metricq import Subscriber
async with Subscriber(token, server, metrics=metrics, expires=...) as subscriber:
    ...  # run the task to be measured
The most important parameter here is expires. Because the connection to MetricQ is closed right after subscribing, the subscription would never be removed if the program terminated before collecting its data. The expires parameter is therefore required: it specifies the time after which the MetricQ server automatically deletes all buffered data, given either in seconds or as a Timedelta.
Note
By design, the Subscriber closes the connection right after the subscribe request. In particular, inside the async with Subscriber(...) block, the subscriber object does not hold an open connection.
Within this with-block, you can perform any task, for instance start the measured program with asyncio.create_subprocess_exec().
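A sketch of such a task, assuming the measured program is started as a child process: the hypothetical helper run_measured records wall-clock bounds in POSIX nanoseconds (via time.time_ns()) around the process, which can later serve as a filter window for the collected data. The child command in the example is only a placeholder workload.

```python
import asyncio
import sys
import time
from typing import Sequence, Tuple


async def run_measured(argv: Sequence[str]) -> Tuple[int, int, int]:
    """Hypothetical helper: run the measured program and record its bounds.

    Returns (start_ns, stop_ns, returncode), with times as POSIX nanoseconds.
    """
    start_ns = time.time_ns()
    proc = await asyncio.create_subprocess_exec(*argv)
    returncode = await proc.wait()  # wait for the child to exit
    stop_ns = time.time_ns()
    return start_ns, stop_ns, returncode


if __name__ == "__main__":
    # Placeholder workload: a Python child process that sleeps for 0.1 s.
    start, stop, rc = asyncio.run(
        run_measured([sys.executable, "-c", "import time; time.sleep(0.1)"])
    )
    print(rc, (stop - start) / 1e9)
```

Inside the async with Subscriber(...) block, this would be called as start_ns, stop_ns, rc = await run_measured([...]) with the real command line.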
Stop the measurement and receive the buffered metric data
Once the data collection should stop, receive the buffered data using Subscriber.collect_data():
async for metric, timestamp, value in subscriber.collect_data():
    ...  # consume the data point
Internally, this creates a Drain instance, which serves both as an asynchronous context manager and as an asynchronous iterator over the data.
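The pattern of an object that is both an asynchronous context manager and an asynchronous iterator can be sketched with a toy class. ToyDrain and toy_collect_data are hypothetical stand-ins written for illustration; they only mimic the described structure (connect on entry, iterate over buffered data, disconnect on exit) and are not MetricQ's Drain.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

DataPoint = Tuple[str, int, float]  # (metric, timestamp, value)


class ToyDrain:
    """Hypothetical stand-in: async context manager and async iterator."""

    def __init__(self, buffered: List[DataPoint]) -> None:
        self._buffered = list(buffered)
        self.connected = False

    async def __aenter__(self) -> "ToyDrain":
        self.connected = True  # a real drain would reconnect to the server here
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.connected = False  # ... and close the connection here

    def __aiter__(self) -> "ToyDrain":
        return self

    async def __anext__(self) -> DataPoint:
        if not self._buffered:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for waiting on the network
        return self._buffered.pop(0)


async def toy_collect_data(buffered: List[DataPoint]) -> AsyncIterator[DataPoint]:
    """Enter the drain as a context manager and re-yield everything it produces."""
    async with ToyDrain(buffered) as drain:
        async for point in drain:
            yield point


async def main() -> List[DataPoint]:
    data = [("cpu.power", 1, 10.0), ("cpu.power", 2, 11.5)]
    return [point async for point in toy_collect_data(data)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```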
The connection and RPC latency involved in stopping the data collection can introduce inaccuracies, so you may want to filter the data by timestamp, keeping only the data points recorded while the measured task was running.
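A minimal sketch of such a filter as an async generator that wraps the data stream. It assumes the start and stop of the measured task were recorded as POSIX nanoseconds (for example with time.time_ns()) and that the caller supplies to_ns to convert the stream's timestamp type to nanoseconds. within_window and _fake_collect are hypothetical names; _fake_collect only stands in for subscriber.collect_data().

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Callable, List, Tuple, TypeVar

T = TypeVar("T")  # timestamp type as delivered by the data stream


async def within_window(
    points: AsyncIterable[Tuple[str, T, float]],
    start_ns: int,
    stop_ns: int,
    to_ns: Callable[[T], int],
) -> AsyncIterator[Tuple[str, T, float]]:
    """Yield only data points whose timestamp lies in [start_ns, stop_ns]."""
    async for metric, timestamp, value in points:
        if start_ns <= to_ns(timestamp) <= stop_ns:
            yield metric, timestamp, value


async def _fake_collect() -> AsyncIterator[Tuple[str, int, float]]:
    # Stand-in for subscriber.collect_data(); timestamps are plain ns here.
    for ts in (100, 200, 300, 400):
        yield "cpu.power", ts, float(ts)


async def main() -> List[Tuple[str, int, float]]:
    return [
        p async for p in within_window(_fake_collect(), 150, 350, to_ns=lambda t: t)
    ]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [('cpu.power', 200, 200.0), ('cpu.power', 300, 300.0)]
```

With MetricQ, to_ns would convert a MetricQ timestamp to nanoseconds; the exact attribute to use (presumably posix_ns) should be checked in the MetricQ API reference.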