Subscriber

class metricq.Subscriber(*args, expires, metrics, **kwargs)
__init__(*args, expires, metrics, **kwargs)

Subscribes to a list of metrics

Parameters:
metrics : Iterable[str]

List of metrics that you want to subscribe to.

expires : Timedelta | int | float

The lifetime of the subscription queue in seconds.

queue: Optional[str]

The name of the queue that is used to buffer the subscribed data

This is only set after connect() has finished.

await connect(**kwargs)

Connects to the MetricQ network, sends the subscribe request, and disconnects again.

After it has successfully finished, the queue name is set. :rtype: None

Note

This performs the RPC and closes the connection before the return.

drain(**kwargs)

Returns a fully configured instance of a Drain, by using the given settings used for the subscription.

As the Drain is a context manager, you should use the result of this in a with-statement:

async with subscriber.drain() as data:
    async for metric, time, value in data:
        # ... process metric data

Must only be called after connect() has finished successfully.

Note

For a more convenient way to retrieve the data, see collect_data().

Returns:

Fully configured instance of a Drain

Return type:

Drain

async for ... in collect_data(**kwargs)

Asynchronously iterate over the retrieved data. Can only be called once after connect() has finished successfully.

Return type:

AsyncIterator[tuple[str, Timestamp, float]]