Source

class metricq.Source(*args, **kwargs)

A MetricQ Source.

See Building a MetricQ Source on how to implement a new Source.
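
Example

A minimal sketch of defining and running a Source (the constructor keyword names token and url are assumptions here and may differ between metricq versions, e.g. management_url):

from metricq import Source, rpc_handler

class MySource(Source):

    @rpc_handler("config")
    async def on_config(self, **config):
        await self.declare_metrics({"example.temperature": {"unit": "C"}})

    async def task(self):
        # produce data points here, see task() below
        ...

source = MySource(token="source-example", url="amqps://metricq.example.com")
source.run()  # connects to the network and blocks until the source is stopped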

await connect()

Connect to the MetricQ network

Return type:

None

await declare_metrics(metrics)

Declare a set of metrics this Source produces values for.

Before producing data points for some metric, a Source must have declared that Metric.

Parameters:
metrics : Mapping[str, dict[str, Any]]

A dictionary mapping metrics to their metadata. The metadata is given as a dictionary mapping metadata-keys (strings) to arbitrary values.

Return type:

None

Example

from metricq import Source, rpc_handler

class MySource(Source):

    ...

    @rpc_handler("config")
    async def on_config(self, **config):
        ...
        await self.declare_metrics({
            "example.temperature": {
                "description": "an example temperature reading"
                "unit": "C",
                "rate": config["rate"],
                "some_arbitrary_metadata": {
                    "foo": "bar",
                },
            },
        })

await flush()

Flush all unsent data points to the network immediately.

If automatic chunking is turned off (chunk_size is None), use this method to send data points.

Return type:

None
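
Example

A sketch of manual chunking from within a coroutine of a Source subclass (Timestamp is importable from metricq; "example.voltage" and read_voltage() are hypothetical):

# Disable automatic chunking; send() now only buffers data points.
self.chunk_size = None

for _ in range(100):
    await self.send("example.voltage", time=Timestamp.now(), value=read_voltage())

# Nothing has been sent yet; flush() pushes the buffered chunks to the network.
await self.flush()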

await send(metric, time, value)

Send a data point for a Metric.

Parameters:
metric : str

name of a metric

time : Timestamp

point in time at which this value was measured

value : float

value of the metric at time of measurement

Return type:

None

Note

Data points are not sent immediately; instead, they are collected and sent in chunks. See chunk_size for how to control chunking behaviour.

Raises:

PublishFailed – if sending a data point failed

Warning

In case of failure, unsent data points remain buffered. An attempt at sending them is made once flush() is triggered, either manually or on the next call to send().

In particular, you should not call this method again with the same data point, even if the first call failed. Otherwise, duplicate data points will be sent, which results in an invalid metric.
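
Example

A sketch of handling a failed send() in line with the warning above, from within a Source coroutine (the import path of PublishFailed is an assumption; value stands for an already-measured reading):

import logging

from metricq.exceptions import PublishFailed

logger = logging.getLogger(__name__)

try:
    await self.send("example.temperature", time=Timestamp.now(), value=value)
except PublishFailed:
    # Do not re-send the same data point: it remains buffered and is retried
    # on the next flush() or send().
    logger.warning("publishing a data point failed, it remains buffered")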

abstractmethod await task()

Override this with your main task for generating data points.

The task is started after the source has connected and received its initial configuration.

Return type:

None

Note

This task is not restarted if it fails. You are responsible for handling all relevant exceptions and for stopping the task when Source.task_stop_future is set.
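
Example

A sketch of a task() implementation that runs until task_stop_future is resolved (assuming the future is an asyncio.Future; read_temperature() is hypothetical):

import asyncio

from metricq import Source, Timestamp

class MySource(Source):

    ...

    async def task(self):
        assert self.task_stop_future is not None
        while not self.task_stop_future.done():
            await self.send(
                "example.temperature",
                time=Timestamp.now(),
                value=read_temperature(),
            )
            # Sleep for one second, but wake up early if a stop was requested.
            await asyncio.wait({self.task_stop_future}, timeout=1.0)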

await teardown()

Important

Do not call this function, it is called indirectly by Agent.stop().

Triggers a stop for the source task and waits for it to complete, in addition to DataClient.teardown().

Return type:

None

chunk_size: Optional[int]

Number of data points collected (per metric) into a chunk before being sent.

This can be overridden for individual metrics:

source = Source(...)
source.chunk_size = 10
source["example.metric"].chunk_size = 42

Initially, this value is set to 1, so any data point is sent immediately. If set to None, automatic chunking is disabled and data points must be sent off to the network manually using flush().

To reduce network and packet overhead, it may be advisable to send multiple data points at once. Be aware that there is an overhead-latency trade-off to be made: If your Source produces one data point every 10 seconds, a chunk_size of 10 means that it takes almost 2 minutes (100 s) before a chunk is sent. If instead it produces 1000 data points per second, network load can be reduced by setting a value of 1000 without affecting latency too much.

Raises:
  • TypeError – if the value being set is neither None nor an integer

  • ValueError – if the value being set is not a positive, non-zero integer

task_stop_future: Optional[Future[None]] = None

This future indicates when the task should be stopped. The result is meaningless (None).


class metricq.IntervalSource(*args, period=None, **kwargs)

A Source producing metrics at regular intervals of time.

Use an IntervalSource if you want to produce data points at a constant rate, without having to worry about getting the timing right. Put your code producing data points into update(), which gets called at the specified intervals.

The IntervalSource handles missed deadlines for you: if the code in update() takes longer than period to execute, subsequent updates are skipped until it has caught up; otherwise, a constant update rate is maintained.

Keyword Arguments:
period

time between consecutive updates, given as a number of seconds or as a Timedelta

Example

Sending an incrementing counter once a second:

from metricq import IntervalSource, Timedelta, Timestamp, rpc_handler

class Counter(IntervalSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, period=Timedelta.from_s(1), **kwargs)
        self.counter = 0

    @rpc_handler("config")
    async def _on_config(self, **_config):
        await self.declare_metrics(["example.counter"])

    async def update(self):
        await self.send("example.counter", time=Timestamp.now(), value=self.counter)
        self.counter += 1

property period: Timedelta | None

Time interval at which update() is called.

Initially, this is set to None. Must be set to a Timedelta before update() is run for the first time.

To change the update period at a later time, assign a new value to this property. The change will be picked up before the next time update() is run.

Note

Setting this value to None in application code is not supported and will raise a TypeError. In particular:

  • Once set, the period cannot be reset to None to stop the update() task.

  • There’s no need to initialize this value to None, IntervalSource takes care of that.

Raises:

TypeError – if a period-reset is attempted by assigning None

Changed in version 2.0.0: Return period as Timedelta instead of a number of seconds.
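
Example

A sketch of adjusting the update period at runtime, e.g. from a configuration handler (the "interval" configuration key is hypothetical):

@rpc_handler("config")
async def on_config(self, **config):
    ...
    # picked up before the next call to update()
    self.period = Timedelta.from_s(config["interval"])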

abstractmethod await update()

A user-provided method called at intervals given by period.

Override this method to produce data points at a constant rate.

Return type:

None

Note

The task periodically calling this method is not restarted if there is an exception thrown. You are responsible for handling all relevant exceptions in this method.
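
Example

A sketch of guarding update() against expected errors so that the periodic task keeps running (read_sensor() and SensorError are hypothetical):

async def update(self):
    try:
        value = read_sensor()
    except SensorError:
        # Skip this interval instead of letting the exception kill the task.
        return
    await self.send("example.sensor", time=Timestamp.now(), value=value)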


class metricq.SynchronousSource(*args, **kwargs)

This is a Source that can be used in a synchronous context. It spawns a new thread and runs an asynchronous Source in it. Therefore, this class does not actually derive from Source.

All parameters are passed to Source.__init__.
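
Example

A minimal usage sketch from synchronous code (the constructor keyword names token and url are assumptions, since all parameters are simply forwarded to Source.__init__):

from metricq import SynchronousSource, Timestamp

source = SynchronousSource(token="source-example", url="amqps://metricq.example.com")

source.declare_metrics({"example.temperature": {"unit": "C"}})

# block=True (the default) waits until the data point has actually been sent
source.send("example.temperature", time=Timestamp.now(), value=21.5)

source.stop()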

declare_metrics(metrics, block=True, timeout=60)

Declare the metrics that are published by this source. Returns immediately unless block is True. Exceptions other than a timeout are not propagated to the caller, but logged instead.

Parameters:
metrics : Mapping[str, dict[str, Any]]

a mapping from metric name to metadata

block : bool (default: True)

wait for completion of the asynchronous send operation

timeout : float (default: 60)

timeout in seconds; only used in blocking mode

Raises:

TimeoutError – in case of timeout

Return type:

None

send(metric, time, value, block=True, timeout=60)

Send a single data point (timestamp and value) for a metric to MetricQ. Returns immediately unless block is True. Exceptions other than a timeout are not propagated to the caller, but logged instead.

Parameters:
metric : str

name of the metric

time : Timestamp

timestamp

value : float

value of the metric

block : bool (default: True)

wait for completion of the asynchronous send operation

timeout : float (default: 60)

timeout in seconds; only used in blocking mode

Raises:

TimeoutError – in case of timeout

Return type:

None

stop(timeout=60)

Stop the source and wait for the thread to join. Exceptions other than a timeout are not propagated to the caller, but logged instead.

Parameters:
timeout : float (default: 60)

timeout in seconds

Raises:

TimeoutError – in case of timeout

Return type:

None