Source code for journal_to_fedora_messaging.sender

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: GPL-3.0-or-later

import logging

from fedora_messaging.api import twisted_publish
from fedora_messaging.exceptions import ConnectionException, PublishReturned
from fedora_messaging.message import get_class, Message
from twisted.internet import defer, interfaces, reactor
from twisted.python.failure import Failure
from zope.interface import implementer


LOGGER = logging.getLogger(__name__)

PRUNE_FROM_LOG = (
    "_MACHINE_ID",
    # Those are metadata fields: https://systemd.io/JOURNAL_EXPORT_FORMATS/
    "__CURSOR",
    "__SEQNUM",
    "__SEQNUM_ID",
    "__MONOTONIC_TIMESTAMP",
)


def _matches(log_def, content):
    for key, value in log_def.get("filters", []).items():
        if key not in content:
            return False
        if content[key] != value:
            return False
    return True


def _get_body(content: dict):
    body = content.copy()
    for key in PRUNE_FROM_LOG:
        if key in body:
            del body[key]
    return body


[docs] @implementer(interfaces.IConsumer) class MessageSender: def __init__(self, config): self._config = config self._producer = None
[docs] def validate_config(self): if not self._config.get("logs", []): LOGGER.warning("No log defined in the configuration, nothing will be published") for log in self._config.get("logs", []): if not log.get("schema"): raise ValueError(f"No schema defined in the configuration for: {log!r}") if get_class(log["schema"]) == Message: raise ValueError(f"The schema {log['schema']} is not installed") if not log.get("filters", []): LOGGER.warning( f"No filters defined in the configuration for: {log!r}. " "This will match every log entry." )
def _get_schema(self, content: dict): for log in self._config["logs"]: if _matches(log, content): return get_class(log["schema"]) return None
[docs] def registerProducer(self, producer, streaming): self._producer = producer producer.consumer = self
[docs] def unregisterProducer(self): self._producer.consumer = None self._producer = None
[docs] def write(self, data: dict): schema = self._get_schema(data) if schema is None: # LOGGER.debug("Unmatched log: %r", data) return defer.succeed(None) message = schema(body=_get_body(data)) LOGGER.debug("Publishing message %s on %s: %r", message.id, message.topic, message.body) timeout = self._config.get("publish_timeout", 30) def _log_errors(failure: Failure): if failure.check(PublishReturned): LOGGER.warning( f"Fedora Messaging broker rejected message {message.id}: {failure.value}" ) elif failure.check(ConnectionException): LOGGER.warning(f"Error sending message {message.id}: {failure.value}") elif failure.check(defer.TimeoutError): LOGGER.warning( f"Timeout sending message {message.id} on {message.topic} after {timeout}s" ) else: LOGGER.error( f"Unknown error publishing message {message.id}: " f"{failure.value} ({failure.type})" ) LOGGER.error(failure.getTraceback()) def _log_success(_result): LOGGER.info(f"Published message {message.id} on {message.topic}") deferred = twisted_publish(message) deferred.addTimeout(timeout, reactor) deferred.addCallbacks(_log_success, _log_errors) return deferred