/*
 * Decompiled with CFR 0.152.
 */
package de.digitalcollections.workflow.engine.messagebroker;

import de.digitalcollections.workflow.engine.messagebroker.FailurePolicy;
import de.digitalcollections.workflow.engine.messagebroker.MessageBrokerConfig;
import de.digitalcollections.workflow.engine.messagebroker.RabbitClient;
import de.digitalcollections.workflow.engine.messagebroker.RoutingConfig;
import de.digitalcollections.workflow.engine.model.Envelope;
import de.digitalcollections.workflow.engine.model.Message;
import de.digitalcollections.workflow.engine.util.Maps;
import java.io.IOException;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class);
    private static final String MESSAGE_TTL = "x-message-ttl";
    private static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
    private MessageBrokerConfig config;
    private RoutingConfig routingConfig;
    private final RabbitClient rabbitClient;

    MessageBroker(MessageBrokerConfig config, RoutingConfig routingConfig, RabbitClient rabbitClient) throws IOException {
        this.config = config;
        this.routingConfig = routingConfig;
        this.rabbitClient = rabbitClient;
        this.provideExchanges();
        this.provideInputQueues();
        if (routingConfig.hasWriteTo()) {
            this.provideOutputQueue();
        }
    }

    public void send(Message message) throws IOException {
        this.rabbitClient.send(this.routingConfig.getExchange(), this.routingConfig.getWriteTo(), message);
    }

    public void send(String routingKey, Message message) throws IOException {
        this.rabbitClient.send(this.routingConfig.getExchange(), routingKey, message);
    }

    public void send(String routingKey, Collection<Message> messages) throws IOException {
        for (Message message : messages) {
            this.send(routingKey, message);
        }
    }

    public Message receive(String queueName) throws IOException {
        return this.rabbitClient.receive(queueName);
    }

    public Message receive() throws IOException {
        String inputQueue;
        Message message = null;
        String[] stringArray = this.routingConfig.getReadFrom();
        int n = stringArray.length;
        for (int i = 0; i < n && (message = this.receive(inputQueue = stringArray[i])) == null; ++i) {
        }
        return message;
    }

    private void provideInputQueues() throws IOException {
        String deadLetterExchange = this.routingConfig.getDeadLetterExchange();
        String exchange = this.routingConfig.getExchange();
        for (String inputQueue : this.routingConfig.getReadFrom()) {
            FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(inputQueue);
            this.rabbitClient.declareQueue(inputQueue, exchange, inputQueue, null);
            if (failurePolicy.getRetryRoutingKey() != null) {
                this.rabbitClient.declareQueue(failurePolicy.getRetryRoutingKey(), deadLetterExchange, inputQueue, Maps.of(MESSAGE_TTL, this.config.getDeadLetterWait(), DEAD_LETTER_EXCHANGE, exchange));
            }
            if (failurePolicy.getFailedRoutingKey() == null) continue;
            this.rabbitClient.declareQueue(failurePolicy.getFailedRoutingKey(), exchange, failurePolicy.getFailedRoutingKey(), null);
        }
    }

    private void provideOutputQueue() throws IOException {
        this.rabbitClient.declareQueue(this.routingConfig.getWriteTo(), this.routingConfig.getExchange(), this.routingConfig.getWriteTo(), Maps.of(DEAD_LETTER_EXCHANGE, this.routingConfig.getDeadLetterExchange()));
    }

    public void ack(Message message) throws IOException {
        this.rabbitClient.ack(message);
    }

    public void reject(Message message) throws IOException {
        Envelope envelope = message.getEnvelope();
        this.ack(message);
        if (envelope.getRetries() < this.config.getMaxRetries()) {
            envelope.setRetries(envelope.getRetries() + 1);
            this.retry(message);
        } else {
            this.fail(message);
        }
    }

    private void fail(Message message) throws IOException {
        LOGGER.debug("Send message to failed queue: " + message);
        FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(message);
        String failedRoutingKey = failurePolicy.getFailedRoutingKey();
        if (failedRoutingKey != null) {
            this.send(failedRoutingKey, message);
        }
    }

    private void retry(Message message) throws IOException {
        LOGGER.debug("Send message to retry queue: " + message);
        FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(message);
        String retryRoutingKey = failurePolicy.getRetryRoutingKey();
        if (retryRoutingKey != null) {
            this.rabbitClient.send(this.routingConfig.getDeadLetterExchange(), message.getEnvelope().getSource(), message);
        }
    }

    private void provideExchanges() throws IOException {
        this.rabbitClient.provideExchange(this.routingConfig.getExchange());
        this.rabbitClient.provideExchange(this.routingConfig.getDeadLetterExchange());
    }

    public MessageBrokerConfig getConfig() {
        return this.config;
    }
}

