Building a MetricQ Source
Sources provide data points for a set of Metrics to the MetricQ network. Usually, they are programs that continually measure some kind of quantity, for example:
system monitoring (such as metricq-source-sysinfo): CPU usage, memory utilization…
low-level server sensor readings (e.g. via IPMI and metricq-source-ipmi)
pulling information from building automation (metricq-source-bacnet)
Here, we will build a simple Source that sends a random value between 0 and 1 at a configurable interval.
Defining a new Source: using metricq.Source
Similarly to Sinks, we create a new Source by subclassing metricq.Source:
import metricq

class DummySource(metricq.Source):
    pass
The Source receives its configuration dynamically over the network. The configuration is stored as JSON and can be entered and saved in a configuration frontend such as the MetricQ Wizard. For our purposes it is quite minimal: it only contains the rate (in Hz) at which to send values:
{
    "rate": 2
}
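The JSON object above reaches the Source as a dictionary of settings. The following is a minimal sketch of that step that does not depend on metricq: it decodes the document and derives the interval between two values from the rate in Hz. The helpers `parse_config`, `interval_from_rate` and `on_config` are hypothetical. The assumption that each top-level key becomes a keyword argument mirrors the `**config` signature of the `_on_config` handler used in this guide.

```python
import json
from typing import Any, Dict


def parse_config(raw: str) -> Dict[str, Any]:
    """Hypothetical helper: decode a JSON configuration document into a dict."""
    config = json.loads(raw)
    if not isinstance(config, dict):
        raise ValueError("the configuration must be a JSON object")
    return config


def interval_from_rate(rate: float) -> float:
    """Seconds between two values for a rate given in Hz."""
    if rate <= 0:
        raise ValueError("rate must be positive")
    return 1.0 / rate


def on_config(**config: Any) -> float:
    # Same calling convention as a handler taking **config:
    # each top-level JSON key arrives as a keyword argument.
    return interval_from_rate(config["rate"])


config = parse_config('{"rate": 2}')
print(on_config(**config))  # 0.5
```

A rate of 2 Hz therefore corresponds to one value every 0.5 seconds.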
Receiving the configuration over the network is different from the Sink we built in Building a MetricQ Sink. Instead of passing the update rate on the command line, we install a callback that is triggered every time a new configuration is received:
import metricq

class DummySource(metricq.Source):
    def __init__(self, *args, **kwargs):
        # This will be set to the number of values we want to send per second:
        self._rate = None
        super().__init__(*args, **kwargs)

    @metricq.rpc_handler("config")
    async def _on_config(self, **config):
        print(f"Received new configuration: {config}")
        self._rate = config["rate"]
Clients on the MetricQ network communicate via an RPC protocol. The metricq.rpc_handler() decorator defines a new handler for an RPC; here we tell the library to call _on_config every time another client sends a "config"-RPC containing our new configuration.
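The decorator mechanism can be illustrated with a toy model that runs without a broker. A decorator tags a method with an RPC name, the client collects the tagged methods into a lookup table, and an incoming RPC is routed by name with its body passed as keyword arguments. This is a simplified sketch under those assumptions and not metricq's actual implementation; `toy_rpc_handler`, `ToyClient`, `ToySource` and `dispatch` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


def toy_rpc_handler(name: str) -> Callable:
    """Hypothetical stand-in for a handler decorator: tag a method with an RPC name."""
    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        setattr(fn, "_rpc_name", name)  # remembered so the client can find it later
        return fn
    return decorator


class ToyClient:
    def __init__(self) -> None:
        # Collect every method tagged by toy_rpc_handler into a name -> bound method map.
        self._handlers: Dict[str, Callable[..., Awaitable[Any]]] = {}
        for attr in dir(self):
            method = getattr(self, attr)
            name = getattr(method, "_rpc_name", None)
            if name is not None:
                self._handlers[name] = method

    async def dispatch(self, function: str, **body: Any) -> Any:
        # Route an incoming RPC to its handler, passing the body as keyword arguments.
        return await self._handlers[function](**body)


class ToySource(ToyClient):
    def __init__(self) -> None:
        self.rate = None
        super().__init__()

    @toy_rpc_handler("config")
    async def _on_config(self, **config: Any) -> None:
        self.rate = config["rate"]


source = ToySource()
asyncio.run(source.dispatch("config", rate=2))
print(source.rate)  # 2
```

Bound methods expose the attributes of their underlying function, which is why the tag set on `_on_config` can be found through `getattr(self, ...)`.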
So far, our Source would work, but it would not do anything useful. To change that, we first declare which metric we want to send values for, along with some helpful metadata:
from typing import Any

import metricq

class DummySource(metricq.Source):
    ...  # as above

    @metricq.rpc_handler("config")
    async def _on_config(self, **config: Any):
        print(f"Received new configuration: {config}")
        self._rate = config["rate"]
        # "scale" is optional here; assume 1.0 (values in [0, 1)) if it is missing
        self._scale = config.get("scale", 1.0)

        metadata = {
            "rate": self._rate,
            "scale": self._scale,
            "description": "A simple dummy metric providing random values, sent from a python DummySource",
        }
        await self.declare_metrics({"example.py.dummy": metadata})
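Because the metadata is derived from the configuration, it can be convenient to build it in a small pure function that can be tested separately from the network code. The helper `build_metadata` below is hypothetical. Its keys `rate`, `scale` and `description` simply mirror the dictionary above, and the default scale of 1.0 is an assumption.

```python
from typing import Any, Dict

DESCRIPTION = "A simple dummy metric providing random values, sent from a python DummySource"


def build_metadata(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: derive metric metadata from a received configuration."""
    rate = config["rate"]
    if rate <= 0:
        raise ValueError("rate must be positive")
    return {
        "rate": rate,
        "scale": config.get("scale", 1.0),  # assumed default: values stay in [0, 1)
        "description": DESCRIPTION,
    }


metrics = {"example.py.dummy": build_metadata({"rate": 2})}
print(metrics["example.py.dummy"]["rate"])  # 2
```

Inside `_on_config`, the result could then be passed straight to `self.declare_metrics({"example.py.dummy": build_metadata(config)})`.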
To finally send some values, we override Source.task(). This method is called once our Source has connected and received its initial configuration. It should be written so that it stops once Source.task_stop_future is done.
import asyncio
import random

import metricq

class DummySource(metricq.Source):
    ...  # as above

    async def wait_for_sensor_value(self) -> float:
        await asyncio.sleep(1)  # Just simulate waiting for data
        return random.random() * self._scale

    async def task(self) -> None:
        while not self.task_stop_future.done():
            value = await self.wait_for_sensor_value()
            await self.send(
                "example.py.dummy",
                time=metricq.Timestamp.now(),
                value=value,
            )
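You can try out the control flow of `task()` without a MetricQ server. The sketch below replaces `send` with a list append and `task_stop_future` with a plain `asyncio.Future` that the caller resolves. The class `FakeSource` and its `sent` list are hypothetical and only imitate the shape of a real Source.

```python
import asyncio
import random
from typing import List, Optional, Tuple


class FakeSource:
    """Hypothetical stand-in for a Source: records values instead of publishing them."""

    def __init__(self, scale: float = 1.0) -> None:
        self._scale = scale
        self.sent: List[Tuple[str, float]] = []
        self.task_stop_future: Optional[asyncio.Future] = None

    async def wait_for_sensor_value(self) -> float:
        await asyncio.sleep(0.01)  # shorter than in the guide, to keep the demo fast
        return random.random() * self._scale

    async def send(self, metric: str, value: float) -> None:
        self.sent.append((metric, value))

    async def task(self) -> None:
        # Same loop shape as above: keep sending until the stop future is done.
        assert self.task_stop_future is not None
        while not self.task_stop_future.done():
            value = await self.wait_for_sensor_value()
            await self.send("example.py.dummy", value)


async def main() -> FakeSource:
    source = FakeSource(scale=10.0)
    source.task_stop_future = asyncio.get_running_loop().create_future()
    runner = asyncio.create_task(source.task())
    await asyncio.sleep(0.1)
    source.task_stop_future.set_result(None)  # ask the task to stop
    await runner  # returns after the current iteration has finished
    return source


source = asyncio.run(main())
print(f"sent {len(source.sent)} values before stopping")
```

The loop only checks the future between two values, so stopping can take up to one full sensor wait.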
Note
The coroutine overriding Source.task() is not restarted if it raises an exception. Make sure to handle errors appropriately, for example Source.send() raising PublishFailed.
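One way to follow this advice is to catch publishing errors inside the loop, log them, and carry on, so that a transient failure does not end the task permanently. In this sketch, `PublishFailedStandIn` stands in for metricq's `PublishFailed` so that the code runs without a broker. `FlakySender` and `send_all` are hypothetical, and the policy of skipping the failed value is an assumption, not a recommendation from metricq.

```python
import asyncio
import logging
from typing import List

logger = logging.getLogger("dummy-source")


class PublishFailedStandIn(Exception):
    """Hypothetical stand-in for metricq's PublishFailed exception."""


class FlakySender:
    """Hypothetical sender that fails on every third call."""

    def __init__(self) -> None:
        self.calls = 0
        self.delivered: List[float] = []

    async def send(self, value: float) -> None:
        self.calls += 1
        if self.calls % 3 == 0:
            raise PublishFailedStandIn(f"could not publish {value}")
        self.delivered.append(value)


async def send_all(sender: FlakySender, values: List[float]) -> int:
    """Send each value; log and skip failures instead of letting the task die."""
    failures = 0
    for value in values:
        try:
            await sender.send(value)
        except PublishFailedStandIn as exc:
            failures += 1
            logger.warning("dropping value: %s", exc)
    return failures


sender = FlakySender()
failures = asyncio.run(send_all(sender, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]))
print(failures, sender.delivered)  # 2 [0.1, 0.2, 0.4, 0.5]
```

Only the expected exception type is caught, so genuine programming errors still surface instead of being silently swallowed.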
Note
There are more elaborate, lower-latency ways to react to Source.task_stop_future being set. Please refer to the implementation of task in IntervalSource for a more rigorous example.
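A common asyncio idiom for reacting to the stop request promptly, instead of only between two values, is to wait on the stop future with a timeout rather than sleeping unconditionally. The sketch below combines `asyncio.wait_for` with `asyncio.shield`. It shows a generic pattern and is not a copy of IntervalSource's implementation; `sleep_or_stop` is a hypothetical name.

```python
import asyncio


async def sleep_or_stop(stop: asyncio.Future, delay: float) -> bool:
    """Sleep for `delay` seconds, but wake early if `stop` completes.

    Returns True if the stop future completed, False if the delay elapsed.
    """
    try:
        # shield() keeps wait_for's timeout from cancelling the shared stop future.
        await asyncio.wait_for(asyncio.shield(stop), timeout=delay)
        return True
    except asyncio.TimeoutError:
        return False


async def main() -> float:
    loop = asyncio.get_running_loop()
    stop = loop.create_future()
    loop.call_later(0.05, stop.set_result, None)  # request a stop after 50 ms
    start = loop.time()
    stopped = await sleep_or_stop(stop, delay=10.0)
    assert stopped
    return loop.time() - start


elapsed = asyncio.run(main())
print(f"woke up after {elapsed:.2f} s instead of 10 s")
```

Without `shield`, a timeout would cancel the stop future itself, and every later check of it would see a cancelled future instead of a pending one.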
Improving constant-rate sources: using IntervalSource
Sending values at a fixed rate, as above, is so common that metricq provides the convenience class IntervalSource, which does all the heavy lifting for us.
Note
We strongly recommend implementing an IntervalSource rather than a plain Source where possible. It tries to automatically compensate for some timing-related issues that inevitably arise in more complicated setups. See the documentation for IntervalSource for more information.
To adapt the above example, we subclass IntervalSource instead, set IntervalSource.period to the time between consecutive updates, and replace Source.task with IntervalSource.update, which is called at a constant rate:
import random

import metricq

class DummySource(metricq.IntervalSource):
    @metricq.rpc_handler("config")
    async def _on_config(self, **config):
        # Set the update period
        rate = config["rate"]
        self.period = 1 / rate
        metadata = {"rate": rate}  # add a description etc. as above
        await self.declare_metrics({"example.py.interval-dummy": metadata})

    async def update(self):
        await self.send(
            "example.py.interval-dummy",
            time=metricq.Timestamp.now(),
            value=random.random(),
        )
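One reason to prefer a fixed-rate helper over calling `sleep(period)` after each update is drift. If every iteration sleeps a full period after doing its work, the time spent working accumulates. A generic remedy is to schedule each update at an absolute deadline `start + n * period`. The sketch below shows that idea with a hypothetical `run_at_fixed_rate` function. It illustrates the technique only and makes no claim about how IntervalSource is implemented.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_at_fixed_rate(
    update: Callable[[], Awaitable[None]], period: float, count: int
) -> None:
    """Hypothetical helper: call `update` `count` times, aiming for start + n * period."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    for n in range(count):
        await update()
        # Sleep until the next absolute deadline, not for a full period, so the
        # time spent inside update() does not accumulate as drift.
        deadline = start + (n + 1) * period
        await asyncio.sleep(max(0.0, deadline - loop.time()))


async def main() -> List[float]:
    loop = asyncio.get_running_loop()
    stamps: List[float] = []

    async def update() -> None:
        stamps.append(loop.time())
        await asyncio.sleep(0.01)  # simulated work, a fifth of the period

    await run_at_fixed_rate(update, period=0.05, count=5)
    return stamps


stamps = asyncio.run(main())
print([round(t - stamps[0], 3) for t in stamps])
```

With a plain `await asyncio.sleep(period)` after each update, consecutive timestamps would instead be separated by the period plus the duration of the simulated work.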
Running a Source
Similarly to Sinks, a Source is started by calling run. On construction, we need to supply a unique Token that identifies the client and the URL of the MetricQ network.
class DummySource(metricq.Source):
    ...  # as above

if __name__ == "__main__":
    source = DummySource(
        token="source-py-dummy",
        url="amqp://localhost/",
    )
    source.run()
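The url argument is an AMQP URL made up of a scheme, optional credentials, a host, an optional port and a path. To check what a given URL contains before handing it to a Source, you can take it apart with the standard library's `urllib.parse.urlsplit`. The user name, password and port below are made up for illustration.

```python
from urllib.parse import urlsplit

# Made-up broker URL with credentials and an explicit port.
url = "amqp://metricq:secret@localhost:5672/"
parts = urlsplit(url)

print(parts.scheme)    # amqp
print(parts.hostname)  # localhost
print(parts.port)      # 5672
print(parts.username)  # metricq
```

For the short form `amqp://localhost/` used above, `port` and `username` are simply `None`.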
Complete Example
To obtain the dependencies required for this example, install the examples extra from the git repository:
$ pip install '.[examples]'
and run it like so:
$ ./examples/metricq_source.py
#!/usr/bin/env python3
# Copyright (c) 2018, ZIH, Technische Universitaet Dresden, Federal Republic of Germany
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of metricq nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import random
from typing import Any

import metricq
from metricq.cli import metricq_command
from metricq.logging import get_logger

logger = get_logger()


class DummySource(metricq.IntervalSource):
    def __init__(self, *args: Any, **kwargs: Any):
        logger.info("initializing DummySource")
        super().__init__(*args, **kwargs)

    @metricq.rpc_handler("config")
    async def _on_config(self, **config: Any) -> None:
        logger.info("DummySource config: {}", config)

        # Set the update period
        rate = config["rate"]
        self.period = 1 / rate

        # Supply some metadata for the metric declared below
        metadata = {
            "rate": rate,
            "description": "A simple dummy metric providing random values, sent from a python DummySource",
            "unit": "",  # unit-less metrics indicate this with an empty string
        }
        await self.declare_metrics({"test.py.dummy": metadata})

    async def update(self) -> None:
        # Send a random value at the current time:
        await self.send(
            "test.py.dummy", time=metricq.Timestamp.now(), value=random.random()
        )


@metricq_command(default_token="source-py-dummy")
def source(server: str, token: str) -> None:
    src = DummySource(token=token, url=server)
    src.run()


if __name__ == "__main__":
    source()