package com.github.pawelj_pl.event_bus_service.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pawelj_pl.event_bus_service.constants;
import com.github.pawelj_pl.event_bus_service.exceptions.EventBusConnectionException;
import com.github.pawelj_pl.event_bus_service.exceptions.EventBusEventConversionException;
import com.github.pawelj_pl.event_bus_service.exceptions.EventBusListenException;
import com.github.pawelj_pl.event_bus_service.exceptions.EventBusPublishException;
import com.github.pawelj_pl.event_bus_service.exceptions.EventBusResourceException;
import com.github.pawelj_pl.event_bus_service.handlers.RabbitMqMainConsumer;
import com.github.pawelj_pl.event_bus_service.helpers.SystemConfig;
import com.github.pawelj_pl.event_bus_service.model.EventData;
import com.github.pawelj_pl.event_bus_service.model.RegisteredHandler;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/pawelj_pl/event_bus_service/services/EventBusRabbitMqImpl.class */
public class EventBusRabbitMqImpl extends EventBusBase {
    private final Connection connection;
    private final String applicationName;
    private static final String DEFAULT_EXCHANGE_NAME = "EVENT_BUS_EXCHANGE";
    private static final String QUEUE_NAME_TEMPLATE = "EVENT_BUS_%s_SUBSCRIPTIONS";
    private final SystemConfig systemConfig;
    private static final String instanceId = UUID.randomUUID().toString();
    private String exchangeName;
    private String commonQueueName;
    private String privateQueueName;
    private boolean consumingStarted;
    private Set<Channel> publishChannelsPool;

    public EventBusRabbitMqImpl(String str) {
        this(str, new SystemConfig());
    }

    public EventBusRabbitMqImpl(String str, Connection connection) {
        this(str, connection, new SystemConfig());
    }

    public EventBusRabbitMqImpl(String str, SystemConfig systemConfig) {
        this(str, createDefaultConnection(systemConfig), systemConfig);
    }

    public EventBusRabbitMqImpl(String str, Connection connection, SystemConfig systemConfig) {
        this.privateQueueName = "NON_DECLARED_PRIVATE_QUEUE";
        this.publishChannelsPool = new HashSet();
        this.connection = connection;
        this.applicationName = str;
        this.systemConfig = systemConfig;
    }

