package org.apache.synapse.transport.amqp;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.axis2.AxisFault;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.AbstractTransportSender;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.synapse.transport.amqp.connectionfactory.AMQPTransportConnectionFactoryManager;
import org.apache.synapse.transport.amqp.pollingtask.AMQPSimpleConsumerTask;
import org.apache.synapse.transport.amqp.sendertask.AMQPSender;
import org.apache.synapse.transport.amqp.sendertask.AMQPSenderCache;
import org.apache.synapse.transport.amqp.sendertask.AMQPSenderFactory;
import org.apache.synapse.transport.amqp.tx.AMQPTransportProducerTx;

/* loaded from: input_file:org/apache/synapse/transport/amqp/AMQPTransportSender.class */
public class AMQPTransportSender extends AbstractTransportSender {
    private AMQPTransportConnectionFactoryManager connectionFactoryManager;
    private ExecutorService connectionFactoryES;
    private AMQPSenderCache cache;
    private Map<String, Semaphore> responseTracker;
    private Map<String, AMQPTransportMessage> responseMessage;
    private ExecutorService responseHandlingPool;
    private long semaphoreTimeOut;

    public void init(ConfigurationContext configurationContext, TransportOutDescription transportOutDescription) throws AxisFault {
        super.init(configurationContext, transportOutDescription);
        this.connectionFactoryES = Executors.newFixedThreadPool(AMQPTransportUtils.getIntProperty(AMQPTransportConstant.PARAM_CONNECTION_FACTORY_POOL_SIZE, 20));
        this.responseHandlingPool = Executors.newFixedThreadPool(AMQPTransportUtils.getIntProperty(AMQPTransportConstant.PARAM_RESPONSE_HANDLING_POOL_SIZE, 20));
        this.connectionFactoryManager = new AMQPTransportConnectionFactoryManager();
        this.connectionFactoryManager.addConnectionFactories(transportOutDescription, this.connectionFactoryES);
        this.semaphoreTimeOut = AMQPTransportUtils.getLongProperty(AMQPTransportConstant.PARAM_SEMAPHORE_TIME_OUT, 86400L);
        this.cache = new AMQPSenderCache(new ConcurrentHashMap());
        this.responseTracker = new ConcurrentHashMap();
        this.responseMessage = new ConcurrentHashMap();
        this.log.info("AMQP transport sender initializing..");
    }

    public void stop() {
        super.stop();
        try {
            this.connectionFactoryManager.shutDownConnectionFactories();
        } catch (AMQPTransportException e) {
            this.log.error("Error while shutting down connection factories, continue anyway...", e);
        }
        this.cache.clean();
        this.responseTracker.clear();
        this.responseMessage.clear();
        this.connectionFactoryES.shutdown();
        this.responseHandlingPool.shutdown();
    }

