package org.apache.servicemix.eip.support;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
import java.util.Date;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.jbi.util.MessageUtil;
import org.apache.servicemix.timers.Timer;
import org.apache.servicemix.timers.TimerListener;

/* loaded from: input_file:org/apache/servicemix/eip/support/AbstractAggregator.class */
public abstract class AbstractAggregator extends EIPEndpoint {
    private static final Log log;
    private ExchangeTarget target;
    private boolean rescheduleTimeouts;
    private boolean synchronous;
    private ConcurrentMap closedAggregates = new ConcurrentHashMap();
    static Class class$org$apache$servicemix$eip$support$AbstractAggregator;

    public boolean isSynchronous() {
        return this.synchronous;
    }

    public void setSynchronous(boolean z) {
        this.synchronous = z;
    }

    public boolean isRescheduleTimeouts() {
        return this.rescheduleTimeouts;
    }

    public void setRescheduleTimeouts(boolean z) {
        this.rescheduleTimeouts = z;
    }

    public ExchangeTarget getTarget() {
        return this.target;
    }

    public void setTarget(ExchangeTarget exchangeTarget) {
        this.target = exchangeTarget;
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint
    protected void processSync(MessageExchange messageExchange) throws Exception {
        throw new IllegalStateException();
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint
    protected void processAsync(MessageExchange messageExchange) throws Exception {
        throw new IllegalStateException();
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint
    public void process(MessageExchange messageExchange) throws Exception {
        if (messageExchange.getStatus() == ExchangeStatus.DONE || messageExchange.getStatus() == ExchangeStatus.ERROR) {
            return;
        }
        if (messageExchange.getRole() != MessageExchange.Role.PROVIDER) {
            if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
                done(messageExchange);
                return;
            }
            return;
        }
        if (!(messageExchange instanceof InOnly) && !(messageExchange instanceof RobustInOnly)) {
            fail(messageExchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
            return;
        }
        NormalizedMessage copyIn = MessageUtil.copyIn(messageExchange);
        String correlationID = getCorrelationID(messageExchange, copyIn);
        if (correlationID == null || correlationID.length() == 0) {
            throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
        }
        Lock lock = getLockManager().getLock(correlationID);
        lock.lock();
        try {
            Object load = this.store.load(correlationID);
            Date date = null;
            if (load == null) {
                if (!isAggregationClosed(correlationID)) {
                    load = createAggregation(correlationID);
                    date = getTimeout(load);
                }
            } else if (isRescheduleTimeouts()) {
                date = getTimeout(load);
            }
            if (load != null) {
                if (addMessage(load, copyIn, messageExchange)) {
                    sendAggregate(correlationID, load, false);
                } else {
                    this.store.store(correlationID, load);
                    if (date != null) {
                        if (log.isDebugEnabled()) {
                            log.debug(new StringBuffer().append("Scheduling timeout at ").append(date).append(" for aggregate ").append(correlationID).toString());
                        }
                        getTimerManager().schedule(new TimerListener(this, correlationID) { // from class: org.apache.servicemix.eip.support.AbstractAggregator.1
                            private final String val$correlationId;
                            private final AbstractAggregator this$0;

                            {
                                this.this$0 = this;
                                this.val$correlationId = correlationID;
                            }

                            public void timerExpired(Timer timer) {
                                this.this$0.onTimeout(this.val$correlationId);
                            }
                        }, date);
                    }
                }
            }
            done(messageExchange);
            lock.unlock();
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    protected void sendAggregate(String str, Object obj, boolean z) throws Exception {
        MessageExchange createInOnlyExchange = getExchangeFactory().createInOnlyExchange();
        this.target.configureTarget(createInOnlyExchange, getContext());
        NormalizedMessage createMessage = createInOnlyExchange.createMessage();
        createInOnlyExchange.setInMessage(createMessage);
        buildAggregate(obj, createMessage, createInOnlyExchange, z);
        closeAggregation(str);
        if (isSynchronous()) {
            sendSync(createInOnlyExchange);
        } else {
            send(createInOnlyExchange);
        }
    }

    protected void onTimeout(String str) {
        if (log.isDebugEnabled()) {
            log.debug(new StringBuffer().append("Timeout expired for aggregate ").append(str).toString());
        }
        Lock lock = getLockManager().getLock(str);
        lock.lock();
        try {
            try {
                Object load = this.store.load(str);
                if (load != null) {
                    sendAggregate(str, load, true);
                } else {
                    if (!isAggregationClosed(str)) {
                        throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
                    }
                    if (log.isDebugEnabled()) {
                        log.debug(new StringBuffer().append("Aggregate ").append(str).append(" is closed").toString());
                    }
                }
                lock.unlock();
            } catch (Exception e) {
                log.info("Caught exception while processing timeout aggregation", e);
                lock.unlock();
            }
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    protected boolean isAggregationClosed(String str) {
        return this.closedAggregates.containsKey(str);
    }

    protected void closeAggregation(String str) {
        this.closedAggregates.put(str, Boolean.TRUE);
    }

    protected abstract String getCorrelationID(MessageExchange messageExchange, NormalizedMessage normalizedMessage) throws Exception;

    protected abstract Object createAggregation(String str) throws Exception;

    protected abstract Date getTimeout(Object obj);

    protected abstract boolean addMessage(Object obj, NormalizedMessage normalizedMessage, MessageExchange messageExchange) throws Exception;

    protected abstract void buildAggregate(Object obj, NormalizedMessage normalizedMessage, MessageExchange messageExchange, boolean z) throws Exception;

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$servicemix$eip$support$AbstractAggregator == null) {
            cls = class$("org.apache.servicemix.eip.support.AbstractAggregator");
            class$org$apache$servicemix$eip$support$AbstractAggregator = cls;
        } else {
            cls = class$org$apache$servicemix$eip$support$AbstractAggregator;
        }
        log = LogFactory.getLog(cls);
    }
}
