europapark-exporter/src/europapark_exporter/collector.py

42 lines
1.2 KiB
Python

# pylint: disable=too-few-public-methods
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.core import GaugeMetricFamily
from prometheus_client.registry import Collector
from .europapark import EuropaparkAPI
class WaitTimesCollector(Collector):
"""
Collects Europapark waiting times
"""
def __init__(self, api):
self._api: EuropaparkAPI = api
def collect(self): # pylint: disable=missing-docstring
wait_time_metrics = GaugeMetricFamily(
'europapark_wait_time',
'Europa Park waiting time',
labels=["code", "name"])
wt = self._api.get_waiting_times()
for code, wait_time in wt.items():
attr = self._api.get_attraction_from_code(code)
if not attr:
continue
name = attr.setdefault("name", "Unknown")
label_values = [str(code), name]
wait_time_metrics.add_metric(label_values, wait_time)
yield wait_time_metrics
def collect_europapark():
"""Scrape a host and return prometheus text format for it"""
api = EuropaparkAPI()
registry = CollectorRegistry()
registry.register(WaitTimesCollector(api))
return generate_latest(registry)