Building a MetricQ Source

Sources provide data points for a set of Metrics to the MetricQ network. Usually, they are programs that continually measure some kind of quantity.

Here, we will build a simple Source that sends a random value between 0 and 1 at a configurable interval.

Defining a new Source: using metricq.Source

Similarly to Sinks, we create a new Source by subclassing metricq.Source:

import metricq

class DummySource(metricq.Source):
    pass

The Source receives its configuration dynamically over the network. The configuration is stored as JSON and can be edited and saved using a configuration frontend such as the MetricQ Wizard. For our purposes it is quite minimal: it only includes the rate at which to send values (in Hz):

{
    "rate": 2
}
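
As a rough illustration of how such a configuration might be interpreted, the following standalone sketch parses the JSON document and derives the time between two consecutive values from the rate. The helper name period_from_config and its validation rules are assumptions made for this example; they are not part of metricq.

```python
import json


def period_from_config(raw: str) -> float:
    """Parse a JSON configuration and return the send period in seconds."""
    config = json.loads(raw)
    rate = config["rate"]  # number of values per second (Hz)
    # bool is a subclass of int, so it is rejected explicitly
    if isinstance(rate, bool) or not isinstance(rate, (int, float)) or rate <= 0:
        raise ValueError(f"'rate' must be a positive number, got {rate!r}")
    return 1 / rate


print(period_from_config('{"rate": 2}'))  # 0.5
```

A rate of 2 Hz therefore corresponds to one value every half second.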

This differs from the Sink we built in Building a MetricQ Sink: we won’t pass the update rate on the command line. Instead, we install a callback that is triggered every time a new configuration is received:

import metricq

class DummySource(metricq.Source):
    def __init__(self, *args, **kwargs):
        # This will be set to the number of values we want to send per second:
        self._rate = None
        super().__init__(*args, **kwargs)

    @metricq.rpc_handler("config")
    async def _on_config(self, **config):
        print(f"Received new configuration: {config}")
        self._rate = config["rate"]

Clients on the MetricQ network communicate via an RPC protocol. The metricq.rpc_handler() decorator is a way to define a new handler for an RPC; here we tell the library to call _on_config every time another client sends a "config"-RPC containing our new configuration.
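
To make the idea of a decorator-registered handler more concrete, here is a toy model that needs nothing but the standard library. It only illustrates the general pattern; the names toy_rpc_handler, ToyClient and dispatch are invented for this sketch, and metricq's real implementation may work quite differently.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[..., Awaitable[Any]]


def toy_rpc_handler(name: str) -> Callable[[Handler], Handler]:
    """Tag a coroutine function as the handler for the RPC called `name`."""

    def decorator(func: Handler) -> Handler:
        func._rpc_name = name  # type: ignore[attr-defined]
        return func

    return decorator


class ToyClient:
    def __init__(self) -> None:
        # Collect every method that was tagged by the decorator
        self._handlers: Dict[str, Handler] = {}
        for attr in dir(self):
            member = getattr(self, attr)
            rpc_name = getattr(member, "_rpc_name", None)
            if rpc_name is not None:
                self._handlers[rpc_name] = member

    async def dispatch(self, function: str, **arguments: Any) -> Any:
        """Call the handler registered for `function` with the RPC arguments."""
        return await self._handlers[function](**arguments)


class Demo(ToyClient):
    def __init__(self) -> None:
        super().__init__()
        self.rate = None

    @toy_rpc_handler("config")
    async def _on_config(self, **config: Any) -> None:
        self.rate = config["rate"]


demo = Demo()
# Simulate another client sending a "config" RPC with our configuration
asyncio.run(demo.dispatch("config", rate=2))
print(demo.rate)  # 2
```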

So far, our Source would work, but it wouldn’t do anything useful at all. To change that, we first declare for which metric we want to send values, including some helpful metadata:

from typing import Any

import metricq

class DummySource(metricq.Source):
    ... # as above

    @metricq.rpc_handler("config")
    async def _on_config(self, **config: Any):
        print(f"Received new configuration: {config}")
        # Optional factor applied to the random values; defaults to 1
        # so that the minimal configuration shown earlier still works
        self._scale = config.get("scale", 1.0)

        metadata = {
            "rate": 1,
            "scale": self._scale,
            "description": "A simple dummy metric providing random values, sent from a python DummySource",
        }

        await self.declare_metrics({"example.py.dummy": metadata})

To finally send some values, we override Source.task(). This method gets called once our Source is connected and has received its initial configuration. It should be built to stop once Source.task_stop_future is done.

import metricq
import asyncio
import random

class DummySource(metricq.Source):
    ... # as above

    async def wait_for_sensor_value(self) -> float:
        await asyncio.sleep(1) # Just simulate waiting for data
        return random.random() * self._scale

    async def task(self) -> None:
        while not self.task_stop_future.done():
            value = await self.wait_for_sensor_value()
            await self.send(
                "example.py.dummy",
                time=metricq.Timestamp.now(),
                value=value,
            )

Note

The coroutine overriding Source.task() is not restarted if it raises an exception. Make sure to handle errors appropriately, such as Source.send() raising PublishFailed.
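
One possible way to guard a send loop against such errors is sketched below. It is self-contained and does not import metricq: the PublishFailed class is a local stand-in for the exception named above, and send_loop, send_once, should_stop and max_failures are hypothetical names chosen for this example. Whether retrying is the right reaction depends on the application.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger("dummy-source")


class PublishFailed(Exception):
    """Local stand-in for the library exception named in the note above."""


async def send_loop(
    send_once: Callable[[], Awaitable[None]],
    should_stop: Callable[[], bool],
    max_failures: int = 3,
) -> None:
    """Call `send_once` until `should_stop()` is true, tolerating a few failures."""
    failures = 0
    while not should_stop():
        try:
            await send_once()
            failures = 0  # a successful send resets the counter
        except PublishFailed as error:
            failures += 1
            logger.warning("send failed (%d in a row): %s", failures, error)
            if failures >= max_failures:
                raise  # give up; the caller decides how to shut down


async def demo() -> List[int]:
    sent: List[int] = []
    calls = 0

    async def flaky_send() -> None:
        nonlocal calls
        calls += 1
        if calls <= 2:  # the first two attempts fail
            raise PublishFailed("broker unavailable")
        sent.append(calls)

    await send_loop(flaky_send, should_stop=lambda: len(sent) >= 2)
    return sent


print(asyncio.run(demo()))  # [3, 4]
```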

Note

There are more elaborate low-latency ways to react to the Source.task_stop_future being set. Please refer to the implementation of task in IntervalSource for a more robust example.
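
As a minimal sketch of one such approach, the loop below waits on the stop future itself instead of sleeping unconditionally, so a stop request interrupts the wait immediately. It uses a plain asyncio.Future in place of Source.task_stop_future and only the standard library; wait_or_stop and ticking_task are names made up for this example, and IntervalSource may be implemented differently.

```python
import asyncio
from typing import List


async def wait_or_stop(stop_future: "asyncio.Future[None]", timeout: float) -> bool:
    """Wait up to `timeout` seconds; return True if `stop_future` completed."""
    done, _pending = await asyncio.wait([stop_future], timeout=timeout)
    return bool(done)


async def ticking_task(
    stop_future: "asyncio.Future[None]", period: float, ticks: List[str]
) -> None:
    while True:
        ticks.append("tick")  # stands in for measuring and sending a value
        if await wait_or_stop(stop_future, period):
            break  # stop was requested while we were waiting


async def main() -> int:
    loop = asyncio.get_running_loop()
    stop_future: "asyncio.Future[None]" = loop.create_future()
    ticks: List[str] = []
    # Request a stop after 0.25 s although the task waits 10 s between ticks
    loop.call_later(0.25, stop_future.set_result, None)
    await asyncio.wait_for(ticking_task(stop_future, 10.0, ticks), timeout=2.0)
    return len(ticks)


print(asyncio.run(main()))  # 1
```

The task finishes shortly after the stop request rather than after the full ten-second wait.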

Improving constant-rate sources: using IntervalSource

The above situation where we send values at a fixed rate is so common that we can use the convenience class IntervalSource, which does all the heavy lifting for us.

Note

We strongly recommend implementing an IntervalSource over a plain Source if possible. It tries to automatically compensate for some timing-related issues that inevitably arise in more complicated setups. See the documentation for IntervalSource for more information.
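
One example of such a timing issue is drift: if a loop sleeps for a full period after every update, the time spent in the update itself accumulates. The following plain simulation contrasts that with scheduling against fixed deadlines. It is only meant to illustrate the problem; it is not taken from IntervalSource, whose actual strategy may differ, and both function names are invented for this sketch.

```python
from typing import List


def naive_schedule(period: float, work: float, count: int) -> List[float]:
    """Start times of updates when sleeping a full period after each one."""
    now, starts = 0.0, []
    for _ in range(count):
        starts.append(now)
        now += work + period  # the duration of the update is added on top
    return starts


def deadline_schedule(period: float, work: float, count: int) -> List[float]:
    """Start times of updates when sleeping until the next fixed deadline."""
    now, starts = 0.0, []
    for n in range(1, count + 1):
        starts.append(now)
        now += work
        deadline = n * period  # deadlines do not depend on the update duration
        now = max(now, deadline)  # sleep only for the remaining time, if any
    return starts


print(naive_schedule(period=1.0, work=0.25, count=4))     # [0.0, 1.25, 2.5, 3.75]
print(deadline_schedule(period=1.0, work=0.25, count=4))  # [0.0, 1.0, 2.0, 3.0]
```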

To adapt the above example, we simply set IntervalSource.period to the period of time between consecutive updates and replace Source.task with IntervalSource.update, which gets called at a constant rate:

import metricq
import random

class DummySource(metricq.IntervalSource):

    @metricq.rpc_handler("config")
    async def _on_config(self, **config):
        # Set the update period (in seconds) from the configured rate (in Hz)
        rate = config["rate"]
        self.period = 1 / rate

        ...  # build the metadata as above

        await self.declare_metrics({"example.py.interval-dummy": metadata})


    async def update(self):
        # Called once per period; send a random value at the current time
        await self.send(
            "example.py.interval-dummy",
            time=metricq.Timestamp.now(),
            value=random.random(),
        )

Running a Source

Similarly to Sinks, a Source is started by calling run. On construction, we need to supply a unique Token for identification and a URL of the network.

class DummySource(metricq.Source):

    ... # as above

if __name__ == "__main__":
    source = DummySource(
        token="source-py-example",
        url="amqp://localhost/",
    )
    source.run()

Complete Example

To obtain the dependencies required for this example, install the examples extra from a checkout of the git repository:

$ pip install '.[examples]'

and run it like so:

$ ./examples/metricq_source.py

#!/usr/bin/env python3
# Copyright (c) 2018, ZIH, Technische Universitaet Dresden, Federal Republic of Germany
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright notice,
#       this list of conditions and the following disclaimer in the documentation
#       and/or other materials provided with the distribution.
#     * Neither the name of metricq nor the names of its contributors
#       may be used to endorse or promote products derived from this software
#       without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import random
from typing import Any

import click
import click_log  # type: ignore

import metricq
from metricq.logging import get_logger

logger = get_logger()

click_log.basic_config(logger)
logger.setLevel("INFO")
logger.handlers[0].formatter = logging.Formatter(
    fmt="%(asctime)s [%(levelname)-8s] [%(name)-20s] %(message)s"
)


class DummySource(metricq.IntervalSource):
    def __init__(self, *args: Any, **kwargs: Any):
        logger.info("initializing DummySource")
        super().__init__(*args, **kwargs)

    @metricq.rpc_handler("config")
    async def _on_config(self, **config: Any) -> None:
        # The standard logging module formats lazily with %-style placeholders
        logger.info("DummySource config: %s", config)

        # Set the update period
        rate = config["rate"]
        self.period = 1 / rate

        # Supply some metadata for the metric declared below
        metadata = {
            "rate": rate,
            "description": "A simple dummy metric providing random values, sent from a python DummySource",
            "unit": "",  # unit-less metrics indicate this with an empty string
        }
        await self.declare_metrics({"test.py.dummy": metadata})

    async def update(self) -> None:
        # Send a random value at the current time:
        await self.send(
            "test.py.dummy", time=metricq.Timestamp.now(), value=random.random()
        )


@click.command()
@click.option("--server", default="amqp://localhost/")
@click.option("--token", default="source-py-dummy")
@click_log.simple_verbosity_option(logger)  # type: ignore
def source(server: str, token: str) -> None:
    src = DummySource(token=token, url=server)
    src.run()


if __name__ == "__main__":
    source()