    public void sendMessage(MessageContext messageContext, String str, OutTransportInfo outTransportInfo) throws AxisFault {
        Map<String, String> params;
        AMQPSender createAMQPSender;
        Integer num = null;
        String str2 = null;
        MessageContext messageContext2 = messageContext.getOperationContext().getMessageContext("In");
        if (messageContext2 != null) {
            str2 = (String) messageContext2.getProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
        }
        if (str2 != null) {
            num = Integer.valueOf(str2.hashCode());
            params = new HashMap();
            params.put(AMQPTransportConstant.PARAMETER_QUEUE_NAME, str2);
            String str3 = (String) messageContext.getOperationContext().getMessageContext("In").getProperty(AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME);
            if (str3 == null) {
                throw new AxisFault("A message was received with 'reply to' set. But no reply connection factory name found. Define the parameter 'transport.amqp.ResponseConnectionFactoryName' as a service parameter. This response message will be dropped!");
            }
            params.put(AMQPTransportConstant.PARAMETER_CONNECTION_FACTORY_NAME, str3);
        } else if (str != null) {
            num = new Integer(str.hashCode());
            try {
                params = AMQPTransportUtils.parseAMQPUri(str);
            } catch (AMQPTransportException e) {
                throw new AxisFault("Error while parsing the AMQP epr '" + str + "'", e);
            }
        } else {
            if (outTransportInfo == null || !(outTransportInfo instanceof AMQPOutTransportInfo)) {
                throw new AxisFault("Could not determine the endpoint information to deliver the message");
            }
            params = ((AMQPOutTransportInfo) outTransportInfo).getParams();
        }
        if (this.cache.hit(num)) {
            createAMQPSender = this.cache.get(num);
        } else {
            try {
                createAMQPSender = AMQPSenderFactory.createAMQPSender(this.connectionFactoryManager, params);
                this.cache.add(num, createAMQPSender);
            } catch (IOException e2) {
                throw new AxisFault("Could not create the AMQP sender", e2);
            }
        }
        try {
            String str4 = (String) messageContext.getProperty("AMQP_CORRELATION_ID");
            if (str4 == null) {
                str4 = messageContext.getMessageID();
            }
            boolean waitForSynchronousResponse = waitForSynchronousResponse(messageContext);
            Semaphore semaphore = null;
            if (waitForSynchronousResponse) {
                str2 = (String) messageContext.getProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
                if (str2 == null) {
                    str2 = UUID.randomUUID().toString();
                }
                semaphore = new Semaphore(0, true);
                this.responseTracker.put(str4, semaphore);
            }
            String str5 = (String) messageContext.getProperty(AMQPTransportConstant.PROPERTY_PRODUCER_TX);
            AMQPTransportProducerTx aMQPTransportProducerTx = AMQPTransportConstant.AMQP_USE_LWPC.equals(str5) ? new AMQPTransportProducerTx(true, createAMQPSender.getChannel()) : AMQPTransportConstant.AMQP_USE_TX.equals(str5) ? new AMQPTransportProducerTx(false, createAMQPSender.getChannel()) : null;
            if (aMQPTransportProducerTx != null) {
                try {
                    aMQPTransportProducerTx.start();
                } catch (IOException e3) {
                    throw new AxisFault("Error while initiation tx for message '" + messageContext.getMessageID() + "'", e3);
                }
            }
            createAMQPSender.sendAMQPMessage(messageContext, str4, str2);
            if (aMQPTransportProducerTx != null) {
                try {
                    aMQPTransportProducerTx.end();
                } catch (IOException e4) {
                    throw new AxisFault("Error while terminating tx for message '" + messageContext.getMessageID() + "'", e4);
                } catch (InterruptedException e5) {
                    this.log.error("Error while terminating tx for message '" + messageContext.getMessageID() + "'", e5);
                    Thread.currentThread().interrupt();
                }
            }
            if (waitForSynchronousResponse) {
                new AMQPSimpleConsumerTask(this.responseHandlingPool, createAMQPSender.getChannel(), str2, this.responseTracker, this.responseMessage).consume();
                try {
                    semaphore.tryAcquire(this.semaphoreTimeOut, TimeUnit.SECONDS);
                } catch (InterruptedException e6) {
                    Thread.currentThread().interrupt();
                }
                this.responseTracker.remove(str4);
                AMQPTransportMessage aMQPTransportMessage = this.responseMessage.get(str4);
                if (aMQPTransportMessage != null) {
                    handleSyncResponse(messageContext, aMQPTransportMessage, aMQPTransportMessage.getContentType());
                } else {
                    this.log.warn("The semaphore with id '" + str4 + "' was time out while waiting for a response, sending a fault to client..");
                    sendFault(messageContext, new Exception("Times out occurs while waiting for a response"));
                }
            }
        } catch (IOException e7) {
            throw new AxisFault("Could not produce the message into the destination", e7);
        } catch (AMQPTransportException e8) {
            throw new AxisFault("Could not retrieve the connection factory information", e8);
        }
    }

    private void handleSyncResponse(MessageContext messageContext, AMQPTransportMessage aMQPTransportMessage, String str) throws AxisFault {
        try {
            MessageContext createResponseMessageContext = createResponseMessageContext(messageContext);
            createResponseMessageContext.setProperty("messageType", messageContext.getProperty("messageType"));
            createResponseMessageContext.setProperty("ContentType", messageContext.getProperty("ContentType"));
            String contentType = aMQPTransportMessage.getContentType();
            if (contentType == null) {
                contentType = inferContentType(str, createResponseMessageContext);
            }
            createResponseMessageContext.setEnvelope(BuilderUtil.getBuilderFromSelector(contentType, messageContext).processDocument(new ByteArrayInputStream(aMQPTransportMessage.getBody()), contentType, createResponseMessageContext));
            String charSetEncoding = BuilderUtil.getCharSetEncoding(contentType);
            if (charSetEncoding == null) {
                charSetEncoding = "UTF-8";
            }
            createResponseMessageContext.setProperty("CHARACTER_SET_ENCODING", contentType.indexOf("; charset=") > 0 ? charSetEncoding : "UTF-8");
            createResponseMessageContext.setProperty("TRANSPORT_HEADERS", aMQPTransportMessage.getHeaders());
            if (aMQPTransportMessage.getSoapAction() != null) {
                createResponseMessageContext.setSoapAction(aMQPTransportMessage.getSoapAction());
            }
            AxisEngine.receive(createResponseMessageContext);
        } catch (AxisFault e) {
            handleException("Could not handle the response message ", e);
        }
    }

    private void sendFault(MessageContext messageContext, Exception exc) {
        try {
            MessageContext createFaultMessageContext = MessageContextBuilder.createFaultMessageContext(messageContext, exc);
            createFaultMessageContext.setProperty("ERROR_MESSAGE", exc.getMessage());
            createFaultMessageContext.setProperty("SENDING_FAULT", Boolean.TRUE);
            AxisEngine.sendFault(createFaultMessageContext);
        } catch (AxisFault e) {
            this.log.fatal("Could not create the fault message.", e);
        }
    }

    private String inferContentType(String str, MessageContext messageContext) {
        Object property = messageContext.getProperty("ContentType");
        if (property != null) {
            return property.toString();
        }
        Parameter parameter = this.cfgCtx.getAxisConfiguration().getParameter("ContentType");
        return parameter != null ? parameter.getValue().toString() : str != null ? str : AMQPTransportConstant.DEFAULT_CONTENT_TYPE;
    }
}
