Building a MetricQ Sink

Sinks allow you to receive and process data points for Metrics provided by others on the MetricQ network. Examples of Sinks are:

  • data visualization software

  • database clients (such as metricq-db-hta): store metric data to persistent storage

  • monitoring clients

Here, we will build a simple Sink that prints data points for a given list of metrics as they arrive. You can find the complete example at the end of this page.

Defining a new Sink: using metricq.Sink

New Sinks are defined by subclassing metricq.Sink:

import metricq

class DummySink(metricq.Sink):
    pass

We add a constructor to provide the Sink with a list of names of metrics which we are interested in printing. All other arguments will be forwarded to the superclass:

import metricq

class DummySink(metricq.Sink):
    def __init__(self, metrics: list[str], *args, **kwargs):
        self._metrics = metrics
        super().__init__(*args, **kwargs)
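
The forwarding pattern can be tried without a MetricQ installation. The following is a minimal sketch under one loud assumption: `FakeSink` is a hypothetical stand-in for metricq.Sink that only records the arguments it receives. It is not part of the library and does not reflect the real constructor signature.

```python
from typing import Any


class FakeSink:
    """Hypothetical stand-in for metricq.Sink; it only stores its arguments."""

    def __init__(self, token: str, url: str, **kwargs: Any) -> None:
        self.token = token
        self.url = url


class DummySink(FakeSink):
    def __init__(self, metrics: list[str], *args: Any, **kwargs: Any) -> None:
        # Keep the metric names for ourselves ...
        self._metrics = metrics
        # ... and pass everything else on to the base class untouched.
        super().__init__(*args, **kwargs)


sink = DummySink(
    metrics=["test.py.dummy"], token="sink-py-dummy", url="amqp://localhost/"
)
print(sink._metrics, sink.token, sink.url)
```

Because `metrics` is consumed by the subclass, the base class never sees it, while `token` and `url` pass through unchanged.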

Now we will implement the interface needed to connect to the MetricQ network and receive metrics. First, we tell our Sink how to connect (metricq.Sink.connect()) to the MetricQ network. After a connection has been established, we subscribe (metricq.Sink.subscribe()) to the metrics we are interested in.

import metricq

class DummySink(metricq.Sink):

    ... # as above

    async def connect(self):
        await super().connect()
        await self.subscribe(self._metrics)

Our Sink now needs to know what to do with the data points it receives. We implement Sink.on_data(), which gets called every time a new data point arrives, and have it print each data point to standard output:

import metricq

class DummySink(metricq.Sink):

    ... # as above

    async def on_data(
        self,
        metric: str,
        timestamp: metricq.Timestamp,
        value: float
    ):
        print("{}: {}, {}".format(metric, timestamp, value))

Here, metric is the name of the metric for which a new data point arrived; value holds the numeric value this metric had at the time indicated by timestamp.
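
Printing is only one thing a data handler can do. As a rough illustration, the sketch below collects data points per metric and computes a mean. `CollectingSink` is a hypothetical stand-in class, not a metricq.Sink subclass, and the timestamp is passed as a plain integer (an assumption made so the example runs without a MetricQ network). In a real Sink the same method body would live in your metricq.Sink subclass.

```python
import asyncio
from collections import defaultdict


class CollectingSink:
    """Hypothetical stand-in for a metricq.Sink subclass (no network needed)."""

    def __init__(self) -> None:
        # metric name -> list of (timestamp, value) pairs seen so far
        self.points: dict[str, list[tuple[int, float]]] = defaultdict(list)

    async def on_data(self, metric: str, timestamp: int, value: float) -> None:
        # Same parameter order as in the tutorial; the timestamp is a plain
        # integer here (an assumption), not a metricq.Timestamp.
        self.points[metric].append((timestamp, value))

    def mean(self, metric: str) -> float:
        values = [value for _, value in self.points[metric]]
        return sum(values) / len(values)


async def demo() -> CollectingSink:
    sink = CollectingSink()
    await sink.on_data("test.py.dummy", 1588509320269324000, 0.5)
    await sink.on_data("test.py.dummy", 1588509321269232000, 1.5)
    return sink


collector = asyncio.run(demo())
print(collector.mean("test.py.dummy"))
```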

Running a Sink

metricq.Sink is designed as an asynchronous, callback-based interface, so we won’t be calling the above methods directly. Instead, it provides Client.run(), which establishes the connection, keeps track of all the details of the MetricQ protocol and calls Sink.on_data() whenever new data points arrive.
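
To make the callback style concrete, here is a toy model of such a run loop. It is a sketch only and not how metricq is implemented: `ToyClient`, `RecordingClient` and the in-memory list of data points are all hypothetical, and no network traffic is involved. It merely shows that user code overrides the callbacks while `run()` decides when they are called.

```python
import asyncio


class ToyClient:
    """Toy model of a callback-driven client; NOT the metricq implementation."""

    def __init__(self, incoming) -> None:
        # Pretend data points that would normally arrive over the network.
        self._incoming = list(incoming)
        self.calls: list[str] = []

    async def connect(self) -> None:
        self.calls.append("connect")

    async def on_data(self, metric: str, timestamp: int, value: float) -> None:
        raise NotImplementedError

    async def _main(self) -> None:
        # Connect first, then dispatch every data point to the callback.
        await self.connect()
        for metric, timestamp, value in self._incoming:
            await self.on_data(metric, timestamp, value)

    def run(self) -> None:
        # Blocks until all pretend data points have been dispatched.
        asyncio.run(self._main())


class RecordingClient(ToyClient):
    async def on_data(self, metric: str, timestamp: int, value: float) -> None:
        self.calls.append(f"on_data {metric} {value}")


client = RecordingClient([("test.py.dummy", 0, 0.25), ("test.py.dummy", 1, 0.75)])
client.run()
print(client.calls)
```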

Our Sink is identified on the network by a Token. In general, you should make sure that no two instances of the same Client share the same Token. You won’t need to worry about this if you are using metricq.Sink, though, as it generates a unique Token automatically (see the add_uuid argument to metricq.Sink).
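
One common way to make a Token unique is to append a random UUID to a base name. The sketch below illustrates that idea only; the helper `unique_token` and the `<base>.<hex>` format are assumptions for illustration, and the format metricq actually produces may differ.

```python
import uuid


def unique_token(base: str) -> str:
    # Assumed format "<base>.<32 hex digits>"; metricq may format it differently.
    return f"{base}.{uuid.uuid4().hex}"


first = unique_token("sink-py-dummy")
second = unique_token("sink-py-dummy")
print(first)
print(first != second)
```

Two instances started from the same base name then end up with different Tokens.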

If you are interested in the values of metric test.py.dummy, construct and run DummySink as follows (assuming a MetricQ network is running on localhost):

import metricq

class DummySink(metricq.Sink):
    ... # as above

if __name__ == "__main__":
    sink = DummySink(
        metrics=["test.py.dummy"],
        token="sink-py-dummy",
        url="amqp://localhost/"
    )
    sink.run()

This is it, assuming there is a Source on the network that provides data points for test.py.dummy. Running this script, you should now see something like this appearing on standard output:

...
test.py.dummy: [1588509320269324000] 2020-05-03 14:35:20.269324+02:00, 0.48311378740654076
test.py.dummy: [1588509321269232000] 2020-05-03 14:35:21.269232+02:00, 0.1490083450372932
test.py.dummy: [1588509322269017000] 2020-05-03 14:35:22.269017+02:00, 0.06578061778873023
test.py.dummy: [1588509323267878000] 2020-05-03 14:35:23.267878+02:00, 0.7771949055949513
test.py.dummy: [1588509324267969000] 2020-05-03 14:35:24.267969+02:00, 0.9975132302199418
...

See Building a MetricQ Source for how to set up such a Source.
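
In the sample output above, the bracketed number appears to be the time in nanoseconds since the Unix epoch, followed by a human-readable rendering of the same instant. That reading is inferred from the sample rather than stated by the library. The following sketch checks it for the first line using only the standard library; `ns_to_datetime` is a hypothetical helper, not a metricq function.

```python
from datetime import datetime, timedelta, timezone


def ns_to_datetime(ns: int, utc_offset_hours: int) -> datetime:
    # Split into whole seconds and the sub-second remainder to avoid
    # floating-point rounding on 19-digit integers.
    seconds, remainder_ns = divmod(ns, 1_000_000_000)
    tz = timezone(timedelta(hours=utc_offset_hours))
    whole = datetime.fromtimestamp(seconds, tz=tz)
    return whole + timedelta(microseconds=remainder_ns // 1000)


moment = ns_to_datetime(1588509320269324000, utc_offset_hours=2)
print(moment)  # 2020-05-03 14:35:20.269324+02:00
```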

Complete example

To obtain the dependencies required for this example, install the examples extra from a checkout of the Git repository:

$ pip install '.[examples]'

and run it like so:

$ ./metricq_sink.py -m test.py.dummy

#!/usr/bin/env python3
# Copyright (c) 2019, ZIH, Technische Universitaet Dresden, Federal Republic of Germany
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright notice,
#       this list of conditions and the following disclaimer in the documentation
#       and/or other materials provided with the distribution.
#     * Neither the name of metricq nor the names of its contributors
#       may be used to endorse or promote products derived from this software
#       without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
from typing import Any

import click
import click_log  # type: ignore

import metricq
from metricq import Metric
from metricq.logging import get_logger

logger = get_logger()

click_log.basic_config(logger)
logger.setLevel("INFO")
logger.handlers[0].formatter = logging.Formatter(
    fmt="%(asctime)s [%(levelname)-8s] [%(name)-20s] %(message)s"
)


# To implement a MetricQ Sink, subclass metricq.Sink
class DummySink(metricq.Sink):
    """A simple :term:`Sink` which, given a list of Metrics, will print their values as they arrive from the MetricQ network."""

    def __init__(self, metrics: list[Metric], *args: Any, **kwargs: Any):
        logger.info("initializing DummySink")
        # `metrics` contains the names of Metrics for which this Sink should print values
        self._metrics = metrics
        super().__init__(*args, **kwargs)

    # Override connect() to subscribe to the Metrics of interest after a connection has been established.
    async def connect(self) -> None:
        # First, let the base class connect to the MetricQ network.
        await super().connect()

        # After the connection is established, subscribe to the list of
        # requested metrics.  For each metric, we will receive every data point
        # which is sent to MetricQ from this point on.
        await self.subscribe(self._metrics)

    # The data handler: this method is called for every data point we receive.
    async def on_data(
        self, metric: str, timestamp: metricq.Timestamp, value: float
    ) -> None:
        # For this example, we just print the datapoints to standard output
        click.echo(
            click.style("{}: {}, {}".format(metric, timestamp, value), fg="bright_blue")
        )


@click.command()
@click.option("--server", default="amqp://localhost/")
@click.option("--token", default="sink-py-dummy")
@click.option("-m", "--metrics", multiple=True, required=True)
@click_log.simple_verbosity_option(logger)  # type: ignore
def source(server: str, token: str, metrics: list[Metric]) -> None:
    # Initialize the DummySink class with a list of metrics given on the
    # command line.
    sink = DummySink(metrics=metrics, token=token, url=server)

    # Run the sink.  This call will block until the connection is closed.
    sink.run()


if __name__ == "__main__":
    source()

Durable / persistent sinks

Most Sinks are transient, not unique, and have no configuration. To create a persistent Sink with a configuration, subclass metricq.DurableSink instead of metricq.Sink. You also need to implement an RPC handler for config:

@metricq.rpc_handler("config")
async def _on_config(self, **config: Any):
    ...
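
What the handler does with the configuration is up to you. As a minimal sketch, the class below stores a list of metric names taken from the configuration. Everything here is an assumption for illustration: `ConfiguredSink` is a hypothetical stand-in rather than a metricq.DurableSink subclass, the decorator is omitted so the example runs without metricq, and the `metrics` key is an invented configuration field.

```python
import asyncio
from typing import Any


class ConfiguredSink:
    """Hypothetical stand-in for a metricq.DurableSink subclass."""

    def __init__(self) -> None:
        self.metrics: list[str] = []

    # In a real sink this method would carry @metricq.rpc_handler("config").
    async def _on_config(self, **config: Any) -> None:
        # "metrics" is an assumed configuration key, chosen for illustration.
        # Unknown keys are simply ignored.
        self.metrics = list(config.get("metrics", []))


configured = ConfiguredSink()
asyncio.run(configured._on_config(metrics=["test.py.dummy"], unused_key=1))
print(configured.metrics)
```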