Source code for journal_to_fedora_messaging.journal

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: GPL-3.0-or-later

import json
import logging
import os
from shutil import which

from twisted.internet import interfaces, protocol, reactor
from zope.interface import implementer


# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)


@implementer(interfaces.IPushProducer)
class JournalReader:
    """Push producer that feeds ``journalctl`` entries to a consumer.

    A ``journalctl`` child process is spawned in follow mode with JSON
    output, and a :class:`JournalProtocol` attached to that process writes
    the parsed entries to :attr:`consumer`.

    NOTE(review): the class declares ``IPushProducer`` but defines no
    ``pauseProducing`` method; confirm whether any consumer tries to pause it.
    """

    def __init__(self, config):
        """Build the ``journalctl`` command line from the configuration.

        Args:
            config: Mapping of settings. The optional ``journalctl_command``
                key holds the base command as a list and defaults to
                ``["journalctl"]``.
        """
        self.config = config
        # Slice-copy the list so that extending it below never mutates the
        # configured value in place.
        self._command = self.config.get("journalctl_command", ["journalctl"])[:]
        self._command.extend(["--follow", "--output", "json", "--since", "now"])
        # Consumer receiving the entries; must be set before resumeProducing().
        self.consumer = None
        # Protocol attached to the journalctl process, once one was spawned.
        self._protocol = None

    def resumeProducing(self):
        """Spawn ``journalctl`` and start delivering its output to the consumer.

        Raises:
            RuntimeError: If no consumer has been set, or if the executable
                cannot be found.
        """
        if self.consumer is None:
            raise RuntimeError("No consumer to produce to")
        executable = which(self._command[0])
        if executable is None:
            # which() returns None when nothing matches; fail here with a
            # clear message instead of handing None to spawnProcess.
            raise RuntimeError(f"Command not found: {self._command[0]}")
        self._protocol = JournalProtocol(self.consumer)
        reactor.spawnProcess(
            self._protocol, executable, args=self._command, env=os.environ
        )

    def stopProducing(self):
        """Send ``TERM`` to the ``journalctl`` process and close its pipes.

        Does nothing when no process has been connected yet. A process that
        has already exited is not treated as an error.
        """
        if self._protocol is None or self._protocol.transport is None:
            return
        # Imported locally because only this method needs the exception type.
        from twisted.internet.error import ProcessExitedAlready

        transport = self._protocol.transport
        try:
            transport.signalProcess("TERM")
        except ProcessExitedAlready:
            LOGGER.debug("journalctl had already exited")
        transport.loseConnection()
class JournalProtocol(protocol.ProcessProtocol):
    """Process protocol that parses ``journalctl`` JSON output.

    Standard output is split on newlines; every complete line is parsed as
    JSON and the resulting object is written to the consumer.
    """

    def __init__(self, consumer: interfaces.IConsumer):
        """Remember the consumer and start with an empty line buffer.

        Args:
            consumer: Object whose ``write`` method receives each parsed entry.
        """
        self._consumer = consumer
        self._delimiter = b"\n"
        # Bytes of a trailing partial line, kept until its delimiter arrives.
        self._buffer = b""

    def connectionMade(self):
        """Close the child's stdin once the process is connected."""
        self.transport.closeStdin()

    def outReceived(self, data):
        """Parse the complete lines in *data* and write them to the consumer.

        Lines that cannot be parsed are logged and skipped, so one bad line
        does not prevent the rest of the chunk from being delivered.

        Args:
            data: Bytes read from the child's standard output.
        """
        lines = (self._buffer + data).split(self._delimiter)
        # The last element is whatever follows the final delimiter (possibly
        # empty); keep it for the next chunk.
        self._buffer = lines.pop()
        for line in lines:
            try:
                entry = json.loads(line)
            except ValueError as e:
                # ValueError covers both JSONDecodeError and the
                # UnicodeDecodeError raised for bytes that are not valid text.
                LOGGER.warning("journalctl did not produce JSON! %s", e)
                continue
            # Written outside the try block so that an error raised by the
            # consumer is never reported as a parsing problem.
            self._consumer.write(entry)

    def errReceived(self, data):
        """Log whatever the child wrote to standard error.

        Args:
            data: Bytes read from the child's standard error.
        """
        # A chunk may hold invalid or truncated byte sequences; replace them
        # rather than raising from inside the protocol callback.
        LOGGER.warning(
            "journalctl wrote to stderr: %s", data.decode(errors="replace")
        )

    def outConnectionLost(self):
        """Log that the child's standard output was closed."""
        LOGGER.debug("journalctl stopped producing output")

    def processExited(self, reason):
        """Log the exit code carried by *reason*.

        Args:
            reason: Object whose ``value.exitCode`` holds the exit status.
        """
        LOGGER.info("journalctl exited with status %s", reason.value.exitCode)

    def processEnded(self, reason):
        """Log the exit code carried by *reason*.

        Args:
            reason: Object whose ``value.exitCode`` holds the exit status.
        """
        LOGGER.info("journalctl ended with status %s", reason.value.exitCode)