    private static Connection createDefaultConnection(SystemConfig systemConfig) {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Optional<String> optional = systemConfig.get("eventbus.host");
            connectionFactory.getClass();
            optional.ifPresent(connectionFactory::setHost);
            Optional<Integer> optional2 = systemConfig.getInt("eventbus.port");
            connectionFactory.getClass();
            optional2.ifPresent((v1) -> {
                r1.setPort(v1);
            });
            Optional<String> optional3 = systemConfig.get("eventbus.username");
            connectionFactory.getClass();
            optional3.ifPresent(connectionFactory::setUsername);
            Optional<String> optional4 = systemConfig.get("eventbus.password");
            connectionFactory.getClass();
            optional4.ifPresent(connectionFactory::setPassword);
            Optional<String> optional5 = systemConfig.get("eventbus.virtualhost");
            connectionFactory.getClass();
            optional5.ifPresent(connectionFactory::setVirtualHost);
            connectionFactory.setExceptionHandler(new ConnectionExceptionHandler());
            return connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new EventBusConnectionException("Unable to connect to Event Bus service", e);
        }
    }

    @Override // com.github.pawelj_pl.event_bus_service.services.EventBus
    public <T> void publishMessage(String str, T t) {
        publishMessage(str, t, getOrGenerateCorrelationId());
    }

    @Override // com.github.pawelj_pl.event_bus_service.services.EventBus
    public <T> void publishMessage(String str, T t, String str2) {
        Channel channelFromPool = getChannelFromPool();
        if (this.exchangeName == null) {
            declareExchange(channelFromPool);
        }
        EventData eventData = new EventData(this.applicationName + "@" + getHostName(), Long.valueOf(System.currentTimeMillis()), str, str2, t);
        this.logger.debug("Publishing message in exchange {} with routing key {}", this.exchangeName, str);
        publish(channelFromPool, str, convertEventDataToString(eventData).getBytes());
        this.publishChannelsPool.add(channelFromPool);
    }

    private void publish(Channel channel, String str, byte[] bArr) {
        try {
            channel.basicPublish(this.exchangeName, str, (AMQP.BasicProperties) null, bArr);
        } catch (IOException e) {
            throw new EventBusPublishException("Error during publishing message: ", e);
        }
    }

    @Override // com.github.pawelj_pl.event_bus_service.services.EventBus
    public synchronized void listen() {
        if (this.consumingStarted) {
            throw new EventBusListenException("Consuming already started");
        }
        if (this.handlers.isEmpty()) {
            this.logger.info("No handler found");
            return;
        }
        Channel newChannel = getNewChannel();
        declareExchange(newChannel);
        boolean anyMatch = this.handlers.stream().anyMatch((v0) -> {
            return v0.isPrivate();
        });
        declareQueues(newChannel, anyMatch);
        this.handlers.forEach(registeredHandler -> {
            declareBinding(newChannel, registeredHandler);
        });
        setListenerPrefetch(newChannel);
        RabbitMqMainConsumer rabbitMqMainConsumer = new RabbitMqMainConsumer(newChannel, this.handlers);
        startConsuming(newChannel, rabbitMqMainConsumer, this.commonQueueName);
        if (anyMatch) {
            startConsuming(newChannel, rabbitMqMainConsumer, this.privateQueueName);
        }
        this.consumingStarted = true;
    }

    private Channel getNewChannel() {
        try {
            return this.connection.createChannel();
        } catch (IOException e) {
            throw new EventBusResourceException("Error during creating channel", e);
        }
    }

    private void declareExchange(Channel channel) {
        this.exchangeName = this.systemConfig.get("eventbus.exchange.name").orElse(DEFAULT_EXCHANGE_NAME);
        try {
            channel.exchangeDeclare(this.exchangeName, BuiltinExchangeType.TOPIC, false, false, Collections.emptyMap());
            this.logger.debug("Declared exchange {}", this.exchangeName);
        } catch (IOException e) {
            throw new EventBusResourceException(String.format("Error during creating exchange %s", this.exchangeName), e);
        }
    }

    private void declareQueues(Channel channel, boolean z) {
        this.commonQueueName = String.format(QUEUE_NAME_TEMPLATE, this.applicationName);
        declareSingleQueue(channel, this.commonQueueName, false);
        if (z) {
            this.privateQueueName = String.format(QUEUE_NAME_TEMPLATE, this.applicationName + "_" + instanceId);
            declareSingleQueue(channel, this.privateQueueName, true);
        }
    }

    private void declareSingleQueue(Channel channel, String str, boolean z) {
        try {
            channel.queueDeclare(str, false, z, false, Collections.emptyMap());
            this.logger.debug("Declared queue {}", str);
        } catch (IOException e) {
            throw new EventBusResourceException(String.format("Error during creating queue %s", str), e);
        }
    }

    private void declareBinding(Channel channel, RegisteredHandler registeredHandler) {
        String str = registeredHandler.isPrivate().booleanValue() ? this.privateQueueName : this.commonQueueName;
        try {
            channel.queueBind(str, this.exchangeName, registeredHandler.getRoutingKey());
            this.logger.debug("Created binding queue {} to exchange {} with routing key {}", new Object[]{str, this.exchangeName, registeredHandler.getRoutingKey()});
        } catch (IOException e) {
            throw new EventBusResourceException(String.format("Unable to bind queue %s to exchange %s with routing key %s", str, this.exchangeName, registeredHandler.getRoutingKey()), e);
        }
    }

    private void setListenerPrefetch(Channel channel) {
        int intValue = this.systemConfig.getInt("eventbus.prefetch").orElse(10).intValue();
        try {
            channel.basicQos(intValue);
            this.logger.debug("Set prefetch to {}", Integer.valueOf(intValue));
        } catch (IOException e) {
            throw new EventBusListenException("Unable to set prefetch for listener", e);
        }
    }

    private void startConsuming(Channel channel, Consumer consumer, String str) {
        try {
            channel.basicConsume(str, true, consumer);
            this.logger.debug("Started consuming queue {}", str);
        } catch (IOException e) {
            throw new EventBusListenException(String.format("Unable to start consuming queue %s", str), e);
        }
    }

    private String getOrGenerateCorrelationId() {
        String str = MDC.get(constants.CID_MDC_ATTRIBUTE);
        return str != null ? str : UUID.randomUUID().toString();
    }

    private String getHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            this.logger.warn("Unable to determine hostname. Using 'unknown' instead");
            return "unknown";
        }
    }

    private synchronized Channel getChannelFromPool() {
        try {
            Channel next = this.publishChannelsPool.iterator().next();
            this.publishChannelsPool.remove(next);
            return next;
        } catch (NoSuchElementException e) {
            return getNewChannel();
        }
    }

    private String convertEventDataToString(EventData eventData) {
        try {
            return new ObjectMapper().writeValueAsString(eventData);
        } catch (JsonProcessingException e) {
            throw new EventBusEventConversionException(String.format("Unable to convert eventData %s to string:", eventData), e);
        }
    }
}
