Building a MetricQ Drain

What is a Drain?

A Drain is a special case of a MetricQ Sink. Both collect the live metric data stream from MetricQ. In contrast to a Sink, which stays connected to MetricQ continuously, a Drain is connected only briefly. After the initial connection, it subscribes to all wanted metrics and closes the connection. While the connection is closed, MetricQ buffers the subscribed metric data, and the client can perform other tasks without any perturbation caused by the connection. Once the metric data is required, a new connection is established and the buffered data can be received and processed as in a normal Sink.

This behavior is particularly useful for measurements where it is important to reduce perturbation as much as possible.

Subscribe to metrics

The easiest way to implement this approach is to use the classes Subscriber and Drain provided by MetricQ.

In particular, the first connection is handled by the Subscriber.

Provide the MetricQ URL to connect to, a token to identify the client, and a list of the metrics we want to subscribe to:

from typing import List

token: str = "drain-example"
server: str = "amqps://user:pass@metricq.example.org/"
metrics: List[str] = [
    # ...
]

We can use the Subscriber to perform the initial connection and submit the subscription. To do so, we use the Subscriber as a context manager:

async with Subscriber(token, server, metrics=metrics, expires=...) as subscriber:
    # ... run task

The most important parameter here is expires. Because the connection to MetricQ is closed after subscribing, the subscription would never be stopped if the program terminated unexpectedly, so the expires parameter is required. It specifies the time after which the MetricQ server automatically deletes all buffered data, and is given in seconds or as a Timedelta.

Note

By design, the Subscriber closes the connection right after the subscribe request. In particular, in the above context, the subscriber object does not have an open connection, even within the with-statement.

Within this with-context, you can now perform any task, for instance starting the measured program with asyncio.create_subprocess_exec().
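As a rough illustration of such a task, the following sketch starts a program with asyncio.create_subprocess_exec(), waits for it to finish, and records wall-clock timestamps around the run. It uses only the standard library. The helper name run_measured_program and the placeholder workload are assumptions made for this example and are not part of MetricQ.

```python
import asyncio
import sys
import time
from typing import Tuple


async def run_measured_program(*argv: str) -> Tuple[int, int, int]:
    """Run a program and return (start_ns, end_ns, exit_code).

    Hypothetical helper: the timestamps are POSIX nanoseconds taken
    immediately before starting and after waiting for the process.
    """
    start_ns = time.time_ns()
    process = await asyncio.create_subprocess_exec(*argv)
    exit_code = await process.wait()
    end_ns = time.time_ns()
    return start_ns, end_ns, exit_code


if __name__ == "__main__":
    # Placeholder workload: a Python one-liner that sleeps briefly.
    start_ns, end_ns, exit_code = asyncio.run(
        run_measured_program(sys.executable, "-c", "import time; time.sleep(0.1)")
    )
    print(f"ran for {(end_ns - start_ns) / 1e9:.3f} s, exit code {exit_code}")
```

In a real measurement, the await on such a helper would sit inside the Subscriber's with-block, and the recorded start and end times could later be used to select the relevant data points.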

Stop the measurement and receive the buffered metric data

Once data collection should stop, we can receive the buffered data using Subscriber.collect_data():

async for metric, timestamp, value in subscriber.collect_data():
    # ... consume the data point

Internally, this creates a Drain instance, which serves both as a context manager and as an asynchronous iterable over the data.

The connection and RPC latency incurred when stopping the data collection can introduce inaccuracies, so you may want to filter the data by timestamp.
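One possible way to do such filtering is sketched below. It wraps any asynchronous stream of (metric, timestamp, value) tuples and yields only the points that fall inside a given time window. To keep the example self-contained, timestamps are plain integers in nanoseconds and the data comes from a stand-in generator. The names within_window, fake_points, and collect are hypothetical, and real MetricQ timestamps would first need to be converted to a comparable form.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

# (metric name, timestamp in nanoseconds, value); an assumption for this sketch.
DataPoint = Tuple[str, int, float]


async def within_window(
    points: AsyncIterator[DataPoint], start_ns: int, end_ns: int
) -> AsyncIterator[DataPoint]:
    """Yield only the data points with start_ns <= timestamp <= end_ns."""
    async for metric, timestamp_ns, value in points:
        if start_ns <= timestamp_ns <= end_ns:
            yield metric, timestamp_ns, value


async def fake_points() -> AsyncIterator[DataPoint]:
    """Stand-in for the buffered data stream; not real MetricQ data."""
    for point in [
        ("example.power", 5, 1.0),
        ("example.power", 15, 2.0),
        ("example.power", 25, 3.0),
    ]:
        yield point


async def collect(start_ns: int, end_ns: int) -> List[DataPoint]:
    """Gather all stand-in points that lie inside the window."""
    return [p async for p in within_window(fake_points(), start_ns, end_ns)]


if __name__ == "__main__":
    print(asyncio.run(collect(10, 20)))
```

The window bounds would typically be the times at which the measured task started and finished, recorded by the client itself rather than derived from the connection events.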