package org.apache.geode.internal.cache.wan;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.SenderProxy;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.PdxRegistryMismatchException;
import org.apache.geode.security.GemFireSecurityException;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.class */
public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDispatcher {
    protected final AbstractGatewaySenderEventProcessor processor;
    private volatile Connection connection;
    private final AbstractGatewaySender sender;
    private AckReaderThread ackReaderThread;
    private static final int RETRY_WAIT_TIME = 100;
    private static final Logger logger = LogService.getLogger();
    public static volatile Consumer<Boolean> messageProcessingAttempted = bool -> {
    };
    private final Set<String> notFoundRegions = new HashSet();
    private final Object notFoundRegionsSync = new Object();
    private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
    private int failedConnectCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher$AckReaderThread.class */
    public class AckReaderThread extends Thread {
        private Object runningStateLock;
        private volatile boolean shutdown;
        private final InternalCache cache;
        private volatile boolean ackReaderThreadRunning;

        public AckReaderThread(GatewaySenderEventRemoteDispatcher gatewaySenderEventRemoteDispatcher, GatewaySender gatewaySender, AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor) {
            this(gatewaySender, abstractGatewaySenderEventProcessor.getName());
        }

        boolean isShutdown() {
            return this.shutdown;
        }

        public AckReaderThread(GatewaySender gatewaySender, String str) {
            super("AckReaderThread for : " + str);
            this.runningStateLock = new Object();
            this.shutdown = false;
            this.ackReaderThreadRunning = false;
            setDaemon(true);
            this.cache = ((AbstractGatewaySender) gatewaySender).getCache();
        }

        public void waitForRunningAckReaderThreadRunningState() {
            synchronized (this.runningStateLock) {
                while (!this.ackReaderThreadRunning) {
                    try {
                        this.runningStateLock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private boolean checkCancelled() {
            return this.shutdown || this.cache.getCancelCriterion().isCancelInProgress();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                GatewaySenderEventRemoteDispatcher.logger.debug("AckReaderThread started.. ");
            }
            synchronized (this.runningStateLock) {
                this.ackReaderThreadRunning = true;
                this.runningStateLock.notifyAll();
            }
            while (!checkCancelled()) {
                try {
                    try {
                        GatewayAck readAcknowledgement = GatewaySenderEventRemoteDispatcher.this.readAcknowledgement();
                        if (readAcknowledgement != null) {
                            boolean z = readAcknowledgement.getBatchException() != null;
                            int batchId = readAcknowledgement.getBatchId();
                            readAcknowledgement.getNumEvents();
                            if (z) {
                                GatewaySenderEventRemoteDispatcher.logger.warn("Gateway Sender {} : Received ack for batch id {} with one or more exceptions", GatewaySenderEventRemoteDispatcher.this.processor.getSender(), Integer.valueOf(readAcknowledgement.getBatchId()));
                                GatewaySenderEventRemoteDispatcher.this.sender.getStatistics().incBatchesRedistributed();
                                if (GatewaySenderEventRemoteDispatcher.this.sender.isRemoveFromQueueOnException()) {
                                    logBatchExceptions(readAcknowledgement.getBatchException());
                                    GatewaySenderEventRemoteDispatcher.this.processor.handleSuccessBatchAck(batchId);
                                } else {
                                    logBatchExceptions(readAcknowledgement.getBatchException());
                                    GatewaySenderEventRemoteDispatcher.this.processor.handleSuccessBatchAck(batchId);
                                }
                            } else {
                                if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                                    GatewaySenderEventRemoteDispatcher.logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events", GatewaySenderEventRemoteDispatcher.this.processor.getSender(), Integer.valueOf(readAcknowledgement.getBatchId()), Integer.valueOf(readAcknowledgement.getNumEvents()));
                                }
                                GatewaySenderEventRemoteDispatcher.this.processor.handleSuccessBatchAck(batchId);
                            }
                        } else {
                            if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                                GatewaySenderEventRemoteDispatcher.logger.debug("{}: Received null ack from remote site.", GatewaySenderEventRemoteDispatcher.this.processor.getSender());
                            }
                            GatewaySenderEventRemoteDispatcher.this.processor.handleException();
                            try {
                                Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    } catch (Exception e2) {
                        if (!checkCancelled()) {
                            GatewaySenderEventRemoteDispatcher.logger.fatal("Stopping the processor because the following exception occurred while processing a batch:", e2);
                        }
                        GatewaySenderEventRemoteDispatcher.this.sender.getLifeCycleLock().writeLock().lock();
                        try {
                            GatewaySenderEventRemoteDispatcher.this.processor.stopProcessing();
                            GatewaySenderEventRemoteDispatcher.this.sender.clearTempEventsAfterSenderStopped();
                            GatewaySenderEventRemoteDispatcher.this.sender.getLifeCycleLock().writeLock().unlock();
                            if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                                GatewaySenderEventRemoteDispatcher.logger.debug("AckReaderThread exiting. ");
                            }
                            this.ackReaderThreadRunning = false;
                            return;
                        } catch (Throwable th) {
                            GatewaySenderEventRemoteDispatcher.this.sender.getLifeCycleLock().writeLock().unlock();
                            throw th;
                        }
                    }
                } catch (Throwable th2) {
                    if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                        GatewaySenderEventRemoteDispatcher.logger.debug("AckReaderThread exiting. ");
                    }
                    this.ackReaderThreadRunning = false;
                    throw th2;
                }
            }
            if (GatewaySenderEventRemoteDispatcher.logger.isDebugEnabled()) {
                GatewaySenderEventRemoteDispatcher.logger.debug("AckReaderThread exiting. ");
            }
            this.ackReaderThreadRunning = false;
        }

        protected void logBatchExceptions(BatchException70 batchException70) {
            try {
                for (BatchException70 batchException702 : batchException70.getExceptions()) {
                    boolean z = true;
                    if (batchException702.getCause() instanceof RegionDestroyedException) {
                        RegionDestroyedException cause = batchException702.getCause();
                        synchronized (GatewaySenderEventRemoteDispatcher.this.notFoundRegionsSync) {
                            if (GatewaySenderEventRemoteDispatcher.this.notFoundRegions.contains(cause.getRegionFullPath())) {
                                z = false;
                            } else {
                                GatewaySenderEventRemoteDispatcher.this.notFoundRegions.add(cause.getRegionFullPath());
                            }
                        }
                    } else if ((batchException702.getCause() instanceof IllegalStateException) && batchException702.getCause().getMessage().contains("Unknown pdx type")) {
                        List list = (List) GatewaySenderEventRemoteDispatcher.this.processor.getBatchIdToPDXEventsMap().get(Integer.valueOf(batchException702.getBatchId()));
                        if (1 != 0) {
                            GatewaySenderEventRemoteDispatcher.logger.warn(String.format("A BatchException occurred processing PDX events. Index of array of Exception : %s", Integer.valueOf(batchException702.getIndex())), batchException702);
                        }
                        if (list != null) {
                            Iterator it = list.iterator();
                            while (it.hasNext()) {
                                ((GatewaySenderEventImpl) it.next()).isAcked = false;
                            }
                            GatewaySenderEventImpl gatewaySenderEventImpl = (GatewaySenderEventImpl) list.get(batchException702.getIndex());
                            if (1 != 0) {
                                GatewaySenderEventRemoteDispatcher.logger.warn("The event being processed when the BatchException occurred was:  {}", gatewaySenderEventImpl);
                            }
                        }
                    }
                    if (z) {
                        GatewaySenderEventRemoteDispatcher.logger.warn(String.format("A BatchException occurred processing events. Index of Array of Exception : %s", Integer.valueOf(batchException702.getIndex())), batchException702);
                    }
                    List[] listArr = (List[]) GatewaySenderEventRemoteDispatcher.this.processor.getBatchIdToEventsMap().get(Integer.valueOf(batchException702.getBatchId()));
                    if (listArr != null) {
                        GatewaySenderEventImpl gatewaySenderEventImpl2 = (GatewaySenderEventImpl) listArr[1].get(batchException702.getIndex());
                        if (z) {
                            GatewaySenderEventRemoteDispatcher.logger.warn("The event being processed when the BatchException occurred was:  {}", gatewaySenderEventImpl2);
                        }
                    }
                }
            } catch (Exception e) {
                GatewaySenderEventRemoteDispatcher.logger.warn("An unexpected exception occurred processing a BatchException. The thread will continue.", e);
            }
        }

        boolean isRunning() {
            return this.ackReaderThreadRunning;
        }

        public void shutdown() {
            Connection connection = GatewaySenderEventRemoteDispatcher.this.connection;
            if (connection != null) {
                shutDownAckReaderConnection(connection);
                if (!connection.isDestroyed()) {
                    connection.destroy();
                    GatewaySenderEventRemoteDispatcher.this.sender.getProxy().returnConnection(connection);
                }
            }
            this.shutdown = true;
            boolean interrupted = Thread.interrupted();
            try {
                join(15000L);
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                if (1 != 0) {
                    Thread.currentThread().interrupt();
                }
            } catch (Throwable th) {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                throw th;
            }
            if (isAlive()) {
                GatewaySenderEventRemoteDispatcher.logger.warn("AckReaderThread ignored cancellation");
            }
        }

        protected void shutDownAckReaderConnection(Connection connection) {
            if (connection != null) {
                try {
                    if (connection.getInputStream() != null) {
                        connection.getInputStream().close();
                    }
                } catch (ConnectionDestroyedException e) {
                    GatewaySenderEventRemoteDispatcher.logger.info("AckReader shutting down and connection already destroyed");
                } catch (IOException e2) {
                    GatewaySenderEventRemoteDispatcher.logger.warn("Unable to shutdown AckReaderThread Connection");
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher$GatewayAck.class */
    public static class GatewayAck {
        private int batchId;
        private int numEvents;
        private BatchException70 be;

        public GatewayAck(BatchException70 batchException70, int i) {
            this.be = batchException70;
            this.batchId = i;
        }

        public GatewayAck(int i, int i2) {
            this.batchId = i;
            this.numEvents = i2;
        }

        public int getNumEvents() {
            return this.numEvents;
        }

        public int getBatchId() {
            return this.batchId;
        }

        public BatchException70 getBatchException() {
            return this.be;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher$RecoverableExceptionPredicates.class */
    public static class RecoverableExceptionPredicates {
        private RecoverableExceptionPredicates() {
        }

        static boolean isRecoverableWhenReadingAck(Exception exc) {
            return isRecoverableInAllCases(exc) || ((exc instanceof ServerConnectivityException) && !(exc.getCause() instanceof PdxRegistryMismatchException));
        }

        static boolean isRecoverableWhenDispatchingBatch(Throwable th) {
            return isRecoverableInAllCases(th) || (th instanceof ServerConnectivityException) || (th instanceof IllegalStateException);
        }

        private static boolean isRecoverableInAllCases(Throwable th) {
            return (th instanceof IOException) || (th instanceof ConnectionDestroyedException) || (th instanceof GemFireSecurityException);
        }
    }

    void setAckReaderThread(AckReaderThread ackReaderThread) {
        this.ackReaderThread = ackReaderThread;
    }

    public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor) {
        this.processor = abstractGatewaySenderEventProcessor;
        this.sender = abstractGatewaySenderEventProcessor.getSender();
        try {
            initializeConnection();
        } catch (GatewaySenderException e) {
        }
    }

    GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor abstractGatewaySenderEventProcessor, Connection connection) {
        this.processor = abstractGatewaySenderEventProcessor;
        this.sender = abstractGatewaySenderEventProcessor.getSender();
        this.connection = connection;
    }

    /* JADX WARN: Finally extract failed */
    protected GatewayAck readAcknowledgement() {
        BatchException70 batchException70;
        SenderProxy senderProxy = new SenderProxy(this.processor.getSender().getProxy());
        GatewayAck gatewayAck = null;
        try {
            try {
                this.connection = getConnection(false);
                if (logger.isDebugEnabled()) {
                    logger.debug(" Receiving ack on the thread {}", this.connection);
                }
                this.connectionLifeCycleLock.readLock().lock();
            } catch (Exception e) {
                BatchException70 cause = e.getCause();
                if (cause instanceof BatchException70) {
                    batchException70 = cause;
                } else if (e instanceof GatewaySenderException) {
                    batchException70 = (Exception) e.getCause();
                } else {
                    batchException70 = e;
                    destroyConnection();
                }
                if (this.sender.getProxy() != null && !this.sender.getProxy().isDestroyed()) {
                    if (RecoverableExceptionPredicates.isRecoverableWhenReadingAck(batchException70)) {
                        sleepBeforeRetry();
                    } else {
                        logAndStopProcessor(batchException70);
                    }
                }
                messageProcessingAttempted.accept(true);
            }
            try {
                if (this.connection != null && !this.processor.isStopped()) {
                    gatewayAck = (GatewayAck) senderProxy.receiveAckFromReceiver(this.connection);
                }
                this.connectionLifeCycleLock.readLock().unlock();
                messageProcessingAttempted.accept(true);
                return gatewayAck;
            } catch (Throwable th) {
                this.connectionLifeCycleLock.readLock().unlock();
                throw th;
            }
        } catch (Throwable th2) {
            messageProcessingAttempted.accept(true);
            throw th2;
        }
    }

    public boolean dispatchBatch(List list, boolean z, boolean z2) {
        GatewaySenderStats statistics = this.sender.getStatistics();
        boolean z3 = false;
        try {
            try {
                try {
                    try {
                        long startTime = statistics.startTime();
                        z3 = _dispatchBatch(list, z2);
                        if (z3) {
                            statistics.endBatch(startTime, list.size());
                        }
                        messageProcessingAttempted.accept(false);
                    } catch (Exception e) {
                        logAndStopProcessor(e);
                        messageProcessingAttempted.accept(false);
                    }
                } catch (CancelException e2) {
                    logAndStopProcessor(e2);
                    throw e2;
                }
            } catch (GatewaySenderException e3) {
                Throwable cause = e3.getCause();
                if (this.sender.getProxy() != null && !this.sender.getProxy().isDestroyed()) {
                    if (RecoverableExceptionPredicates.isRecoverableWhenDispatchingBatch(cause)) {
                        this.processor.handleException();
                        sleepBeforeRetry();
                        if (logger.isDebugEnabled()) {
                            logger.debug("Failed to dispatch a batch with id {} due to non-fatal exception {}.  Retrying in {} ms", Integer.valueOf(this.processor.getBatchId()), cause, Integer.valueOf(RETRY_WAIT_TIME));
                        }
                    } else {
                        logAndStopProcessor(e3);
                    }
                }
                messageProcessingAttempted.accept(false);
            }
            return z3;
        } catch (Throwable th) {
            messageProcessingAttempted.accept(false);
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    private boolean _dispatchBatch(List list, boolean z) {
        MessageTooLargeException messageTooLargeException;
        BatchException70 batchException70;
        int batchId = this.processor.getBatchId();
        this.connection = getConnection(true);
        int batchId2 = this.processor.getBatchId();
        GatewaySenderStats statistics = this.sender.getStatistics();
        if (batchId != batchId2 || this.processor.isConnectionReset()) {
            return false;
        }
        try {
            if (this.processor.isConnectionReset()) {
                z = true;
            }
            SenderProxy senderProxy = new SenderProxy(this.sender.getProxy());
            this.connectionLifeCycleLock.readLock().lock();
            try {
                if (this.connection == null) {
                    throw new ConnectionDestroyedException();
                }
                senderProxy.dispatchBatch_NewWAN(this.connection, list, batchId, this.sender.isRemoveFromQueueOnException(), z);
                if (logger.isDebugEnabled()) {
                    logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}", this.processor.getSender(), Integer.valueOf(batchId), Integer.valueOf(list.size()), Integer.valueOf(this.processor.getQueue().size()), this.connection);
                }
                this.connectionLifeCycleLock.readLock().unlock();
                return true;
            } catch (Throwable th) {
                this.connectionLifeCycleLock.readLock().unlock();
                throw th;
            }
        } catch (IllegalStateException e) {
            this.processor.setException(new GatewaySenderException(e));
            throw new GatewaySenderException(String.format("%s : Exception during processing batch %s on connection %s", this, Integer.valueOf(batchId), this.connection), e);
        } catch (Exception e2) {
            Throwable cause = e2.getCause();
            Exception exc = cause instanceof IOException ? (IOException) cause : e2;
            destroyConnection();
            throw new GatewaySenderException(String.format("%s : Exception during processing batch %s on connection %s", this, Integer.valueOf(batchId), this.connection), exc);
        } catch (ServerOperationException e3) {
            BatchException70 cause2 = e3.getCause();
            if (cause2 instanceof BatchException70) {
                batchException70 = cause2;
            } else {
                batchException70 = e3;
                destroyConnection();
            }
            throw new GatewaySenderException(String.format("%s : Exception during processing batch %s on connection %s", this, Integer.valueOf(batchId), this.connection), batchException70);
        } catch (GemFireIOException e4) {
            MessageTooLargeException cause3 = e4.getCause();
            if (cause3 instanceof MessageTooLargeException) {
                messageTooLargeException = cause3;
                int min = Math.min(list.size(), this.processor.getBatchSize()) / 2;
                logger.warn(String.format("The following exception occurred attempting to send a batch of %s events. The batch will be tried again after reducing the batch size to %s events.", Integer.valueOf(list.size()), Integer.valueOf(min)), e4);
                this.processor.setBatchSize(min);
                statistics.incBatchesResized();
            } else {
                messageTooLargeException = e4;
                destroyConnection();
            }
            throw new GatewaySenderException(String.format("%s : Exception during processing batch %s on connection %s", this, Integer.valueOf(batchId), this.connection), messageTooLargeException);
        }
    }

    public Connection getConnection(boolean z) throws GatewaySenderException {
        if (this.processor.isStopped()) {
            stop();
            return null;
        }
        if (this.sender.isParallel()) {
            if (this.connection == null || this.connection.isDestroyed()) {
                initializeConnection();
            }
        } else if (this.connection == null || this.connection.isDestroyed() || !this.connection.getServer().equals(this.sender.getServerLocation())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}", this.connection == null ? "null" : this.connection.getServer(), this.sender.getServerLocation());
            }
            initializeConnection();
        }
        InternalCache cache = this.sender.getCache();
        if (cache != null && !cache.isClosed() && this.sender.isPrimary() && this.connection != null && (this.ackReaderThread == null || !this.ackReaderThread.isRunning())) {
            this.ackReaderThread = new AckReaderThread(this, (GatewaySender) this.sender, this.processor);
            this.ackReaderThread.start();
            this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
        }
        return this.connection;
    }

    public void destroyConnection() {
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            Connection connection = this.connection;
            if (connection != null) {
                if (!connection.isDestroyed()) {
                    connection.destroy();
                    this.sender.getProxy().returnConnection(connection);
                }
                this.connection = null;
                this.sender.setServerLocation((ServerLocation) null);
            }
        } finally {
            this.connectionLifeCycleLock.writeLock().unlock();
        }
    }

    private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
        Connection acquireConnection;
        if (this.ackReaderThread != null) {
            this.ackReaderThread.shutDownAckReaderConnection(this.connection);
        }
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            try {
                if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
                    this.sender.initProxy();
                } else {
                    this.processor.resetBatchId();
                }
                try {
                    if (this.sender.isParallel()) {
                        acquireConnection = this.sender.getProxy().acquireConnection();
                        this.sender.setServerLocation(acquireConnection.getServer());
                    } else {
                        synchronized (this.sender.getLockForConcurrentDispatcher()) {
                            ServerLocation serverLocation = this.sender.getServerLocation();
                            if (serverLocation != null) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", serverLocation);
                                }
                                acquireConnection = this.sender.getProxy().acquireConnection(serverLocation);
                            } else {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("ServerLocation is null. Creating new connection. ");
                                }
                                acquireConnection = this.sender.getProxy().acquireConnection();
                                if (this.sender.isPrimary()) {
                                    if (this.sender.getServerLocation() == null) {
                                        this.sender.setServerLocation(acquireConnection.getServer());
                                    }
                                    new UpdateAttributesProcessor(this.sender).distribute(false);
                                }
                            }
                        }
                    }
                    if (this.failedConnectCount > 0) {
                        logger.info("{}: Using {} after {} failed connect attempts", new Object[]{this.processor.getSender().getId(), acquireConnection, Integer.valueOf(this.failedConnectCount)});
                        this.failedConnectCount = 0;
                    } else {
                        logger.info("{}: Using {}", new Object[]{this.processor.getSender().getId(), acquireConnection});
                    }
                    this.connection = acquireConnection;
                    this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
                    this.connectionLifeCycleLock.writeLock().unlock();
                } catch (ServerConnectivityException e) {
                    GatewaySenderException initializeConnectionExceptionToThrow = getInitializeConnectionExceptionToThrow(e);
                    this.sender.setServerLocation((ServerLocation) null);
                    if (logConnectionFailure()) {
                        logger.warn("{} : Could not connect due to: {}", this.processor.getSender().getId(), initializeConnectionExceptionToThrow.getCause().getMessage());
                    }
                    this.failedConnectCount++;
                    throw initializeConnectionExceptionToThrow;
                }
            } catch (ConnectionDestroyedException e2) {
                throw new GatewaySenderException(String.format("%s : Could not connect due to: %s", this.processor.getSender().getId(), e2.getMessage()), e2);
            }
        } catch (Throwable th) {
            this.connectionLifeCycleLock.writeLock().unlock();
            throw th;
        }
    }

    private GatewaySenderException getInitializeConnectionExceptionToThrow(ServerConnectivityException serverConnectivityException) {
        String format;
        GatewaySenderException gatewaySenderException;
        if (serverConnectivityException.getCause() instanceof GemFireSecurityException) {
            gatewaySenderException = new GatewaySenderException(serverConnectivityException.getCause());
        } else {
            List currentServers = this.sender.getProxy().getCurrentServers();
            if (currentServers.size() == 0) {
                format = "There are no active servers.";
            } else {
                StringBuilder sb = new StringBuilder();
                Iterator it = currentServers.iterator();
                while (it.hasNext()) {
                    String valueOf = String.valueOf((ServerLocation) it.next());
                    if (sb.length() > 0) {
                        sb.append(", ");
                    }
                    sb.append(valueOf);
                }
                format = String.format("No available connection was found, but the following active servers exist: %s", sb.toString());
            }
            IOException iOException = new IOException(format);
            gatewaySenderException = new GatewaySenderException(String.format("%s : Could not connect due to: %s", this.processor.getSender().getId(), iOException.getMessage()), iOException);
        }
        return gatewaySenderException;
    }

    protected boolean logConnectionFailure() {
        if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
            return true;
        }
        return this.failedConnectCount >= 3000 ? this.failedConnectCount % 3000 == 0 : this.failedConnectCount == 30 || this.failedConnectCount == 300;
    }

    public void stopAckReaderThread() {
        if (this.ackReaderThread != null) {
            this.ackReaderThread.shutdown();
        }
    }

    public boolean isRemoteDispatcher() {
        return true;
    }

    public boolean isConnectedToRemote() {
        return (this.connection == null || this.connection.isDestroyed()) ? false : true;
    }

    public void shutDownAckReaderConnection() {
        if (this.ackReaderThread != null) {
            this.ackReaderThread.shutDownAckReaderConnection(this.connection);
            this.ackReaderThread.shutdown();
        }
    }

    public void stop() {
        stopAckReaderThread();
        if (this.processor.isStopped()) {
            destroyConnection();
        }
    }

    private void sleepBeforeRetry() {
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void logAndStopProcessor(Exception exc) {
        if (!(exc instanceof CancelException)) {
            logger.fatal("Stopping the processor because the following exception occurred while processing a batch:", exc);
        } else if (logger.isDebugEnabled()) {
            logger.debug("Stopping the processor because cancellation occurred while processing a batch");
        }
        this.processor.setIsStopped(true);
    }
}
