package io.elastic.sailor.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import io.elastic.api.Function;
import io.elastic.sailor.AmqpService;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AmqpService} implementation backed by the RabbitMQ Java client.
 *
 * <p>The instance holds one AMQP connection, one channel used for consuming
 * and (optionally) one channel used for publishing. Configuration values and
 * collaborators are supplied through the {@code @Inject}-annotated constructor
 * and setters below.
 */
@Singleton
public class AmqpServiceImpl implements AmqpService {
    private static final Logger logger = LoggerFactory.getLogger(AmqpServiceImpl.class);

    private Connection amqp;
    private Channel subscribeChannel;
    private Channel publishChannel;
    private String amqpUri;
    private String subscribeExchangeName;
    private Integer prefetchCount;
    private final CryptoServiceImpl cipher;
    private MessageProcessor messageProcessor;
    private Step step;
    private String consumerTag;
    private ContainerContext containerContext;

    /**
     * Creates the service.
     *
     * @param cryptoServiceImpl crypto service handed to every {@link MessageConsumer}
     *                          created by {@link #subscribeConsumer(Function)}
     */
    @Inject
    public AmqpServiceImpl(CryptoServiceImpl cryptoServiceImpl) {
        this.cipher = cryptoServiceImpl;
    }

    /**
     * @param str URI used to open the AMQP connection
     */
    @Inject
    public void setAmqpUri(@Named("ELASTICIO_AMQP_URI") String str) {
        this.amqpUri = str;
    }

    /**
     * @param str name passed as the first argument of {@code basicConsume}
     *            when a consumer is subscribed
     */
    @Inject
    public void setSubscribeExchangeName(@Named("ELASTICIO_LISTEN_MESSAGES_ON") String str) {
        this.subscribeExchangeName = str;
    }

    /**
     * @param messageProcessor processor handed to every created {@link MessageConsumer}
     */
    @Inject
    public void setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    /**
     * @param num prefetch count applied to the subscribe channel via {@code basicQos}
     */
    @Inject
    public void setPrefetchCount(@Named("ELASTICIO_RABBITMQ_PREFETCH_SAILOR") Integer num) {
        this.prefetchCount = num;
    }

    /**
     * @param step step handed to every created {@link MessageConsumer}
     */
    @Inject
    public void setStep(@Named("StepJson") Step step) {
        this.step = step;
    }

    /**
     * @param containerContext context handed to every created {@link MessageConsumer}
     */
    @Inject
    public void setContainerContext(ContainerContext containerContext) {
        this.containerContext = containerContext;
    }

    /**
     * Opens the AMQP connection and the subscribe channel, each only if it
     * has not been opened yet.
     *
     * @throws RuntimeException if the connection or the channel cannot be opened
     */
    @Override
    public void connectAndSubscribe() {
        openConnection();
        openSubscribeChannel();
    }

    /**
     * Closes the subscribe channel, the publish channel and the connection,
     * in that order.
     *
     * <p>Closing is best-effort: a resource that was never opened is skipped,
     * and an {@link IOException} or {@link TimeoutException} raised while
     * closing one resource is logged and does not prevent the remaining
     * resources from being closed.
     */
    @Override
    public void disconnect() {
        logger.info("About to disconnect from AMQP");
        closeChannelQuietly(this.subscribeChannel, "Subscription");
        closeChannelQuietly(this.publishChannel, "Publish");
        // The connection may be null when disconnect() is called before
        // connectAndSubscribe(); there is nothing to close in that case.
        if (this.amqp != null) {
            try {
                this.amqp.close();
            } catch (IOException e) {
                logger.info("AMQP connection is already closed: {}", e.toString());
            }
        }
        logger.info("Successfully disconnected from AMQP");
    }

