Source
- class metricq.Source(*args, **kwargs)
A MetricQ Source.
See Building a MetricQ Source on how to implement a new Source.
- await declare_metrics(metrics)
Declare a set of metrics this Source produces values for.
Before producing data points for some metric, a Source must have declared that Metric.
- Parameters:
- Return type:
Example
from metricq import Source, rpc_handler

class MySource(Source):
    ...

    @rpc_handler("config")
    async def on_config(self, **config):
        ...
        await self.declare_metrics({
            "example.temperature": {
                "description": "an example temperature reading",
                "unit": "C",
                "rate": config["rate"],
                "some_arbitrary_metadata": {
                    "foo": "bar",
                },
            },
        })
- await flush()
Flush all unsent data points to the network immediately.
If automatic chunking is turned off (chunk_size is None), use this method to send data points.
- Return type:
- await send(metric, time, value)
Send a data point for a Metric.
- Parameters:
- Return type:
Note
Data points are not sent immediately; instead, they are collected and sent in chunks. See chunk_size on how to control chunking behaviour.
- Raises:
PublishFailed – if sending a data point failed
Warning
In case of failure, unsent data points remain buffered. An attempt at sending them is made once flush() is triggered, either manually or on the next call to send().
In particular, you should not call this method again with the same data point, even if the first call failed. Otherwise, duplicate data points will be sent, which results in an invalid metric.
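The buffering behaviour described in the warning can be illustrated with a toy model. This is only a sketch and not MetricQ's implementation: `BufferedSender`, its `network_up` flag and the locally defined `PublishFailed` are hypothetical stand-ins that do not touch the real library.

```python
class PublishFailed(Exception):
    """Stand-in for the exception raised when publishing fails."""


class BufferedSender:
    """Toy model (hypothetical): points stay buffered until a flush succeeds."""

    def __init__(self):
        self.buffer = []        # data points not yet on the "network"
        self.published = []     # data points that were published
        self.network_up = True  # assumption: simulates connectivity

    def flush(self):
        if not self.network_up:
            raise PublishFailed("network unavailable")
        self.published.extend(self.buffer)
        self.buffer.clear()

    def send(self, metric, time, value):
        # The point is buffered before the flush attempt, so it survives a failure.
        self.buffer.append((metric, time, value))
        self.flush()


sender = BufferedSender()
sender.network_up = False
try:
    sender.send("example.temperature", 1, 20.5)
except PublishFailed:
    pass  # do not call send() again with the same data point

sender.network_up = True
# The next send() flushes the previously buffered point as well.
sender.send("example.temperature", 2, 20.7)
```

In this model, repeating the failed `send()` call would have put the first data point into the buffer twice, which is the duplicate the warning refers to.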
- abstractmethod await task()
Override this with your main task for generating data points.
The task is started after the source has connected and received its initial configuration.
- Return type: None
Note
This task is not restarted if it fails. You are responsible for handling all relevant exceptions and for stopping the task when Source.task_stop_future is set.
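One possible shape for such a task is a loop that checks a stop future on every iteration and handles exceptions itself. The sketch below uses only plain asyncio: `stop_future` is an ordinary `asyncio.Future` standing in for `Source.task_stop_future`, and appending to the `produced` list replaces the call to `send()`. Both are assumptions made for illustration.

```python
import asyncio


async def task(stop_future, produced):
    """Produce values until stop_future is set, handling errors locally."""
    value = 0
    while not stop_future.done():
        try:
            produced.append(value)  # stand-in for `await self.send(...)`
            value += 1
        except Exception as exc:
            # The task is not restarted, so recoverable errors are handled here.
            print(f"failed to produce a data point: {exc}")
        await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()
    stop_future = loop.create_future()
    produced = []
    runner = asyncio.create_task(task(stop_future, produced))
    await asyncio.sleep(0.05)
    stop_future.set_result(None)  # request a stop
    await runner                  # the loop sees the future is done and returns
    return produced


produced = asyncio.run(main())
```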
- await teardown()
Important
Do not call this function; it is called indirectly by Agent.stop().
Triggers a stop for the source task and waits for it to complete, in addition to DataClient.teardown().
- Return type: None
- chunk_size: Optional[int]
Number of data points collected (per metric) into a chunk before being sent.
This can be overridden for individual metrics:
source = Source(...)
source.chunk_size = 10
source["example.metric"].chunk_size = 42
Initially, this value is set to 1, so any data point is sent immediately. If set to None, automatic chunking is disabled and data points must be sent off to the network manually using flush().
To reduce network and packet overhead, it may be advisable to send multiple data points at once. Be aware that there is an overhead-latency trade-off to be made: if your Source produces one data point every 10 seconds, having a chunk_size of 10 means that it takes almost 2 minutes (100 s) before a chunk is sent. If instead it produces 1000 data points per second, network load can be reduced by setting a value of 1000 without affecting latency too much.
- Raises:
TypeError – if value set is neither None nor an integer
ValueError – if value set is not a positive, non-zero integer
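The latency side of this trade-off is simple arithmetic: the time to fill one chunk is the chunk size multiplied by the time between data points. The helper below is a hypothetical illustration and not part of MetricQ. It reproduces the two cases from the text.

```python
def chunk_fill_time(chunk_size, seconds_per_point):
    """Seconds needed to collect one full chunk at a constant production rate.

    Hypothetical helper for illustration only.
    """
    return chunk_size * seconds_per_point


# One data point every 10 seconds, chunk_size of 10
slow = chunk_fill_time(10, 10)

# 1000 data points per second, chunk_size of 1000
fast = chunk_fill_time(1000, 1 / 1000)
```

Here `slow` comes out at 100 seconds, while `fast` is about one second, which is why a large chunk size is cheap for high-rate sources and costly for low-rate ones.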
- class metricq.IntervalSource(*args, period=None, **kwargs)
A Source producing metrics at regular intervals of time.
Use an IntervalSource if you want to produce data points at a constant rate, without having to worry about getting the timing right. Put your code producing data points into update(), which gets called at the specified intervals.
The IntervalSource handles missed deadlines for you: if the code in update() takes longer than period to execute, it will skip the next updates until it has caught up, otherwise keeping a constant update rate.
- Keyword Arguments:
- period: time between consecutive updates, in number of seconds or as Timedelta
Example
Sending an incrementing counter once a second:
from metricq import IntervalSource, Timedelta, Timestamp, rpc_handler

class Counter(IntervalSource):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, period=Timedelta.from_s(1), **kwargs)
        self.counter = 0

    @rpc_handler("config")
    async def _on_config(self, **_config):
        await self.declare_metrics(["example.counter"])

    async def update(self):
        await self.send("example.counter", time=Timestamp.now(), value=self.counter)
        self.counter += 1
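The missed-deadline handling described above can be pictured as advancing a deadline along a fixed time grid and skipping every grid point that has already passed. The function below is a minimal sketch of that idea. `next_deadline` is a hypothetical name, and this is not necessarily how IntervalSource computes its schedule.

```python
def next_deadline(deadline, now, period):
    """Return the first deadline after `now` that lies on the original grid.

    Hypothetical helper; times and period are plain numbers in seconds.
    """
    deadline += period
    while deadline <= now:
        deadline += period  # this update was missed, so skip it
    return deadline


# update() finished in time: the next deadline is one period later.
on_time = next_deadline(0.0, 0.4, 1.0)

# update() overran by several periods: the missed updates are skipped.
overrun = next_deadline(0.0, 3.5, 1.0)
```

Because deadlines are always multiples of the period from the starting point, a slow update does not shift later updates off the grid.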
- property period: Timedelta | None
Time interval at which update() is called.
Initially, this is set to None. Must be set to a Timedelta before update() is run for the first time.
To change the update period at a later time, overwrite this value. The change will be picked up before update() is run the next time.
Note
Setting this value to None in application code is not supported and will raise a TypeError. In particular:
- Once set, the period cannot be reset to None to stop the update() task.
- There's no need to initialize this value to None; IntervalSource takes care of that.
- Exceptions:
TypeError – if a period reset is attempted by assigning None
Changed in version 2.0.0: Return period as Timedelta instead of a number of seconds.
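A property with these semantics could look roughly like the following sketch. `PeriodHolder` is a hypothetical class, `datetime.timedelta` stands in for MetricQ's `Timedelta`, and converting a plain number of seconds in the setter is an assumption made for this example.

```python
from datetime import timedelta


class PeriodHolder:
    """Hypothetical stand-in showing a period that cannot be reset to None."""

    def __init__(self):
        self._period = None  # initial value, set internally

    @property
    def period(self):
        return self._period

    @period.setter
    def period(self, value):
        if value is None:
            raise TypeError("period cannot be reset to None")
        if not isinstance(value, timedelta):
            # Assumption: a plain number is interpreted as seconds.
            value = timedelta(seconds=value)
        self._period = value


holder = PeriodHolder()
holder.period = 2  # stored as a timedelta of two seconds

try:
    holder.period = None
    reset_rejected = False
except TypeError:
    reset_rejected = True
```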
- abstractmethod await update()
A user-provided method called at intervals given by period.
Override this method to produce data points at a constant rate.
- Return type: None
Note
The task periodically calling this method is not restarted if there is an exception thrown. You are responsible for handling all relevant exceptions in this method.
- class metricq.SynchronousSource(*args, **kwargs)
This is a Source that can be used in a synchronous context. It spawns a new thread and runs an asynchronous Source in it. Therefore, this class does not actually derive from Source.
All parameters are passed to Source.__init__.
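The general pattern of driving asynchronous code from a synchronous caller through a background thread can be sketched with the standard library alone. `LoopThread` and `add` below are hypothetical names and this is not the SynchronousSource implementation. The `block` and `timeout` parameters only mirror the shape of the methods documented for this class.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: runs an asyncio event loop in a background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def call(self, coro, block=True, timeout=60):
        # Schedule the coroutine on the loop running in the other thread.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        if block:
            # Raises a timeout error if no result arrives within `timeout` seconds.
            return future.result(timeout=timeout)
        return None

    def stop(self, timeout=60):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=timeout)
        if not self._thread.is_alive():
            self._loop.close()


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


runner = LoopThread()
result = runner.call(add(1, 2), timeout=5)
runner.stop(timeout=5)
```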
- declare_metrics(metrics, block=True, timeout=60)
Declare the metrics that are published by this source. Returns immediately unless block is True. Exceptions other than a timeout are not propagated to the caller, but logged instead.
- Parameters:
- Raises:
TimeoutError – in case of timeout
- Return type:
- send(metric, time, value, block=True, timeout=60)
Send a single metric timestamp / value to MetricQ. Returns immediately unless block is True. Exceptions other than a timeout are not propagated to the caller, but logged instead.
- Parameters:
- Raises:
TimeoutError – in case of timeout
- Return type:
- stop(timeout=60)
Stop the source and wait for the thread to join. Exceptions other than a timeout are not propagated to the caller, but logged instead.
- Parameters:
- timeout: float (default: 60) timeout in seconds
- Raises:
TimeoutError – in case of timeout
- Return type: