package org.apache.camel.component.disruptor;

import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/disruptor/DisruptorReference.class */
public class DisruptorReference {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorReference.class);
    private final DisruptorComponent component;
    private final String uri;
    private final String name;
    private final DisruptorProducerType producerType;
    private final int size;
    private final DisruptorWaitStrategy waitStrategy;
    private final Queue<Exchange> temporaryExchangeBuffer;
    private ExecutorService executor;
    private int uniqueConsumerCount;
    private final Set<DisruptorEndpoint> endpoints = Collections.newSetFromMap(new WeakHashMap(4));
    private final AtomicMarkableReference<Disruptor<ExchangeEvent>> disruptor = new AtomicMarkableReference<>(null, false);
    private final DelayedExecutor delayedExecutor = new DelayedExecutor();
    private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/disruptor/DisruptorReference$BlockingExchangeEventHandler.class */
    public class BlockingExchangeEventHandler extends AbstractLifecycleAwareExchangeEventHandler {
        private final CountDownLatch blockingLatch = new CountDownLatch(1);

        private BlockingExchangeEventHandler() {
        }

        @Override // org.apache.camel.component.disruptor.AbstractLifecycleAwareExchangeEventHandler
        public void onEvent(ExchangeEvent exchangeEvent, long j, boolean z) throws Exception {
            this.blockingLatch.await();
            Exchange cancelAndGetOriginalExchange = exchangeEvent.getSynchronizedExchange().cancelAndGetOriginalExchange();
            if (((Boolean) cancelAndGetOriginalExchange.getProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, false, Boolean.TYPE)).booleanValue()) {
                DisruptorReference.LOGGER.trace("Ignoring exchange {}", cancelAndGetOriginalExchange);
            } else {
                DisruptorReference.this.temporaryExchangeBuffer.offer(cancelAndGetOriginalExchange);
            }
        }

        public void unblock() {
            this.blockingLatch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/disruptor/DisruptorReference$DelayedExecutor.class */
    public static class DelayedExecutor implements Executor {
        private final Queue<Runnable> delayedCommands = new LinkedList();

        private DelayedExecutor() {
        }

        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            this.delayedCommands.offer(runnable);
        }

        public void executeDelayedCommands(Executor executor) {
            while (true) {
                Runnable poll = this.delayedCommands.poll();
                if (poll == null) {
                    return;
                } else {
                    executor.execute(poll);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DisruptorReference(DisruptorComponent disruptorComponent, String str, String str2, int i, DisruptorProducerType disruptorProducerType, DisruptorWaitStrategy disruptorWaitStrategy) throws Exception {
        this.component = disruptorComponent;
        this.uri = str;
        this.name = str2;
        this.size = i;
        this.producerType = disruptorProducerType;
        this.waitStrategy = disruptorWaitStrategy;
        this.temporaryExchangeBuffer = new ArrayBlockingQueue(i);
        reconfigure();
    }

    public boolean hasNullReference() {
        return this.disruptor.getReference() == null;
    }

    private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException {
        Disruptor<ExchangeEvent> reference = this.disruptor.getReference();
        if (reference == null) {
            boolean[] zArr = new boolean[1];
            while (reference == null) {
                reference = this.disruptor.get(zArr);
                if (reference == null && !zArr[0]) {
                    throw new DisruptorNotStartedException("Disruptor is not yet started or already shut down.");
                }
                if (reference == null && zArr[0]) {
                    LockSupport.parkNanos(1L);
                }
            }
        }
        return reference;
    }

    public void tryPublish(Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {
        tryPublishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
    }

    public void publish(Exchange exchange) throws DisruptorNotStartedException {
        publishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
    }

    private void publishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer) {
        long next = ringBuffer.next();
        ((ExchangeEvent) ringBuffer.get(next)).setExchange(exchange, this.uniqueConsumerCount);
        ringBuffer.publish(next);
    }

    private void tryPublishExchangeOnRingBuffer(Exchange exchange, RingBuffer<ExchangeEvent> ringBuffer) throws InsufficientCapacityException {
        long tryNext = ringBuffer.tryNext();
        ((ExchangeEvent) ringBuffer.get(tryNext)).setExchange(exchange, this.uniqueConsumerCount);
        ringBuffer.publish(tryNext);
    }

    public synchronized void reconfigure() throws Exception {
        LOGGER.debug("Reconfiguring disruptor {}", this);
        shutdownDisruptor(true);
        start();
    }

    private void start() throws Exception {
        LOGGER.debug("Starting disruptor {}", this);
        Disruptor<ExchangeEvent> createDisruptor = createDisruptor();
        createDisruptor.start();
        if (this.executor != null) {
            this.delayedExecutor.executeDelayedCommands(this.executor);
        }
        for (LifecycleAwareExchangeEventHandler lifecycleAwareExchangeEventHandler : this.handlers) {
            boolean z = false;
            while (!z) {
                try {
                    if (!lifecycleAwareExchangeEventHandler.awaitStarted(10L, TimeUnit.SECONDS)) {
                        LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT");
                    }
                    z = true;
                } catch (InterruptedException e) {
                }
            }
        }
        publishBufferedExchanges(createDisruptor);
        this.disruptor.set(createDisruptor, false);
    }

    private Disruptor<ExchangeEvent> createDisruptor() throws Exception {
        Disruptor<ExchangeEvent> disruptor = new Disruptor<>(ExchangeEventFactory.INSTANCE, this.size, this.delayedExecutor, this.producerType.getProducerType(), this.waitStrategy.createWaitStrategyInstance());
        ArrayList arrayList = new ArrayList();
        this.uniqueConsumerCount = 0;
        Iterator<DisruptorEndpoint> it = this.endpoints.iterator();
        while (it.hasNext()) {
            Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> createConsumerEventHandlers = it.next().createConsumerEventHandlers();
            if (createConsumerEventHandlers != null) {
                this.uniqueConsumerCount += createConsumerEventHandlers.keySet().size();
                Iterator<Collection<LifecycleAwareExchangeEventHandler>> it2 = createConsumerEventHandlers.values().iterator();
                while (it2.hasNext()) {
                    arrayList.addAll(it2.next());
                }
            }
        }
        LOGGER.debug("Disruptor created with {} event handlers", Integer.valueOf(arrayList.size()));
        handleEventsWith(disruptor, (LifecycleAwareExchangeEventHandler[]) arrayList.toArray(new LifecycleAwareExchangeEventHandler[arrayList.size()]));
        return disruptor;
    }

    private void handleEventsWith(Disruptor<ExchangeEvent> disruptor, LifecycleAwareExchangeEventHandler[] lifecycleAwareExchangeEventHandlerArr) {
        if (lifecycleAwareExchangeEventHandlerArr == null || lifecycleAwareExchangeEventHandlerArr.length == 0) {
            this.handlers = new LifecycleAwareExchangeEventHandler[1];
            this.handlers[0] = new BlockingExchangeEventHandler();
        } else {
            this.handlers = lifecycleAwareExchangeEventHandlerArr;
        }
        resizeThreadPoolExecutor(this.handlers.length);
        disruptor.handleEventsWith(this.handlers);
    }

    private void publishBufferedExchanges(Disruptor<ExchangeEvent> disruptor) {
        ArrayList arrayList = new ArrayList(this.temporaryExchangeBuffer.size());
        while (!this.temporaryExchangeBuffer.isEmpty()) {
            arrayList.add(this.temporaryExchangeBuffer.remove());
        }
        RingBuffer<ExchangeEvent> ringBuffer = disruptor.getRingBuffer();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            publishExchangeOnRingBuffer((Exchange) it.next(), ringBuffer);
        }
    }

    private void resizeThreadPoolExecutor(int i) {
        if (this.executor == null && i > 0) {
            LOGGER.debug("Creating new executor with {} threads", Integer.valueOf(i));
            this.executor = this.component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, this.uri, i);
            return;
        }
        if (this.executor != null && i <= 0) {
            LOGGER.debug("Shutting down executor");
            this.component.getCamelContext().getExecutorServiceManager().shutdown(this.executor);
            this.executor = null;
            return;
        }
        if (!(this.executor instanceof ThreadPoolExecutor)) {
            if (i > 0) {
                LOGGER.debug("Shutting down old and creating new executor with {} threads", Integer.valueOf(i));
                this.component.getCamelContext().getExecutorServiceManager().shutdown(this.executor);
                this.executor = this.component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, this.uri, i);
                return;
            }
            return;
        }
        LOGGER.debug("Resizing existing executor to {} threads", Integer.valueOf(i));
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.executor;
        if (i <= threadPoolExecutor.getCorePoolSize()) {
            threadPoolExecutor.setCorePoolSize(i);
            threadPoolExecutor.setMaximumPoolSize(i);
        } else {
            threadPoolExecutor.setMaximumPoolSize(i);
            threadPoolExecutor.setCorePoolSize(i);
        }
    }

    private synchronized void shutdownDisruptor(boolean z) {
        LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, Boolean.valueOf(z));
        Disruptor<ExchangeEvent> reference = this.disruptor.getReference();
        this.disruptor.set(null, z);
        if (reference != null) {
            if (this.handlers != null && this.handlers.length == 1 && (this.handlers[0] instanceof BlockingExchangeEventHandler)) {
                ((BlockingExchangeEventHandler) this.handlers[0]).unblock();
            }
            reference.shutdown();
            for (LifecycleAwareExchangeEventHandler lifecycleAwareExchangeEventHandler : this.handlers) {
                boolean z2 = false;
                while (!z2) {
                    try {
                        if (!lifecycleAwareExchangeEventHandler.awaitStopped(10L, TimeUnit.SECONDS)) {
                            LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT");
                        }
                        z2 = true;
                    } catch (InterruptedException e) {
                    }
                }
            }
            this.handlers = new LifecycleAwareExchangeEventHandler[0];
        }
    }

    private synchronized void shutdownExecutor() {
        resizeThreadPoolExecutor(0);
    }

    public String getName() {
        return this.name;
    }

    public long getRemainingCapacity() throws DisruptorNotStartedException {
        return getCurrentDisruptor().getRingBuffer().remainingCapacity();
    }

    public DisruptorWaitStrategy getWaitStrategy() {
        return this.waitStrategy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DisruptorProducerType getProducerType() {
        return this.producerType;
    }

    public int getBufferSize() {
        return this.size;
    }

    public int getPendingExchangeCount() {
        try {
            if (!hasNullReference()) {
                return (int) ((getBufferSize() - getRemainingCapacity()) + this.temporaryExchangeBuffer.size());
            }
        } catch (DisruptorNotStartedException e) {
        }
        return this.temporaryExchangeBuffer.size();
    }

    public synchronized void addEndpoint(DisruptorEndpoint disruptorEndpoint) {
        LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint);
        this.endpoints.add(disruptorEndpoint);
        LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, Integer.valueOf(this.endpoints.size()));
    }

    public synchronized void removeEndpoint(DisruptorEndpoint disruptorEndpoint) {
        LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint);
        if (getEndpointCount() == 1) {
            LOGGER.debug("Last Endpoint removed, shutdown disruptor");
            shutdownDisruptor(false);
            shutdownExecutor();
        }
        this.endpoints.remove(disruptorEndpoint);
        LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, Integer.valueOf(getEndpointCount()));
    }

    public synchronized int getEndpointCount() {
        return this.endpoints.size();
    }

    public String toString() {
        return "DisruptorReference{uri='" + this.uri + "', endpoint count=" + this.endpoints.size() + ", handler count=" + this.handlers.length + "}";
    }
}
