package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.github.dbmdz.flusswerk.framework.config.properties.RoutingProperties;
import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException;
import com.github.dbmdz.flusswerk.framework.model.Envelope;
import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dbmdz/flusswerk/framework/rabbitmq/MessageBroker.class */
public class MessageBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class);
    private static final String MESSAGE_TTL = "x-message-ttl";
    private static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
    private final RoutingProperties routingConfig;
    private final RabbitClient rabbitClient;

    public MessageBroker(RoutingProperties routingProperties, RabbitClient rabbitClient) throws IOException {
        this.routingConfig = routingProperties;
        this.rabbitClient = rabbitClient;
        provideExchanges();
        provideInputQueues();
        provideOutputQueues();
    }

    @Deprecated
    void send(Message message) throws IOException {
        String str = this.routingConfig.getOutgoing().get("default");
        if (str == null) {
            throw new RuntimeException("Cannot send message, no default queue specified");
        }
        send(str, message);
    }

    @Deprecated
    public void send(Collection<? extends Message> collection) throws IOException {
        String str = this.routingConfig.getOutgoing().get("default");
        if (str == null) {
            throw new RuntimeException("Cannot send messages, no default queue specified");
        }
        send(str, collection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void send(String str, Message message) throws IOException {
        this.rabbitClient.send(this.routingConfig.getExchange(str), str, message);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void send(String str, Collection<? extends Message> collection) throws IOException {
        Iterator<? extends Message> it = collection.iterator();
        while (it.hasNext()) {
            send(str, it.next());
        }
    }

    public Message receive(String str) throws IOException, InvalidMessageException {
        return this.rabbitClient.receive(str);
    }

    public Message receive() throws IOException {
        Message message = null;
        Iterator<String> it = this.routingConfig.getIncoming().iterator();
        while (it.hasNext()) {
            try {
                message = receive(it.next());
                if (message != null) {
                    break;
                }
            } catch (InvalidMessageException e) {
                failInvalidMessage(e);
                return null;
            }
        }
        return message;
    }

    private void failInvalidMessage(InvalidMessageException invalidMessageException) throws IOException {
        Envelope envelope = invalidMessageException.getEnvelope();
        LOGGER.warn("Invalid message detected. Will be shifted into 'failed' queue: " + invalidMessageException.getMessage());
        this.rabbitClient.ack(envelope);
        String failedRoutingKey = this.routingConfig.getFailurePolicy(envelope.getSource()).getFailedRoutingKey();
        if (failedRoutingKey != null) {
            this.rabbitClient.sendRaw(this.routingConfig.getExchange(failedRoutingKey), failedRoutingKey, envelope.getBody().getBytes());
        }
    }

    private void provideInputQueues() throws IOException {
        for (String str : this.routingConfig.getIncoming()) {
            FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(str);
            String exchange = this.routingConfig.getExchange(str);
            String deadLetterExchange = this.routingConfig.getDeadLetterExchange(str);
            this.rabbitClient.declareQueue(str, exchange, str, Map.of(DEAD_LETTER_EXCHANGE, deadLetterExchange));
            if (failurePolicy.getRetryRoutingKey() != null) {
                this.rabbitClient.declareQueue(failurePolicy.getRetryRoutingKey(), deadLetterExchange, str, Map.of(MESSAGE_TTL, Long.valueOf(failurePolicy.getBackoff().toMillis()), DEAD_LETTER_EXCHANGE, exchange));
            }
            if (failurePolicy.getFailedRoutingKey() != null) {
                this.rabbitClient.declareQueue(failurePolicy.getFailedRoutingKey(), exchange, failurePolicy.getFailedRoutingKey(), null);
            }
        }
    }

    private void provideOutputQueues() throws IOException {
        for (String str : this.routingConfig.getOutgoing().values()) {
            this.rabbitClient.declareQueue(str, this.routingConfig.getExchange(str), str, Map.of(DEAD_LETTER_EXCHANGE, this.routingConfig.getDeadLetterExchange(str)));
        }
    }

    public void ack(Message message) throws IOException {
        this.rabbitClient.ack(message.getEnvelope());
    }

    public boolean reject(Message message) throws IOException {
        Envelope envelope = message.getEnvelope();
        long retries = this.routingConfig.getFailurePolicy(message).getRetries();
        ack(message);
        if (envelope.getRetries() >= retries) {
            fail(message, false);
            return false;
        }
        envelope.setRetries(envelope.getRetries() + 1);
        retry(message);
        return true;
    }

    void fail(Message message, boolean z) throws IOException {
        if (z) {
            ack(message);
        }
        LOGGER.debug("Send message to failed queue: " + message);
        String failedRoutingKey = this.routingConfig.getFailurePolicy(message).getFailedRoutingKey();
        if (failedRoutingKey != null) {
            send(failedRoutingKey, message);
        }
    }

    public void fail(Message message) throws IOException {
        fail(message, true);
    }

    private void retry(Message message) throws IOException {
        LOGGER.debug("Send message to retry queue: " + message);
        if (this.routingConfig.getFailurePolicy(message).getRetryRoutingKey() != null) {
            String source = message.getEnvelope().getSource();
            this.rabbitClient.send(this.routingConfig.getDeadLetterExchange(source), source, message);
        }
    }

    private void provideExchanges() throws IOException {
        Iterator<String> it = this.routingConfig.getExchanges().iterator();
        while (it.hasNext()) {
            this.rabbitClient.provideExchange(it.next());
        }
        Iterator<String> it2 = this.routingConfig.getDeadLetterExchanges().iterator();
        while (it2.hasNext()) {
            this.rabbitClient.provideExchange(it2.next());
        }
    }

    @Deprecated
    Map<String, Long> getMessageCounts() throws IOException {
        HashMap hashMap = new HashMap();
        for (String str : this.routingConfig.getIncoming()) {
            hashMap.put(str, this.rabbitClient.getMessageCount(str));
        }
        return hashMap;
    }

    Map<String, Long> getFailedMessageCounts() throws IOException {
        HashMap hashMap = new HashMap();
        Iterator<String> it = this.routingConfig.getIncoming().iterator();
        while (it.hasNext()) {
            FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(it.next());
            if (failurePolicy != null) {
                String failedRoutingKey = failurePolicy.getFailedRoutingKey();
                hashMap.put(failedRoutingKey, this.rabbitClient.getMessageCount(failedRoutingKey));
            }
        }
        return hashMap;
    }

    public Map<String, Long> getRetryMessageCounts() throws IOException {
        HashMap hashMap = new HashMap();
        Iterator<String> it = this.routingConfig.getIncoming().iterator();
        while (it.hasNext()) {
            FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(it.next());
            if (failurePolicy != null) {
                String retryRoutingKey = failurePolicy.getRetryRoutingKey();
                hashMap.put(retryRoutingKey, this.rabbitClient.getMessageCount(retryRoutingKey));
            }
        }
        return hashMap;
    }
}
