package org.apache.synapse.transport.amqp.pollingtask;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMDocument;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.SOAPFault;
import org.apache.axiom.soap.SOAPFaultCode;
import org.apache.axiom.soap.SOAPFaultReason;
import org.apache.axis2.AxisFault;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.http.HTTPTransportUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.amqp.AMQPOutTransportInfo;
import org.apache.synapse.transport.amqp.AMQPTransportBuffers;
import org.apache.synapse.transport.amqp.AMQPTransportConstant;
import org.apache.synapse.transport.amqp.AMQPTransportEndpoint;
import org.apache.synapse.transport.amqp.AMQPTransportException;
import org.apache.synapse.transport.amqp.AMQPTransportMessage;
import org.apache.synapse.transport.amqp.ha.AMQPTransportHABrokerEntry;
import org.apache.synapse.transport.amqp.ha.AMQPTransportHAEntry;
import org.apache.synapse.transport.amqp.ha.AMQPTransportReconnectHandler;

/* loaded from: input_file:org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.class */
public class AMQPTransportPollingTask {
    private static Log log = LogFactory.getLog(AMQPTransportPollingTask.class);
    private String serviceName;
    private String connectionFactoryName;
    private Channel channel;
    private AMQPTransportReconnectHandler haHandler;
    private String exchangeName = null;
    private boolean isExchangeDurable = false;
    private boolean isExchangeAutoDelete = true;
    private String exchangeType = "direct";
    private boolean isInternalExchange = false;
    private String consumerExchangeName = null;
    private String[] bindingsKeys = null;
    private boolean isUseTx = false;
    private String queueName = null;
    private boolean isQueueDurable = false;
    private boolean isQueueRestricted = false;
    private boolean isQueueAutoDelete = true;
    private boolean isBlockingMode = false;
    private int noOfConcurrentConsumers = 2;
    private String responseConnectionFactory = null;
    private long scheduledTaskInitialDelay = 0;
    private long scheduledTaskDelay = 1;
    private TimeUnit scheduledTaskTimeUnit = TimeUnit.MILLISECONDS;
    private int noOfDispatchingTask = 2;
    private ScheduledExecutorService pollingTaskScheduler = null;
    private AMQPTransportEndpoint endpoint = null;
    private AMQPTransportBuffers buffers = null;
    private String configuredContentType = AMQPTransportConstant.DEFAULT_CONTENT_TYPE;
    private List<ScheduledFuture<?>> taskFutureList = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask$MessageDispatchTask.class */
    public final class MessageDispatchTask implements Runnable {
        private AMQPTransportBuffers buffers;

