package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.7.4.jar:org/apache/camel/processor/MulticastProcessor.class */
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
    private static final transient Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);
    private final CamelContext camelContext;
    private Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean parallelProcessing;
    private final boolean streaming;
    private final boolean stopOnException;
    private final ExecutorService executorService;
    private ExecutorService aggregateExecutorService;
    private final long timeout;
    private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-2.7.4.jar:org/apache/camel/processor/MulticastProcessor$AggregateOnTheFlyTask.class */
    public final class AggregateOnTheFlyTask implements Runnable {
        private final AtomicExchange result;
        private final Exchange original;
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        private final AtomicBoolean running;
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange atomicExchange, Exchange exchange, AtomicInteger atomicInteger, CompletionService<Exchange> completionService, AtomicBoolean atomicBoolean, CountDownLatch countDownLatch, AtomicBoolean atomicBoolean2, AtomicException atomicException) {
            this.result = atomicExchange;
            this.original = exchange;
            this.total = atomicInteger;
            this.completion = completionService;
            this.running = atomicBoolean;
            this.aggregationOnTheFlyDone = countDownLatch;
            this.allTasksSubmitted = atomicBoolean2;
            this.executionException = atomicException;
        }

        @Override // java.lang.Runnable
        public void run() {
            MulticastProcessor.LOG.trace("Aggregate on the fly task +++ started +++");
            try {
                try {
                    aggregateOnTheFly();
                    MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly");
                    MulticastProcessor.LOG.trace("Aggregate on the fly task +++ done +++");
                    this.aggregationOnTheFlyDone.countDown();
                } catch (Throwable th) {
                    if (th instanceof Exception) {
                        this.executionException.set((Exception) th);
                    } else {
                        this.executionException.set(ObjectHelper.wrapRuntimeCamelException(th));
                    }
                    MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly");
                    MulticastProcessor.LOG.trace("Aggregate on the fly task +++ done +++");
                    this.aggregationOnTheFlyDone.countDown();
                }
            } catch (Throwable th2) {
                MulticastProcessor.LOG.debug("Signaling we are done aggregating on the fly");
                MulticastProcessor.LOG.trace("Aggregate on the fly task +++ done +++");
                this.aggregationOnTheFlyDone.countDown();
                throw th2;
            }
        }

        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            Future<Exchange> poll;
            boolean z = false;
            boolean z2 = false;
            StopWatch stopWatch = new StopWatch();
            int i = 0;
            while (true) {
                if (0 != 0) {
                    break;
                }
                if (!this.allTasksSubmitted.get() || i < this.total.get()) {
                    if (z) {
                        poll = this.completion.poll();
                        if (MulticastProcessor.LOG.isTraceEnabled()) {
                            MulticastProcessor.LOG.trace("Polled completion task #" + i + " after timeout to grab already completed tasks: " + poll);
                        }
                    } else if (MulticastProcessor.this.timeout > 0) {
                        long taken = MulticastProcessor.this.timeout - stopWatch.taken();
                        if (taken < 0) {
                            taken = 0;
                        }
                        if (MulticastProcessor.LOG.isTraceEnabled()) {
                            MulticastProcessor.LOG.trace("Polling completion task #" + i + " using timeout " + taken + " millis.");
                        }
                        poll = this.completion.poll(taken, TimeUnit.MILLISECONDS);
                    } else {
                        if (MulticastProcessor.LOG.isTraceEnabled()) {
                            MulticastProcessor.LOG.trace("Polling completion task #" + i);
                        }
                        poll = this.completion.poll(1L, TimeUnit.SECONDS);
                        if (poll == null) {
                            continue;
                        }
                    }
                    if (poll == null && z) {
                        break;
                    }
                    if (poll != null) {
                        Exchange exchange = poll.get();
                        boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Parallel processing failed for number " + MulticastProcessor.this.getExchangeIndex(exchange), MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                            this.result.set(exchange);
                            z2 = true;
                            break;
                        } else {
                            MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(exchange), this.result, exchange);
                        }
                    } else {
                        AggregationStrategy aggregationStrategy = MulticastProcessor.this.getAggregationStrategy(null);
                        if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
                            Exchange exchange2 = this.result.get();
                            if (exchange2 == null) {
                                exchange2 = this.original;
                            }
                            ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange2, i, this.total.intValue(), MulticastProcessor.this.timeout);
                        } else {
                            MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", Long.valueOf(MulticastProcessor.this.timeout), Integer.valueOf(i));
                        }
                        if (MulticastProcessor.LOG.isDebugEnabled()) {
                            MulticastProcessor.LOG.debug("Timeout occurred after " + MulticastProcessor.this.timeout + " millis for number " + i + " task.");
                        }
                        z = true;
                        ExecutorServiceHelper.timeoutTask(this.completion);
                    }
                    i++;
                } else if (MulticastProcessor.LOG.isDebugEnabled()) {
                    MulticastProcessor.LOG.debug("Done aggregating " + i + " exchanges on the fly.");
                }
            }
            if (z || z2) {
                if (z && MulticastProcessor.LOG.isDebugEnabled()) {
                    MulticastProcessor.LOG.debug("Cancelling tasks due timeout after " + MulticastProcessor.this.timeout + " millis.");
                }
                if (z2 && MulticastProcessor.LOG.isDebugEnabled()) {
                    MulticastProcessor.LOG.debug("Cancelling tasks due stopOnException.");
                }
                this.running.set(false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-2.7.4.jar:org/apache/camel/processor/MulticastProcessor$DefaultProcessorExchangePair.class */
    public static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private DefaultProcessorExchangePair(int i, Processor processor, Processor processor2, Exchange exchange) {
            this.index = i;
            this.processor = processor;
            this.prepared = processor2;
            this.exchange = exchange;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public int getIndex() {
            return this.index;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Exchange getExchange() {
            return this.exchange;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Producer getProducer() {
            if (this.processor instanceof Producer) {
                return (Producer) this.processor;
            }
            return null;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public Processor getProcessor() {
            return this.prepared;
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public void begin() {
        }

        @Override // org.apache.camel.processor.ProcessorExchangePair
        public void done() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/camel-core-2.7.4.jar:org/apache/camel/processor/MulticastProcessor$PreparedErrorHandler.class */
    public static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {
        public PreparedErrorHandler(RouteContext routeContext, Processor processor) {
            super(routeContext, processor);
        }
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection) {
        this(camelContext, collection, null);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy) {
        this(camelContext, collection, aggregationStrategy, false, null, false, false, 0L);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ExecutorService executorService, boolean z2, boolean z3, long j) {
        this.errorHandlers = new ConcurrentHashMap();
        ObjectHelper.notNull(camelContext, "camelContext");
        this.camelContext = camelContext;
        this.processors = collection;
        this.aggregationStrategy = aggregationStrategy;
        this.executorService = executorService;
        this.streaming = z2;
        this.stopOnException = z3;
        this.parallelProcessing = z || executorService != null;
        this.timeout = j;
    }

    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    @Override // org.apache.camel.processor.Traceable
    public String getTraceLabel() {
        return "multicast";
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    @Override // org.apache.camel.Processor
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    @Override // org.apache.camel.AsyncProcessor
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        AtomicExchange atomicExchange = new AtomicExchange();
        try {
            boolean z = true;
            Iterable<ProcessorExchangePair> createProcessorExchangePairs = createProcessorExchangePairs(exchange);
            if (isParallelProcessing()) {
                ObjectHelper.notNull(this.executorService, "executorService", this);
                doProcessParallel(exchange, atomicExchange, createProcessorExchangePairs, isStreaming(), asyncCallback);
            } else {
                z = doProcessSequential(exchange, atomicExchange, createProcessorExchangePairs, asyncCallback);
            }
            if (!z) {
                return false;
            }
            doDone(exchange, atomicExchange.get() != null ? atomicExchange.get() : null, asyncCallback, true, true);
            return true;
        } catch (Throwable th) {
            exchange.setException(th);
            doDone(exchange, null, asyncCallback, true, false);
            return true;
        }
    }

    protected void doProcessParallel(Exchange exchange, AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable, boolean z, AsyncCallback asyncCallback) throws Exception {
        ObjectHelper.notNull(this.executorService, "ExecutorService", this);
        ObjectHelper.notNull(this.aggregateExecutorService, "AggregateExecutorService", this);
        CompletionService executorCompletionService = z ? new ExecutorCompletionService(this.executorService) : new SubmitOrderedCompletionService(this.executorService);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Iterator<ProcessorExchangePair> it = iterable.iterator();
        if (it.hasNext()) {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            AtomicBoolean atomicBoolean2 = new AtomicBoolean();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicException atomicException = new AtomicException();
            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(atomicExchange, exchange, atomicInteger, executorCompletionService, atomicBoolean, countDownLatch, atomicBoolean2, atomicException);
            final AtomicBoolean atomicBoolean3 = new AtomicBoolean();
            LOG.trace("Starting to submit parallel tasks");
            while (it.hasNext()) {
                final ProcessorExchangePair next = it.next();
                final Exchange exchange2 = next.getExchange();
                updateNewExchange(exchange2, atomicInteger.intValue(), iterable, it);
                executorCompletionService.submit(new Callable<Exchange>() { // from class: org.apache.camel.processor.MulticastProcessor.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Exchange call() throws Exception {
                        if (atomicBoolean3.compareAndSet(false, true)) {
                            MulticastProcessor.this.aggregateExecutorService.submit(aggregateOnTheFlyTask);
                        }
                        if (!atomicBoolean.get()) {
                            return exchange2;
                        }
                        try {
                            MulticastProcessor.this.doProcessParallel(next);
                        } catch (Throwable th) {
                            exchange2.setException(th);
                        }
                        Integer exchangeIndex = MulticastProcessor.this.getExchangeIndex(exchange2);
                        boolean continueProcessing = PipelineHelper.continueProcessing(exchange2, "Parallel processing failed for number " + exchangeIndex, MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                            atomicBoolean.set(false);
                            if (exchange2.getException() != null) {
                                throw new CamelExchangeException("Parallel processing failed for number " + exchangeIndex, exchange2, exchange2.getException());
                            }
                        }
                        if (MulticastProcessor.LOG.isTraceEnabled()) {
                            MulticastProcessor.LOG.trace("Parallel processing complete for exchange: " + exchange2);
                        }
                        return exchange2;
                    }
                });
                atomicInteger.incrementAndGet();
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Signaling that all " + atomicInteger.get() + " tasks has been submitted.");
            }
            atomicBoolean2.set(true);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Waiting for on-the-fly aggregation to complete aggregating " + atomicInteger.get() + " responses.");
            }
            countDownLatch.await();
            if (atomicException.get() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Parallel processing failed due " + atomicException.get().getMessage());
                }
                throw atomicException.get();
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done parallel processing " + atomicInteger + " exchanges");
        }
    }

    protected boolean doProcessSequential(Exchange exchange, AtomicExchange atomicExchange, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        Iterator<ProcessorExchangePair> it = iterable.iterator();
        while (it.hasNext()) {
            ProcessorExchangePair next = it.next();
            Exchange exchange2 = next.getExchange();
            updateNewExchange(exchange2, atomicInteger.get(), iterable, it);
            if (!doProcessSequential(exchange, atomicExchange, iterable, it, next, asyncCallback, atomicInteger)) {
                if (!LOG.isTraceEnabled()) {
                    return false;
                }
                LOG.trace("Processing exchangeId: " + next.getExchange().getExchangeId() + " is continued being processed asynchronously");
                return false;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: " + next.getExchange().getExchangeId() + " is continued being processed synchronously");
            }
            boolean continueProcessing = PipelineHelper.continueProcessing(exchange2, "Sequential processing failed for number " + atomicInteger.get(), LOG);
            if (this.stopOnException && !continueProcessing) {
                if (exchange2.getException() != null) {
                    throw new CamelExchangeException("Sequential processing failed for number " + atomicInteger.get(), exchange2, exchange2.getException());
                }
                atomicExchange.set(exchange2);
                return true;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Sequential processing complete for number " + atomicInteger + " exchange: " + exchange2);
            }
            doAggregate(getAggregationStrategy(exchange2), atomicExchange, exchange2);
            atomicInteger.incrementAndGet();
        }
        if (!LOG.isDebugEnabled()) {
            return true;
        }
        LOG.debug("Done sequential processing " + atomicInteger + " exchanges");
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean doProcessSequential(final Exchange exchange, final AtomicExchange atomicExchange, final Iterable<ProcessorExchangePair> iterable, final Iterator<ProcessorExchangePair> it, final ProcessorExchangePair processorExchangePair, final AsyncCallback asyncCallback, final AtomicInteger atomicInteger) {
        final Exchange exchange2 = processorExchangePair.getExchange();
        Processor processor = processorExchangePair.getProcessor();
        Producer producer = processorExchangePair.getProducer();
        TracedRouteNodes tracedRouteNodes = exchange2.getUnitOfWork() != null ? exchange2.getUnitOfWork().getTracedRouteNodes() : null;
        StopWatch stopWatch = null;
        if (producer != null) {
            stopWatch = new StopWatch();
        }
        if (tracedRouteNodes != null) {
            try {
                tracedRouteNodes.pushBlock();
            } catch (Throwable th) {
                if (tracedRouteNodes != null) {
                    tracedRouteNodes.popBlock();
                }
                if (producer != null) {
                    EventHelper.notifyExchangeSent(exchange2.getContext(), exchange2, producer.getEndpoint(), stopWatch.stop());
                }
                throw th;
            }
        }
        AsyncProcessor convert = AsyncProcessorTypeConverter.convert(processor);
        processorExchangePair.begin();
        boolean process = AsyncProcessorHelper.process(convert, exchange2, new AsyncCallback() { // from class: org.apache.camel.processor.MulticastProcessor.2
            @Override // org.apache.camel.AsyncCallback
            public void done(boolean z) {
                processorExchangePair.done();
                if (z) {
                    return;
                }
                Exchange exchange3 = exchange2;
                boolean continueProcessing = PipelineHelper.continueProcessing(exchange3, "Sequential processing failed for number " + atomicInteger.get(), MulticastProcessor.LOG);
                if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                    if (exchange3.getException() != null) {
                        exchange3.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange3, exchange3.getException()));
                    } else {
                        atomicExchange.set(exchange3);
                    }
                    MulticastProcessor.this.doDone(exchange, exchange3, asyncCallback, false, true);
                    return;
                }
                try {
                    MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(exchange3), atomicExchange, exchange3);
                    atomicInteger.incrementAndGet();
                    while (it.hasNext()) {
                        ProcessorExchangePair processorExchangePair2 = (ProcessorExchangePair) it.next();
                        Exchange exchange4 = processorExchangePair2.getExchange();
                        MulticastProcessor.this.updateNewExchange(exchange4, atomicInteger.get(), iterable, it);
                        if (!MulticastProcessor.this.doProcessSequential(exchange, atomicExchange, iterable, it, processorExchangePair2, asyncCallback, atomicInteger)) {
                            if (MulticastProcessor.LOG.isTraceEnabled()) {
                                MulticastProcessor.LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
                                return;
                            }
                            return;
                        }
                        boolean continueProcessing2 = PipelineHelper.continueProcessing(exchange4, "Sequential processing failed for number " + atomicInteger.get(), MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing2) {
                            if (exchange4.getException() != null) {
                                exchange4.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange4, exchange4.getException()));
                            } else {
                                atomicExchange.set(exchange4);
                            }
                            MulticastProcessor.this.doDone(exchange, exchange4, asyncCallback, false, true);
                            return;
                        }
                        try {
                            MulticastProcessor.this.doAggregate(MulticastProcessor.this.getAggregationStrategy(exchange4), atomicExchange, exchange4);
                            atomicInteger.incrementAndGet();
                        } catch (Throwable th2) {
                            exchange4.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange4, th2));
                            MulticastProcessor.this.doDone(exchange, exchange4, asyncCallback, false, true);
                            return;
                        }
                    }
                    MulticastProcessor.this.doDone(exchange, atomicExchange.get() != null ? atomicExchange.get() : null, asyncCallback, false, true);
                } catch (Throwable th3) {
                    exchange3.setException(new CamelExchangeException("Sequential processing failed for number " + atomicInteger, exchange3, th3));
                    MulticastProcessor.this.doDone(exchange, exchange3, asyncCallback, false, true);
                }
            }
        });
        if (tracedRouteNodes != null) {
            tracedRouteNodes.popBlock();
        }
        if (producer != null) {
            EventHelper.notifyExchangeSent(exchange2.getContext(), exchange2, producer.getEndpoint(), stopWatch.stop());
        }
        return process;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doProcessParallel(ProcessorExchangePair processorExchangePair) throws Exception {
        Exchange exchange = processorExchangePair.getExchange();
        Processor processor = processorExchangePair.getProcessor();
        Producer producer = processorExchangePair.getProducer();
        TracedRouteNodes tracedRouteNodes = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
        StopWatch stopWatch = null;
        if (producer != null) {
            stopWatch = new StopWatch();
        }
        if (tracedRouteNodes != null) {
            try {
                tracedRouteNodes.pushBlock();
            } catch (Throwable th) {
                processorExchangePair.done();
                if (tracedRouteNodes != null) {
                    tracedRouteNodes.popBlock();
                }
                if (producer != null) {
                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, producer.getEndpoint(), stopWatch.stop());
                }
                throw th;
            }
        }
        AsyncProcessor convert = AsyncProcessorTypeConverter.convert(processor);
        processorExchangePair.begin();
        AsyncProcessorHelper.process(convert, exchange);
        processorExchangePair.done();
        if (tracedRouteNodes != null) {
            tracedRouteNodes.popBlock();
        }
        if (producer != null) {
            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, producer.getEndpoint(), stopWatch.stop());
        }
    }

    protected void doDone(Exchange exchange, Exchange exchange2, AsyncCallback asyncCallback, boolean z, boolean z2) {
        removeAggregationStrategyFromExchange(exchange);
        if (exchange.getException() != null) {
            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.valueOf(z2));
        }
        if (exchange2 != null) {
            ExchangeHelper.copyResults(exchange, exchange2);
        }
        asyncCallback.done(z);
    }

    protected synchronized void doAggregate(AggregationStrategy aggregationStrategy, AtomicExchange atomicExchange, Exchange exchange) {
        if (aggregationStrategy != null) {
            Exchange exchange2 = atomicExchange.get();
            ExchangeHelper.prepareAggregation(exchange2, exchange);
            atomicExchange.set(aggregationStrategy.aggregate(exchange2, exchange));
        }
    }

    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> iterable, Iterator<ProcessorExchangePair> it) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, Integer.valueOf(i));
        if (it.hasNext()) {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
        }
    }

    protected Integer getExchangeIndex(Exchange exchange) {
        return (Integer) exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
    }

    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        ArrayList arrayList = new ArrayList(this.processors.size());
        int i = 0;
        Iterator<Processor> it = this.processors.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            arrayList.add(createProcessorExchangePair(i2, it.next(), ExchangeHelper.createCorrelatedCopy(exchange, false), exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null));
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ProcessorExchangePair createProcessorExchangePair(int i, Processor processor, Exchange exchange, RouteContext routeContext) {
        setToEndpoint(exchange, processor);
        return new DefaultProcessorExchangePair(i, processor, createErrorHandler(routeContext, processor), exchange);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Processor createErrorHandler(RouteContext routeContext, Processor processor) {
        UnitOfWorkProcessor unitOfWorkProcessor;
        if (routeContext != null) {
            PreparedErrorHandler preparedErrorHandler = new PreparedErrorHandler(routeContext, processor);
            Processor processor2 = this.errorHandlers.get(preparedErrorHandler);
            if (processor2 != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Using existing error handler for: " + processor);
                }
                return processor2;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Creating error handler for: " + processor);
            }
            try {
                unitOfWorkProcessor = new UnitOfWorkProcessor(routeContext.getRoute().getErrorHandlerBuilder().createErrorHandler(routeContext, processor));
                this.errorHandlers.putIfAbsent(preparedErrorHandler, unitOfWorkProcessor);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        } else {
            unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
        }
        return unitOfWorkProcessor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.ServiceSupport
    public void doStart() throws Exception {
        if (isParallelProcessing() && this.executorService == null) {
            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
        }
        if (this.timeout > 0 && !isParallelProcessing()) {
            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
        }
        if (isParallelProcessing() && this.aggregateExecutorService == null) {
            this.aggregateExecutorService = createAggregateExecutorService(getClass().getSimpleName() + "-AggregateTask");
        }
        ServiceHelper.startServices(this.processors);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized ExecutorService createAggregateExecutorService(String str) {
        return this.camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.ServiceSupport
    public void doStop() throws Exception {
        ServiceHelper.stopServices(this.processors);
        this.errorHandlers.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void setToEndpoint(Exchange exchange, Processor processor) {
        if (processor instanceof Producer) {
            exchange.setProperty(Exchange.TO_ENDPOINT, ((Producer) processor).getEndpoint().getEndpointUri());
        }
    }

    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
        Map cast;
        AggregationStrategy aggregationStrategy = null;
        if (exchange != null && (cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class))) != null) {
            aggregationStrategy = (AggregationStrategy) cast.get(this);
        }
        if (aggregationStrategy == null) {
            aggregationStrategy = getAggregationStrategy();
        }
        return aggregationStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
        Map cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class));
        if (cast == null) {
            cast = new HashMap();
        }
        cast.put(this, aggregationStrategy);
        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, cast);
    }

    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
        Map cast = CastUtils.cast((Map<?, ?>) exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class));
        if (cast == null) {
            return;
        }
        cast.remove(this);
    }

    public boolean isStreaming() {
        return this.streaming;
    }

    public boolean isStopOnException() {
        return this.stopOnException;
    }

    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    public long getTimeout() {
        return this.timeout;
    }

    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }

    public boolean isParallelProcessing() {
        return this.parallelProcessing;
    }

    @Override // org.apache.camel.Navigate
    public List<Processor> next() {
        if (hasNext()) {
            return new ArrayList(this.processors);
        }
        return null;
    }

    @Override // org.apache.camel.Navigate
    public boolean hasNext() {
        return (this.processors == null || this.processors.isEmpty()) ? false : true;
    }
}
