package org.apache.geode.internal.cache.wan.serial;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.class */
/**
 * Event-processor thread of a serial gateway sender.
 *
 * <p>While the owning sender is not primary, events passed to {@link #enqueueEvent} are parked in
 * {@code unprocessedEvents}. Notifications delivered through {@link #handlePrimaryEvent} and
 * {@link #handlePrimaryDestroy} (their callers are not visible in this file; presumably the
 * secondary queue listener) remove the matching parked event, or leave a time-limited token in
 * {@code unprocessedTokens} when the notification arrives before the local event. When the sender
 * becomes primary, {@link #handleFailover()} moves whatever is still parked into the real queue.
 *
 * <p>Locking: {@code unprocessedEventsLock} guards both unprocessed maps and
 * {@code uncheckedCount}; {@code listenerObjectLock} guards use and shutdown of the listener
 * executor; {@code failoverCompletedLock} guards {@code failoverCompleted}.
 */
public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
    private static final Logger logger = LogService.getLogger();

    /** Exclusive upper bound, in milliseconds, of the random delay taken before queue processing starts. */
    protected static final int RANDOM_SLEEP_TIME = 1000;

    /**
     * Used by {@link #reapOld} both as the number of calls skipped between two reap passes and as
     * the map size above which a map is scanned for expired entries.
     */
    private static final int REAP_THRESHOLD = 1000;

    /** Guards {@link #unprocessedEvents}, {@link #unprocessedTokens} and {@link #uncheckedCount}. */
    private final Object unprocessedEventsLock = new Object();

    /** Events seen while secondary, keyed by event id, in insertion order. Set to null once released. */
    private Map<EventID, AbstractGatewaySender.EventWrapper> unprocessedEvents = new LinkedHashMap<>();

    /** Expiry timestamps (epoch millis) of primary notifications that arrived before the local event. Null after failover. */
    private Map<EventID, Long> unprocessedTokens = new LinkedHashMap<>();

    /** Single-threaded executor running primary notifications; only created when the sender starts as secondary. */
    private ExecutorService executor;

    /** Guards submission to and shutdown of {@link #executor}. */
    private final Object listenerObjectLock = new Object();

    /** Set once by {@link #completeFailover()}; guarded by {@link #failoverCompletedLock}. */
    private boolean failoverCompleted;

    private final Object failoverCompletedLock = new Object();

    /** Number of {@link #reapOld} calls since the last reap pass. */
    private int uncheckedCount;

    /**
     * Creates the processor thread (daemon) and its message queue.
     *
     * @param abstractGatewaySender the sender this processor works for
     * @param str identifier used in the thread-group name, thread name and queue region name
     */
    public SerialGatewaySenderEventProcessor(AbstractGatewaySender abstractGatewaySender, String str) {
        super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_" + str, logger), "Event Processor for GatewaySender_" + str, abstractGatewaySender);
        initializeMessageQueue(str);
        setDaemon(true);
    }

    /**
     * Creates the {@link SerialGatewaySenderQueue} backing this processor. When the sender is not
     * primary, a {@link SerialSecondaryGatewayListener} is attached to the queue and the listener
     * executor is started.
     *
     * @param str prefix of the queue name; {@code "_SERIAL_GATEWAY_SENDER_QUEUE"} is appended
     */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    protected void initializeMessageQueue(String str) {
        String queueName = str + "_SERIAL_GATEWAY_SENDER_QUEUE";
        SerialSecondaryGatewayListener secondaryListener = null;
        if (!this.sender.isPrimary()) {
            secondaryListener = new SerialSecondaryGatewayListener(this);
            initializeListenerExecutor();
        }
        this.queue = new SerialGatewaySenderQueue(this.sender, queueName, secondaryListener);
        if (logger.isDebugEnabled()) {
            logger.debug("Created queue: {}", this.queue);
        }
    }

    /**
     * Blocks until the sender becomes primary, then shuts the listener executor down and runs
     * failover handling. {@link #completeFailover()} is always invoked on exit so that threads
     * blocked in {@link #waitForFailoverCompletion()} are released.
     *
     * @return {@code true} if failover handling ran; {@code false} if the processor was stopped,
     *         interrupted, cancelled, or the region was destroyed
     */
    protected boolean waitForPrimary() {
        try {
            try {
                this.sender.getSenderAdvisor().waitToBecomePrimary();
            } catch (InterruptedException e) {
                if (!stopped()) {
                    logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayImpl_AN_INTERRUPTEDEXCEPTION_OCCURRED_THE_THREAD_WILL_EXIT), e);
                }
                shutdownListenerExecutor();
                return false;
            }
            shutdownListenerExecutor();
            DistributedSystem.setThreadsSocketPolicy(true);
            if (stopped()) {
                return false;
            }
            handleFailover();
            return true;
        } catch (CancelException | RegionDestroyedException e) {
            if (!stopped()) {
                logger.debug("Terminating due to {}", e.getMessage(), e);
            }
            return false;
        } finally {
            completeFailover();
        }
    }

    /**
     * Thread body: waits to become primary if necessary, sleeps a random time below
     * {@link #RANDOM_SLEEP_TIME} milliseconds, then processes the queue.
     */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor, java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            setRunningStatus();
            if (this.sender.isPrimary()) {
                completeFailover();
            } else if (!waitForPrimary()) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Beginning to process the message queue");
            }
            if (!this.sender.isPrimary()) {
                logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_ABOUT_TO_PROCESS_THE_MESSAGE_QUEUE_BUT_NOT_THE_PRIMARY));
            }
            try {
                Thread.sleep(new Random().nextInt(RANDOM_SLEEP_TIME));
                processQueue();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: run() ends right after this handler, so the thread simply exits.
            }
        } catch (VirtualMachineError e) {
            SystemFailure.initiateFailure(e);
            throw e;
        } catch (CancelException e) {
            if (!isStopped()) {
                logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_A_CANCELLATION_OCCURRED_STOPPING_THE_DISPATCHER));
                setIsStopped(true);
            }
        } catch (Throwable th) {
            SystemFailure.checkFailure();
            logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayImpl_MESSAGE_DISPATCH_FAILED_DUE_TO_UNEXPECTED_EXCEPTION), th);
        }
    }

    /**
     * Not supported by this processor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void rebalance() {
        throw new UnsupportedOperationException();
    }

    /**
     * Runs under {@code unprocessedEventsLock} once this member is to take over as primary:
     * detaches the queue's cache listener, discards the token map, drops parked events that are
     * already present in the queue region, queues the remaining parked events, flags every queued
     * event as a possible duplicate and finally releases whatever is still parked.
     */
    protected void handleFailover() {
        synchronized (this.unprocessedEventsLock) {
            this.queue.removeCacheListener();
            this.unprocessedTokens = null;
            logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_GATEWAY_FAILOVER_INITIATED_PROCESSING_0_UNPROCESSED_EVENTS, Integer.valueOf(this.unprocessedEvents.size())));
            GatewaySenderStats statistics = this.sender.getStatistics();
            if (!this.unprocessedEvents.isEmpty()) {
                reapOld(statistics, true);
                discardUnprocessedEventsAlreadyQueued();
                queueRemainingUnprocessedEvents();
                statistics.clearUnprocessedMaps();
            }
            logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0__MARKING__1__EVENTS_AS_POSSIBLE_DUPLICATES, new Object[]{getSender(), Integer.valueOf(this.queue.size())}));
            markQueuedEventsAsPossibleDuplicates();
            releaseUnprocessedEvents();
        }
    }

    /** Removes and releases every parked event whose id also appears in the queue region. Caller holds {@code unprocessedEventsLock}. */
    private void discardUnprocessedEventsAlreadyQueued() {
        Iterator<?> queued = this.queue.getRegion().values().iterator();
        while (queued.hasNext() && !stopped()) {
            Object value = queued.next();
            if (!(value instanceof GatewaySenderEventImpl)) {
                // Covers null as well as foreign value types.
                continue;
            }
            AbstractGatewaySender.EventWrapper parked = this.unprocessedEvents.remove(((GatewaySenderEventImpl) value).getEventId());
            if (parked != null) {
                parked.event.release();
                if (this.unprocessedEvents.isEmpty()) {
                    break;
                }
            }
        }
    }

    /** Moves every still-parked event into the queue; events that cannot be queued are released. Caller holds {@code unprocessedEventsLock}. */
    private void queueRemainingUnprocessedEvents() {
        Iterator<Map.Entry<EventID, AbstractGatewaySender.EventWrapper>> parked = this.unprocessedEvents.entrySet().iterator();
        while (parked.hasNext() && !stopped()) {
            GatewaySenderEventImpl event = parked.next().getValue().event;
            event.initialize();
            GatewaySenderEventCallbackArgument callbackArgument = event.getSenderCallbackArgument();
            if (callbackArgument.getOriginatingDSId() == -1) {
                callbackArgument.setOriginatingDSId(this.sender.getMyDSId());
                callbackArgument.initializeReceipientDSIds(Collections.singletonList(Integer.valueOf(this.sender.getRemoteDSId())));
            }
            parked.remove();
            boolean queued = false;
            try {
                queued = queuePrimaryEvent(event);
            } catch (IOException | CacheException e) {
                if (!stopped()) {
                    logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_DROPPED_DURING_FAILOVER_0, event), e);
                }
            } finally {
                // Release exactly once, and only after the event has been logged.
                if (!queued) {
                    event.release();
                }
            }
        }
    }

    /** Flags every event currently in the queue region as a possible duplicate. */
    private void markQueuedEventsAsPossibleDuplicates() {
        Iterator<?> queued = this.queue.getRegion().values().iterator();
        while (queued.hasNext() && !stopped()) {
            Object value = queued.next();
            if (value instanceof GatewaySenderEventImpl) {
                ((GatewaySenderEventImpl) value).setPossibleDuplicate(true);
            }
        }
    }

    /** Releases every event still held in {@code unprocessedEvents} and nulls the map; a no-op if already done. */
    private void releaseUnprocessedEvents() {
        synchronized (this.unprocessedEventsLock) {
            Map<EventID, AbstractGatewaySender.EventWrapper> parked = this.unprocessedEvents;
            if (parked == null) {
                return;
            }
            for (AbstractGatewaySender.EventWrapper wrapper : parked.values()) {
                wrapper.event.release();
            }
            this.unprocessedEvents = null;
        }
    }

    /** Closes the processor via the superclass and, even if that fails, releases all parked events. */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void closeProcessor() {
        try {
            super.closeProcessor();
        } finally {
            releaseUnprocessedEvents();
        }
    }

    /**
     * Accepts a cache event. On a secondary the event is handed to {@link #handleSecondaryEvent};
     * on the primary it is queued, after waiting for failover completion unless the event belongs
     * to the {@link PeerTypeRegistration#REGION_NAME} distributed region.
     *
     * @param enumListenerEvent the operation type
     * @param entryEvent the originating entry event
     * @param obj substitute value passed through to {@link GatewaySenderEventImpl}
     * @throws IOException propagated from event creation or queueing
     * @throws CacheException propagated from event creation or queueing
     */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, Object obj) throws IOException, CacheException {
        boolean primary = this.sender.isPrimary();
        if (!primary) {
            synchronized (this.unprocessedEventsLock) {
                // Re-check under the lock: failover may have completed while we were waiting for it.
                if (this.sender.isPrimary()) {
                    primary = true;
                } else {
                    handleSecondaryEvent(new GatewaySenderEventImpl(enumListenerEvent, entryEvent, obj, false));
                }
            }
        }
        if (!primary) {
            return;
        }
        Region<?, ?> region = entryEvent.getRegion();
        boolean pdxTypeRegion = (region instanceof DistributedRegion) && region.getName().equals(PeerTypeRegistration.REGION_NAME);
        if (!pdxTypeRegion) {
            waitForFailoverCompletion();
        }
        GatewaySenderEventImpl event = new GatewaySenderEventImpl(enumListenerEvent, entryEvent, obj);
        boolean queued = false;
        try {
            queued = queuePrimaryEvent(event);
        } finally {
            if (!queued) {
                GatewaySenderEventImpl.release(event);
            }
        }
    }

    /**
     * Runs the sender's {@code beforeEnqueue} check and, if it passes, puts the event on the queue
     * and updates the put/queue-size statistics. Logs a one-time warning when the queue size
     * reaches {@link AbstractGatewaySender#QUEUE_SIZE_THRESHOLD}.
     *
     * @return the result of {@code queue.put}; {@code false} if the event was filtered or the put
     *         was interrupted
     */
    private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewaySenderEventImpl) throws IOException, CacheException {
        GatewaySenderStats statistics = this.sender.getStatistics();
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Queueing event ({}): {}", this.sender.getId(), Integer.valueOf(statistics.getEventsQueued() + 1), gatewaySenderEventImpl);
        }
        if (!this.sender.beforeEnqueue(gatewaySenderEventImpl)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Event {} is not added to queue.", gatewaySenderEventImpl);
            }
            statistics.incEventsFiltered();
            return false;
        }
        long putStart = statistics.startTime();
        boolean putDone = false;
        try {
            putDone = this.queue.put(gatewaySenderEventImpl);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            getSender().getCancelCriterion().checkCancelInProgress(e);
        }
        statistics.endPut(putStart);
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Queued event ({}): {}", this.sender.getId(), Integer.valueOf(statistics.getEventsQueued()), gatewaySenderEventImpl);
        }
        int queueSize = eventQueueSize();
        statistics.incQueueSize(1);
        if (!this.eventQueueSizeWarning && queueSize >= AbstractGatewaySender.QUEUE_SIZE_THRESHOLD) {
            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0_THE_EVENT_QUEUE_SIZE_HAS_REACHED_THE_THRESHOLD_1, new Object[]{this.sender.getId(), Integer.valueOf(AbstractGatewaySender.QUEUE_SIZE_THRESHOLD)}));
            this.eventQueueSizeWarning = true;
        }
        return putDone;
    }

    /**
     * Blocks until {@link #completeFailover()} has been called. If the waiting thread is
     * interrupted, its interrupt status is restored, cancellation is checked, and the method
     * returns without waiting any further.
     */
    protected void waitForFailoverCompletion() {
        synchronized (this.failoverCompletedLock) {
            if (this.failoverCompleted) {
                return;
            }
            logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0__WAITING_FOR_FAILOVER_COMPLETION, this));
            while (!this.failoverCompleted) {
                try {
                    this.failoverCompletedLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.sender.getCache().getCancelCriterion().checkCancelInProgress(e);
                    logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0_DID_NOT_WAIT_FOR_FAILOVER_COMPLETION_DUE_TO_INTERRUPTION, this));
                    // With the interrupt flag set, wait() would throw again immediately;
                    // stop waiting instead of spinning.
                    return;
                }
            }
        }
    }

    /** Marks failover as completed and wakes every thread blocked in {@link #waitForFailoverCompletion()}. */
    protected void completeFailover() {
        synchronized (this.failoverCompletedLock) {
            this.failoverCompleted = true;
            this.failoverCompletedLock.notifyAll();
        }
    }

    /**
     * Handles an event seen locally while this member is secondary. Callers must hold
     * {@code unprocessedEventsLock} (asserted further down the call chain).
     */
    protected void handleSecondaryEvent(GatewaySenderEventImpl gatewaySenderEventImpl) {
        basicHandleSecondaryEvent(gatewaySenderEventImpl);
    }

    /**
     * Schedules {@link #basicHandlePrimaryEvent} on the listener executor; does nothing when the
     * executor has already been shut down.
     */
    public void handlePrimaryEvent(final GatewaySenderEventImpl gatewaySenderEventImpl) {
        submitToListenerExecutor(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                SerialGatewaySenderEventProcessor.this.basicHandlePrimaryEvent(gatewaySenderEventImpl);
            }
        });
    }

    /**
     * Schedules {@link #basicHandlePrimaryDestroy} on the listener executor; does nothing when the
     * executor has already been shut down.
     */
    public void handlePrimaryDestroy(final GatewaySenderEventImpl gatewaySenderEventImpl) {
        submitToListenerExecutor(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                SerialGatewaySenderEventProcessor.this.basicHandlePrimaryDestroy(gatewaySenderEventImpl);
            }
        });
    }

    /**
     * Hands {@code task} to the listener executor. The executor reference is read while holding
     * {@code listenerObjectLock}, the same lock {@link #shutdownListenerExecutor()} takes, so a
     * non-null executor seen here cannot have been shut down yet.
     */
    private void submitToListenerExecutor(Runnable task) {
        synchronized (this.listenerObjectLock) {
            ExecutorService listenerExecutor = this.executor;
            if (listenerExecutor == null) {
                return;
            }
            listenerExecutor.execute(task);
        }
    }

    /**
     * Removes and releases the parked event with the same id as the given event, if any. Ignored
     * when the sender is primary or the parked events have already been released.
     */
    protected void basicHandlePrimaryDestroy(GatewaySenderEventImpl gatewaySenderEventImpl) {
        if (this.sender.isPrimary()) {
            return;
        }
        GatewaySenderStats statistics = this.sender.getStatistics();
        synchronized (this.unprocessedEventsLock) {
            if (this.unprocessedEvents == null) {
                return;
            }
            AbstractGatewaySender.EventWrapper parked = this.unprocessedEvents.remove(gatewaySenderEventImpl.getEventId());
            if (parked != null) {
                parked.event.release();
                statistics.incUnprocessedEventsRemovedByPrimary();
            }
        }
    }

    /**
     * Matches a primary notification against the parked events: a parked event with the same id
     * is removed and released; otherwise a token expiring after
     * {@link AbstractGatewaySender#TOKEN_TIMEOUT} is recorded. Ignored when the sender is primary
     * or the parked events have already been released.
     */
    protected void basicHandlePrimaryEvent(GatewaySenderEventImpl gatewaySenderEventImpl) {
        if (this.sender.isPrimary()) {
            return;
        }
        GatewaySenderStats statistics = this.sender.getStatistics();
        synchronized (this.unprocessedEventsLock) {
            if (this.unprocessedEvents == null) {
                return;
            }
            AbstractGatewaySender.EventWrapper parked = this.unprocessedEvents.remove(gatewaySenderEventImpl.getEventId());
            if (parked == null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map", this.sender.getId(), gatewaySenderEventImpl.getEventId(), gatewaySenderEventImpl.getKey(), gatewaySenderEventImpl.getValueAsString(true));
                }
                Long previousToken = this.unprocessedTokens.put(gatewaySenderEventImpl.getEventId(), Long.valueOf(System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT));
                if (previousToken == null) {
                    statistics.incUnprocessedTokensAddedByPrimary();
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: Primary create/update event {}:{}->{} remove from unprocessed events map", this.sender.getId(), gatewaySenderEventImpl.getEventId(), gatewaySenderEventImpl.getKey(), gatewaySenderEventImpl.getValueAsString(true));
                }
                parked.event.release();
                statistics.incUnprocessedEventsRemovedByPrimary();
            }
            reapOld(statistics, false);
        }
    }

    /**
     * Parks a locally seen event until the primary reports it, unless a token for it already
     * exists (then the token is consumed) or the sender's filters reject it. The event is released
     * here in every case except when ownership passes to {@code unprocessedEvents}.
     */
    private void basicHandleSecondaryEvent(GatewaySenderEventImpl gatewaySenderEventImpl) {
        boolean releaseEvent = true;
        try {
            GatewaySenderStats statistics = this.sender.getStatistics();
            if (!getSender().getGatewayEventFilters().isEmpty()) {
                try {
                    gatewaySenderEventImpl.initialize();
                } catch (Exception e) {
                    logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_EVENT_FAILED_TO_BE_INITIALIZED_0, gatewaySenderEventImpl), e);
                }
                if (!this.sender.beforeEnqueue(gatewaySenderEventImpl)) {
                    statistics.incEventsFiltered();
                    return;
                }
            }
            Assert.assertHoldsLock(this.unprocessedEventsLock, true);
            Assert.assertTrue(this.unprocessedEvents != null);
            EventID eventId = gatewaySenderEventImpl.getEventId();
            Long token = this.unprocessedTokens.remove(eventId);
            if (token == null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", this.sender.getId(), gatewaySenderEventImpl.getEventId(), gatewaySenderEventImpl.getKey(), gatewaySenderEventImpl.getValueAsString(true));
                }
                AbstractGatewaySender.EventWrapper existing = this.unprocessedEvents.get(eventId);
                if (existing == null) {
                    this.unprocessedEvents.put(eventId, new AbstractGatewaySender.EventWrapper(gatewaySenderEventImpl));
                    // The map owns the event from here on.
                    releaseEvent = false;
                    statistics.incUnprocessedEventsAddedBySecondary();
                } else {
                    // Keep the event that is already parked and report it next to the ignored one.
                    logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayImpl_0_THE_UNPROCESSED_EVENTS_MAP_ALREADY_CONTAINED_AN_EVENT_FROM_THE_HUB_1_SO_IGNORING_NEW_EVENT_2, new Object[]{this.sender.getId(), existing.event, gatewaySenderEventImpl}));
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: Secondary created event {}:{}->{} removed from unprocessed events map", this.sender.getId(), gatewaySenderEventImpl.getEventId(), gatewaySenderEventImpl.getKey(), gatewaySenderEventImpl.getValueAsString(true));
                }
                statistics.incUnprocessedTokensRemovedBySecondary();
            }
            reapOld(statistics, false);
        } finally {
            if (releaseEvent) {
                gatewaySenderEventImpl.release();
            }
        }
    }

    /**
     * Periodically drops expired entries from the head of the unprocessed maps. A reap pass only
     * happens after more than {@link #REAP_THRESHOLD} calls since the previous pass; all other
     * calls just bump the call counter. Both maps are insertion-ordered, and scanning stops at the
     * first entry that has not expired yet.
     *
     * @param gatewaySenderStats currently unused
     * @param forceEventCheck when {@code true}, the token map is skipped (it may already be null)
     *        and the event map is scanned regardless of its size
     */
    private void reapOld(GatewaySenderStats gatewaySenderStats, boolean forceEventCheck) {
        synchronized (this.unprocessedEventsLock) {
            if (this.uncheckedCount <= REAP_THRESHOLD) {
                this.uncheckedCount++;
                return;
            }
            this.uncheckedCount = 0;
            long now = System.currentTimeMillis();
            if (!forceEventCheck && this.unprocessedTokens.size() > REAP_THRESHOLD) {
                Iterator<Map.Entry<EventID, Long>> tokens = this.unprocessedTokens.entrySet().iterator();
                while (tokens.hasNext() && tokens.next().getValue().longValue() <= now) {
                    tokens.remove();
                }
            }
            if (forceEventCheck || this.unprocessedEvents.size() > REAP_THRESHOLD) {
                Iterator<Map.Entry<EventID, AbstractGatewaySender.EventWrapper>> events = this.unprocessedEvents.entrySet().iterator();
                while (events.hasNext()) {
                    AbstractGatewaySender.EventWrapper wrapper = events.next().getValue();
                    if (wrapper.timeout > now) {
                        break;
                    }
                    events.remove();
                    wrapper.event.release();
                }
            }
        }
    }

    /** Returns a description containing the sender id, remote distributed-system id and batch size. */
    @Override // java.lang.Thread
    public String toString() {
        return "GatewayEventProcessor[" + "gatewaySenderId=" + this.sender.getId()
                + ";remoteDSId=" + getSender().getRemoteDSId()
                + ";batchSize=" + getSender().getBatchSize() + "]";
    }

    /** Creates the single-threaded executor (daemon thread, unbounded queue) used for primary notifications. */
    private void initializeListenerExecutor() {
        final LoggingThreadGroup listenerGroup = LoggingThreadGroup.createThreadGroup("Gateway Listener Group", logger);
        ThreadFactory daemonFactory = new ThreadFactory() {
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread listenerThread = new Thread(listenerGroup, runnable, "Queued Gateway Listener Thread");
                listenerThread.setDaemon(true);
                return listenerThread;
            }
        };
        this.executor = new ThreadPoolExecutor(1, 1, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), daemonFactory);
    }

    /** Shuts the listener executor down (already submitted tasks still run) and clears the reference; a no-op if already done. */
    private void shutdownListenerExecutor() {
        synchronized (this.listenerObjectLock) {
            if (this.executor != null) {
                this.executor.shutdown();
                this.executor = null;
            }
        }
    }

    /** Detaches the cache listener from the underlying queue. */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void removeCacheListener() {
        this.queue.removeCacheListener();
    }

    /** Installs a {@link GatewaySenderEventCallbackDispatcher} as this processor's dispatcher. */
    @Override // org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor
    public void initializeEventDispatcher() {
        if (logger.isDebugEnabled()) {
            logger.debug(" Creating the GatewayEventCallbackDispatcher");
        }
        this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
    }
}