        private MessageDispatchTask(AMQPTransportBuffers aMQPTransportBuffers) {
            this.buffers = aMQPTransportBuffers;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                AMQPTransportMessage requestMessage = this.buffers.getRequestMessage();
                if (requestMessage != null) {
                    AMQPTransportPollingTask.this.pollingTaskScheduler.execute(new MessageProcessingTask(requestMessage, this.buffers));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask$MessageIOTask.class */
    public final class MessageIOTask implements Runnable {
        private AMQPTransportBuffers buffers;
        private QueueingConsumer queueingConsumer;
        private boolean isUseTx;

        private MessageIOTask(QueueingConsumer queueingConsumer, AMQPTransportBuffers aMQPTransportBuffers, boolean z) {
            this.queueingConsumer = queueingConsumer;
            this.buffers = aMQPTransportBuffers;
            this.isUseTx = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (this.isUseTx) {
                    AMQPTransportPollingTask.this.channel.txSelect();
                }
                QueueingConsumer.Delivery nextDelivery = this.queueingConsumer.nextDelivery();
                if (nextDelivery != null) {
                    this.buffers.addRequestMessage(new AMQPTransportMessage(nextDelivery));
                    if (this.isUseTx) {
                        AMQPTransportPollingTask.this.channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);
                        AMQPTransportPollingTask.this.channel.txCommit();
                    }
                } else if (this.isUseTx) {
                    AMQPTransportPollingTask.this.channel.txRollback();
                }
            } catch (InterruptedException e) {
                AMQPTransportPollingTask.log.error("Polling task was interrupted for service '" + AMQPTransportPollingTask.this.serviceName + "'", e);
                Thread.currentThread().interrupt();
            } catch (ShutdownSignalException e2) {
                if (e2.isHardError()) {
                    AMQPTransportPollingTask.log.error("Polling task for service '" + AMQPTransportPollingTask.this.serviceName + "' received a shutdown signal", e2);
                    Semaphore semaphore = new Semaphore(0, true);
                    String uuid = UUID.randomUUID().toString();
                    AMQPTransportPollingTask.this.haHandler.getBlockedTasks().add(new AMQPTransportHAEntry(semaphore, uuid, AMQPTransportPollingTask.this.connectionFactoryName));
                    try {
                        semaphore.acquire();
                        AMQPTransportHABrokerEntry aMQPTransportHABrokerEntry = AMQPTransportPollingTask.this.haHandler.getConnectionMap().get(uuid);
                        if (aMQPTransportHABrokerEntry == null) {
                            AMQPTransportPollingTask.log.error("No new connection factory was found for key '" + uuid + "'");
                            return;
                        }
                        AMQPTransportPollingTask.this.setChannel(aMQPTransportHABrokerEntry.getChannel());
                        AMQPTransportPollingTask.this.stop();
                        try {
                            AMQPTransportPollingTask.this.start();
                            AMQPTransportPollingTask.log.info("Worker task for service '" + AMQPTransportPollingTask.this.serviceName + "' is re-deployed");
                        } catch (AMQPTransportException e3) {
                            AMQPTransportPollingTask.log.error("Start of polling tasks failed. System must be restarted!");
                        }
                    } catch (InterruptedException e4) {
                        AMQPTransportPollingTask.log.error("The blocking semaphore was interrupted", e2);
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e5) {
                AMQPTransportPollingTask.log.error("I/O error occurs for the polling tasks for service '" + AMQPTransportPollingTask.this.serviceName + "'", e5);
            } catch (ConsumerCancelledException e6) {
                AMQPTransportPollingTask.log.error("Polling task for service '" + AMQPTransportPollingTask.this.serviceName + "' received a cancellation signal", e6);
            }
        }
    }

    /* loaded from: input_file:org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask$MessageProcessingTask.class */
    private final class MessageProcessingTask implements Runnable {
        private AMQPTransportMessage message;
        private AMQPTransportBuffers buffers;
        private boolean isSOAP11;

        private MessageProcessingTask(AMQPTransportMessage aMQPTransportMessage, AMQPTransportBuffers aMQPTransportBuffers) {
            this.message = aMQPTransportMessage;
            this.buffers = aMQPTransportBuffers;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                handleIncomingMessage(this.message, this.buffers);
            } catch (AxisFault e) {
                try {
                    handleFaultMessage(this.message, this.buffers, e);
                } catch (Exception e2) {
                    AMQPTransportPollingTask.log.error("Error while sending the fault message to the client. Client will not receive any errors!", e2);
                }
            }
        }

        private boolean handleIncomingMessage(AMQPTransportMessage aMQPTransportMessage, AMQPTransportBuffers aMQPTransportBuffers) throws AxisFault {
            if (aMQPTransportMessage == null) {
                throw new AxisFault("A null message received!");
            }
            try {
                MessageContext createMessageContext = AMQPTransportPollingTask.this.endpoint.createMessageContext();
                createMessageContext.setMessageID(aMQPTransportMessage.getMessageId());
                createMessageContext.setProperty("AMQP_CORRELATION_ID", aMQPTransportMessage.getCorrelationId());
                createMessageContext.setProperty(AMQPTransportConstant.AMQP_TRANSPORT_BUFFER_KEY, aMQPTransportBuffers);
                String contentType = aMQPTransportMessage.getContentType();
                if (contentType == null) {
                    contentType = AMQPTransportPollingTask.this.configuredContentType;
                }
                Map<String, Object> headers = aMQPTransportMessage.getHeaders();
                if (aMQPTransportMessage.getReplyTo() != null) {
                    createMessageContext.setProperty("OutTransportInfo", new AMQPOutTransportInfo(contentType, AMQPTransportPollingTask.this.responseConnectionFactory, aMQPTransportMessage.getReplyTo()));
                    createMessageContext.setProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO, aMQPTransportMessage.getReplyTo());
                    createMessageContext.setProperty(AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME, AMQPTransportPollingTask.this.responseConnectionFactory);
                }
                HTTPTransportUtils.initializeMessageContext(createMessageContext, aMQPTransportMessage.getSoapAction(), (String) null, contentType);
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(aMQPTransportMessage.getBody());
                createMessageContext.setProperty("ContentType", contentType);
                createMessageContext.setProperty("TRANSPORT_HEADERS", headers);
                createMessageContext.setEnvelope(TransportUtils.createSOAPEnvelope(BuilderUtil.getBuilderFromSelector(contentType, createMessageContext).processDocument(HTTPTransportUtils.handleGZip(createMessageContext, byteArrayInputStream), contentType, createMessageContext)));
                this.isSOAP11 = createMessageContext.isSOAP11();
                AxisEngine.receive(createMessageContext);
                return true;
            } catch (IOException e) {
                throw new AxisFault(e.getMessage(), e);
            }
        }

        private void handleFaultMessage(AMQPTransportMessage aMQPTransportMessage, AMQPTransportBuffers aMQPTransportBuffers, AxisFault axisFault) throws Exception {
            SOAPFactory sOAP11Factory = this.isSOAP11 ? OMAbstractFactory.getSOAP11Factory() : OMAbstractFactory.getSOAP12Factory();
            OMDocument createOMDocument = sOAP11Factory.createOMDocument();
            SOAPEnvelope defaultFaultEnvelope = sOAP11Factory.getDefaultFaultEnvelope();
            createOMDocument.addChild(defaultFaultEnvelope);
            SOAPFault fault = defaultFaultEnvelope.getBody().getFault();
            if (fault == null) {
                fault = sOAP11Factory.createSOAPFault();
            }
            SOAPFaultCode createSOAPFaultCode = sOAP11Factory.createSOAPFaultCode();
            createSOAPFaultCode.setText(axisFault.getMessage());
            fault.setCode(createSOAPFaultCode);
            SOAPFaultReason createSOAPFaultReason = sOAP11Factory.createSOAPFaultReason();
            createSOAPFaultReason.setText(axisFault.getMessage());
            fault.setReason(createSOAPFaultReason);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            defaultFaultEnvelope.serialize(byteArrayOutputStream);
            try {
                aMQPTransportBuffers.addResponseMessage(new AMQPTransportMessage(new AMQP.BasicProperties(), byteArrayOutputStream.toByteArray()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask$TASK_STATE.class */
    private enum TASK_STATE {
        STOPPED,
        STARTED,
        FAILURE
    }

    public void setUseTx(boolean z) {
        this.isUseTx = z;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public void setConfiguredContentType(String str) {
        this.configuredContentType = str;
    }

    public void setBuffers(AMQPTransportBuffers aMQPTransportBuffers) {
        this.buffers = aMQPTransportBuffers;
    }

    public void setEndpoint(AMQPTransportEndpoint aMQPTransportEndpoint) {
        this.endpoint = aMQPTransportEndpoint;
    }

    public void setServiceName(String str) {
        this.serviceName = str;
    }

    public void setExchangeName(String str) {
        this.exchangeName = str;
    }

    public void setExchangeDurable(boolean z) {
        this.isExchangeDurable = z;
    }

    public void setExchangeAutoDelete(boolean z) {
        this.isExchangeAutoDelete = z;
    }

    public void setExchangeType(String str) {
        this.exchangeType = str;
    }

    public void setInternalExchange(boolean z) {
        this.isInternalExchange = z;
    }

    public void setConsumerExchangeName(String str) {
        this.consumerExchangeName = str;
    }

    public void setBindingsKeys(String[] strArr) {
        this.bindingsKeys = strArr;
    }

    public void setQueueName(String str) {
        this.queueName = str;
    }

    public void setQueueDurable(boolean z) {
        this.isQueueDurable = z;
    }

    public void setQueueRestricted(boolean z) {
        this.isQueueRestricted = z;
    }

    public void setQueueAutoDelete(boolean z) {
        this.isQueueAutoDelete = z;
    }

    public void setBlockingMode(boolean z) {
        this.isBlockingMode = z;
    }

    public void setNoOfConcurrentConsumers(int i) {
        this.noOfConcurrentConsumers = i;
    }

    public void setConnectionFactoryName(String str) {
        this.connectionFactoryName = str;
    }

    public void setScheduledTaskInitialDelay(int i) {
        this.scheduledTaskInitialDelay = i;
    }

    public void setScheduledTaskDelay(int i) {
        this.scheduledTaskDelay = i;
    }

    public void setScheduledTaskTimeUnit(TimeUnit timeUnit) {
        this.scheduledTaskTimeUnit = timeUnit;
    }

    public void setNoOfDispatchingTask(int i) {
        this.noOfDispatchingTask = i;
    }

    public void setPollingTaskScheduler(ScheduledExecutorService scheduledExecutorService) {
        this.pollingTaskScheduler = scheduledExecutorService;
    }

    public String getServiceName() {
        return this.serviceName;
    }

    public String getExchangeName() {
        return this.exchangeName;
    }

    public boolean isExchangeDurable() {
        return this.isExchangeDurable;
    }

    public boolean isExchangeAutoDelete() {
        return this.isExchangeAutoDelete;
    }

    public String getExchangeType() {
        return this.exchangeType;
    }

    public boolean isInternalExchange() {
        return this.isInternalExchange;
    }

    public String getConsumerExchangeName() {
        return this.consumerExchangeName;
    }

    public String[] getBindingsKeys() {
        return this.bindingsKeys;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public boolean isQueueDurable() {
        return this.isQueueDurable;
    }

    public boolean isQueueRestricted() {
        return this.isQueueRestricted;
    }

    public boolean isQueueAutoDelete() {
        return this.isQueueAutoDelete;
    }

    public boolean isBlockingMode() {
        return this.isBlockingMode;
    }

    public int getNoOfConcurrentConsumers() {
        return this.noOfConcurrentConsumers;
    }

    public TimeUnit getScheduledTaskTimeUnit() {
        return this.scheduledTaskTimeUnit;
    }

    public int getNoOfDispatchingTask() {
        return this.noOfDispatchingTask;
    }

    public ExecutorService getPollingTaskScheduler() {
        return this.pollingTaskScheduler;
    }

    public AMQPTransportEndpoint getEndpoint() {
        return this.endpoint;
    }

    public void setResponseConnectionFactory(String str) {
        this.responseConnectionFactory = str;
    }

    public void setHaHandler(AMQPTransportReconnectHandler aMQPTransportReconnectHandler) {
        this.haHandler = aMQPTransportReconnectHandler;
    }

    public synchronized void start() throws AMQPTransportException {
        try {
            if (this.exchangeName != null) {
                this.channel.exchangeDeclare(this.exchangeName, this.exchangeType, this.isExchangeDurable, this.isExchangeAutoDelete, this.isInternalExchange, (Map) null);
                String queue = this.channel.queueDeclare().getQueue();
                log.info("QueueName is set to '" + queue + "' for service '" + this.serviceName + "'");
                this.queueName = queue;
                if (this.bindingsKeys != null) {
                    for (String str : this.bindingsKeys) {
                        this.channel.queueBind(this.queueName, this.exchangeName, str);
                    }
                } else {
                    this.channel.queueBind(this.queueName, this.exchangeName, "");
                }
            } else {
                this.channel.queueDeclare(this.queueName, this.isQueueDurable, this.isQueueRestricted, this.isQueueAutoDelete, (Map) null);
            }
        } catch (IOException e) {
            handleException(e.getMessage(), e);
        }
        for (int i = 0; i < this.noOfDispatchingTask; i++) {
            this.pollingTaskScheduler.execute(new MessageDispatchTask(this.buffers));
        }
        for (int i2 = 0; i2 < this.noOfConcurrentConsumers; i2++) {
            try {
                QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
                this.channel.basicConsume(this.queueName, !this.isUseTx, queueingConsumer);
                this.taskFutureList.add(this.pollingTaskScheduler.scheduleWithFixedDelay(new MessageIOTask(queueingConsumer, this.buffers, this.isUseTx), this.scheduledTaskInitialDelay, this.scheduledTaskDelay, this.scheduledTaskTimeUnit));
            } catch (IOException e2) {
                handleException(e2.getMessage(), e2);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("A polling task started listening on the queue '" + this.queueName + "' on behalf of the service '" + this.serviceName + "'");
        }
    }

    public synchronized void stop() {
        Iterator<ScheduledFuture<?>> it = this.taskFutureList.iterator();
        while (it.hasNext()) {
            it.next().cancel(false);
        }
    }

    private void handleException(String str, Throwable th) throws AMQPTransportException {
        log.error(str, th);
        throw new AMQPTransportException(str, th);
    }
}