    /**
     * Closes the given channel, logging instead of propagating checked close failures.
     *
     * @param channel channel to close; {@code null} is ignored
     * @param label   human-readable channel name used as the log message prefix
     */
    private static void closeChannelQuietly(Channel channel, String label) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException | TimeoutException e) {
            // e.toString() keeps the exception as message text; passing the
            // Throwable itself would make SLF4J print a stack trace instead.
            logger.info("{} channel is already closed: {}", label, e.toString());
        }
    }

    /**
     * Registers a new {@link MessageConsumer} on the subscribe channel and
     * remembers the returned consumer tag.
     *
     * @param function function handed to the created consumer
     * @throws RuntimeException wrapping the {@link IOException} raised by {@code basicConsume}
     */
    @Override
    public void subscribeConsumer(Function function) {
        MessageConsumer consumer = new MessageConsumer(
                this.subscribeChannel,
                this.cipher,
                this.messageProcessor,
                function,
                this.step,
                this.containerContext);
        try {
            this.consumerTag = this.subscribeChannel.basicConsume(this.subscribeExchangeName, consumer);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("Subscribed consumer {}. Waiting for messages to arrive ...", this.consumerTag);
    }

    /**
     * Cancels the consumer registered by {@link #subscribeConsumer(Function)},
     * if any, and forgets its tag.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by
     *                          {@code basicCancel}; the tag is kept in that case
     */
    @Override
    public void cancelConsumer() {
        if (this.consumerTag != null) {
            logger.info("Canceling consumer {}", this.consumerTag);
            try {
                this.subscribeChannel.basicCancel(this.consumerTag);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.consumerTag = null;
    }

    /**
     * Acknowledges a single delivery on the subscribe channel.
     *
     * @param l delivery tag of the message to acknowledge
     * @throws RuntimeException wrapping any failure, including a {@code null} tag
     */
    @Override
    public void ack(Long l) {
        try {
            logger.info("Message #{} ack", l);
            // second argument false: acknowledge only this delivery tag
            this.subscribeChannel.basicAck(l.longValue(), false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Rejects a single delivery on the subscribe channel without requeueing it.
     *
     * @param l delivery tag of the message to reject
     * @throws RuntimeException wrapping any failure, including a {@code null} tag
     */
    @Override
    public void reject(Long l) {
        try {
            logger.info("Message #{} reject", l);
            // second argument false: do not requeue the rejected message
            this.subscribeChannel.basicReject(l.longValue(), false);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens the AMQP connection from the configured URI unless one is already held.
     *
     * @return this instance
     * @throws RuntimeException wrapping any failure while parsing the URI or connecting
     */
    private AmqpServiceImpl openConnection() {
        try {
            if (this.amqp == null) {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setUri(new URI(this.amqpUri));
                this.amqp = connectionFactory.newConnection();
                logger.info("Connected to AMQP");
            }
            return this;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the subscribe channel on the current connection and applies the
     * configured prefetch count, unless a subscribe channel is already held.
     *
     * @return this instance
     * @throws RuntimeException wrapping the {@link IOException} raised while
     *                          creating or configuring the channel
     */
    private AmqpServiceImpl openSubscribeChannel() {
        try {
            if (this.subscribeChannel == null) {
                this.subscribeChannel = this.amqp.createChannel();
                this.subscribeChannel.basicQos(this.prefetchCount.intValue());
                logger.info("Opened subscribe channel");
            }
            return this;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces the subscribe channel. A channel set here is used as-is:
     * {@link #connectAndSubscribe()} only creates one when none is held.
     *
     * @param channel channel to consume, ack and reject on
     */
    public void setSubscribeChannel(Channel channel) {
        this.subscribeChannel = channel;
    }

    /**
     * Sets the publish channel. This class never creates one itself; it only
     * closes it in {@link #disconnect()}.
     *
     * @param channel channel to close on disconnect
     */
    public void setPublishChannel(Channel channel) {
        this.publishChannel = channel;
    }

    /**
     * @return the current AMQP connection, or {@code null} if none has been opened
     */
    @Override
    public Connection getConnection() {
        return this.amqp;
    }
}
