Source code for journal_to_fedora_messaging.cli

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: GPL-3.0-or-later

import logging
import os

import click
from twisted.internet import asyncioreactor, error


try:
    asyncioreactor.install()
except error.ReactorAlreadyInstalledError:
    # The tests install a reactor before importing this module
    from twisted.internet import reactor

    if not isinstance(reactor, asyncioreactor.AsyncioSelectorReactor):  # pragma: no cover
        raise

from fedora_messaging.api import _init_twisted_service
from fedora_messaging.config import conf as fm_config
from fedora_messaging.exceptions import ConfigurationException
from twisted.application import service
from twisted.internet import reactor

from .journal import JournalReader
from .sender import MessageSender


LOGGER = logging.getLogger(__name__)


@click.command()
@click.option("-c", "--config", envvar="FEDORA_MESSAGING_CONF", help="Configuration file")
def main(config):
    if config:
        if not os.path.isfile(config):
            raise click.exceptions.BadParameter(f"{config} is not a file")
        try:
            fm_config.load_config(config_path=config)
        except ConfigurationException as e:
            raise click.exceptions.BadParameter(str(e)) from e
    fm_config.setup_logging()

    conf = fm_config["consumer_config"]
    bridge_service = JournalToFedoraMessagingService(conf)
    reactor.callWhenRunning(bridge_service.startService)
    _init_twisted_service()
    reactor.run()


[docs] class JournalToFedoraMessagingService(service.Service): def __init__(self, config): self._consumer = MessageSender(config) self._producer = JournalReader(config)
[docs] def startService(self): self._consumer.validate_config() self._consumer.registerProducer(self._producer, True) self._producer.resumeProducing()
[docs] def stopService(self): self._producer.stopProducing() self._consumer.unregisterProducer()