package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.InternalEntity;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.class */
/**
 * Function that flushes the HDFS event queues backing the buckets of a
 * {@link PartitionedRegion}.
 *
 * <p>{@link #flushQueue(PartitionedRegion, int)} is the caller-side entry point: it
 * executes this function on the region, collects the ids of the buckets that were
 * flushed, and retries the remaining buckets until either all are flushed, the retry
 * budget ({@code gemfireXD.maxFlushQueueRetries}, default 3) is used up, or the time
 * limit expires.
 *
 * <p>{@link #execute(FunctionContext)} is the member-side part: it starts a flush on
 * every requested bucket that is a local primary and reports one {@link FlushStatus}
 * per bucket whose flush completed in time.
 */
public class HDFSFlushQueueFunction implements Function, InternalEntity {
    /** Maximum number of flush attempts made by {@link #flushQueue(PartitionedRegion, int)}. */
    private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3).intValue();

    /** When set, the debug messages of this class are emitted regardless of the log level. */
    private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");

    private static final Logger logger = LogService.getLogger();

    /** Function id; the fully qualified class name. */
    private static final String ID = HDFSFlushQueueFunction.class.getName();

    /**
     * Result collector that records which bucket ids were reported as flushed and
     * compares them against the set of buckets that were expected to be flushed.
     */
    public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
        /** Buckets that must all be reported for {@link #getResult()} to return {@code true}. */
        private final Set<Integer> expectedBuckets;

        /** Bucket ids received so far; guarded by its own monitor. */
        private final Set<Integer> successfulBuckets = new HashSet<Integer>();

        /** Released by {@link #endResults()}. */
        private final CountDownLatch complete = new CountDownLatch(1);

        private volatile ReplyProcessor21 processor;

        /**
         * @param set the bucket ids expected to be flushed; the reference is kept,
         *            not copied
         */
        public HDFSFlushQueueResultCollector(Set<Integer> set) {
            this.expectedBuckets = set;
        }

        /**
         * @return a snapshot copy of the bucket ids reported as flushed so far
         */
        public Set<Integer> getSuccessfulBuckets() {
            synchronized (this.successfulBuckets) {
                return new HashSet<Integer>(this.successfulBuckets);
            }
        }

        /**
         * Blocks until {@link #endResults()} has been called, then reports whether the
         * set of flushed buckets equals the set of expected buckets.
         *
         * @return {@code true} if every expected bucket (and no other) was reported
         * @throws FunctionException if the waiting thread is interrupted and the cache
         *         cancel criterion does not raise its own exception first
         */
        @Override
        public Boolean getResult() throws FunctionException {
            try {
                this.complete.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
                throw new FunctionException(e);
            }
            synchronized (this.successfulBuckets) {
                LogWriterI18n loggerI18n = InternalDistributedSystem.getLoggerI18n();
                if (loggerI18n.fineEnabled() || HDFSFlushQueueFunction.VERBOSE) {
                    loggerI18n.info(LocalizedStrings.DEBUG, "Expected buckets: " + this.expectedBuckets);
                    loggerI18n.info(LocalizedStrings.DEBUG, "Successful buckets: " + this.successfulBuckets);
                }
                return Boolean.valueOf(this.expectedBuckets.equals(this.successfulBuckets));
            }
        }

        /**
         * Timed waiting is not supported by this collector.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Boolean getResult(long j, TimeUnit timeUnit) throws FunctionException, InterruptedException {
            throw new UnsupportedOperationException();
        }

        /**
         * Records the bucket id carried by a {@link FlushStatus} result. Results of any
         * other type, and the terminating "last" status, are ignored.
         */
        @Override
        public synchronized void addResult(DistributedMember distributedMember, Object obj) {
            if (!(obj instanceof FlushStatus)) {
                return;
            }
            FlushStatus flushStatus = (FlushStatus) obj;
            if (flushStatus.isLast()) {
                // The end-of-stream marker carries no bucket id.
                return;
            }
            synchronized (this.successfulBuckets) {
                this.successfulBuckets.add(Integer.valueOf(flushStatus.getBucketId()));
            }
        }

        /** Signals that no more results will arrive, releasing {@link #getResult()}. */
        @Override
        public void endResults() {
            this.complete.countDown();
        }

        /** No-op: previously recorded buckets are retained. */
        @Override
        public void clearResults() {
        }

        @Override
        public void setProcessor(ReplyProcessor21 replyProcessor21) {
            this.processor = replyProcessor21;
        }

        @Override
        public ReplyProcessor21 getProcessor() {
            return this.processor;
        }

        /** No-op: exceptions handed to the collector are not recorded. */
        @Override
        public void setException(Throwable th) {
        }
    }

    /**
     * Flushes the HDFS queues of all buckets of the given region, retrying buckets that
     * were not reported as flushed.
     *
     * <p>Each attempt executes this function on the region for the buckets still
     * outstanding. After an unsuccessful attempt the method waits, per outstanding
     * bucket, on {@code getNodeForBucketWrite} before trying again.
     *
     * @param partitionedRegion the region whose bucket queues are flushed
     * @param maxWaitTime overall time limit in seconds; {@code 0} disables the limit
     *        (each remaining-time computation then yields {@code Integer.MAX_VALUE} ms)
     * @throws FunctionException if some buckets are still not flushed once the retries
     *         or the time limit are exhausted
     */
    public static void flushQueue(PartitionedRegion partitionedRegion, int maxWaitTime) {
        Set<Integer> buckets = new HashSet<Integer>(partitionedRegion.getRegionAdvisor().getBucketSet());

        // Convert seconds to milliseconds in long arithmetic: an int multiplication
        // wraps for limits above ~2.1 million seconds and could turn a very large
        // limit into a negative one, failing the flush without a single attempt.
        long maxWaitMillis = maxWaitTime * 1000L;
        long start = System.currentTimeMillis();

        int attempt = 0;
        long remaining;
        while (attempt++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitMillis)) > 0) {
            if (debugEnabled()) {
                logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets + ", attempt = " + attempt + ", remaining = " + remaining));
            }
            HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
            HDFSFlushQueueResultCollector collector = new HDFSFlushQueueResultCollector(buckets);
            AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(partitionedRegion).withArgs(args).withCollector(collector);
            execution.setWaitOnExceptionFlag(true);
            try {
                execution.execute(ID);
                if (collector.getResult().booleanValue()) {
                    if (debugEnabled()) {
                        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully"));
                    }
                    return;
                }
            } catch (FunctionException e) {
                // Best effort: a failed attempt is followed by a retry of the
                // buckets that were not reported as flushed.
                if (debugEnabled()) {
                    logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e);
                }
            }

            // Only the buckets that did not report success are retried.
            buckets.removeAll(collector.getSuccessfulBuckets());
            for (int bucketId : buckets) {
                long bucketWait = waitTime(start, maxWaitMillis);
                if (debugEnabled()) {
                    logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId));
                }
                // RetryTimeKeeper takes an int; saturate instead of wrapping.
                partitionedRegion.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(clampToInt(bucketWait)));
            }
        }
        partitionedRegion.checkReadiness();
        throw new FunctionException("Unable to flush the following buckets: " + buckets);
    }

    /**
     * Computes the time left until {@code j + j2}.
     *
     * @param j start time in milliseconds, as returned by {@link System#currentTimeMillis()}
     * @param j2 time limit in milliseconds; {@code 0} means no limit
     * @return milliseconds remaining (zero or negative once the limit has passed), or
     *         {@code Integer.MAX_VALUE} when {@code j2} is {@code 0}
     */
    private static long waitTime(long j, long j2) {
        if (j2 == 0) {
            return 2147483647L;
        }
        return (j + j2) - System.currentTimeMillis();
    }

    /** Narrows a long to int, saturating at the int range instead of wrapping. */
    private static int clampToInt(long value) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
    }

    /** Whether this class's debug messages should be emitted. */
    private static boolean debugEnabled() {
        return logger.isDebugEnabled() || VERBOSE;
    }

    /**
     * Member-side execution: starts a flush on every requested bucket that is a local
     * primary, then waits for each flush and sends a {@link FlushStatus} for every
     * bucket that completed. A final "last" status is always sent.
     *
     * @param functionContext expected to be a {@link RegionFunctionContext} on a
     *        {@link PartitionedRegion} with {@code HDFSFlushQueueArgs} as arguments
     */
    @Override
    public void execute(FunctionContext functionContext) {
        RegionFunctionContext regionContext = (RegionFunctionContext) functionContext;
        PartitionedRegion partitionedRegion = (PartitionedRegion) regionContext.getDataSet();
        HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) regionContext.getArguments();

        // Restrict the request to the buckets for which this member is primary.
        Set<Integer> localBuckets = new HashSet<Integer>(args.getBuckets());
        localBuckets.retainAll(partitionedRegion.getDataStore().getAllLocalPrimaryBucketIds());

        // Phase 1: kick off all flushes without waiting on any of them.
        Map<Integer, FlushObserver.AsyncFlushResult> flushes = new HashMap<Integer, FlushObserver.AsyncFlushResult>();
        for (int bucketId : localBuckets) {
            try {
                HDFSBucketRegionQueue queue = getQueue(partitionedRegion, bucketId);
                if (queue != null) {
                    if (debugEnabled()) {
                        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId));
                    }
                    flushes.put(Integer.valueOf(bucketId), queue.flush());
                }
            } catch (ForceReattemptException e) {
                // The bucket is skipped and therefore not reported as flushed.
                if (debugEnabled()) {
                    logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e);
                }
            }
        }

        // Phase 2: wait for each flush against a shared deadline and report successes.
        try {
            long start = System.currentTimeMillis();
            for (Map.Entry<Integer, FlushObserver.AsyncFlushResult> entry : flushes.entrySet()) {
                long remaining = waitTime(start, args.getMaxWaitTime());
                if (debugEnabled()) {
                    logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + entry.getKey() + " to complete flushing, remaining = " + remaining));
                }
                if (entry.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
                    if (debugEnabled()) {
                        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + entry.getKey() + " flushed successfully"));
                    }
                    regionContext.getResultSender().sendResult(new FlushStatus(entry.getKey().intValue()));
                }
            }
        } catch (InterruptedException e) {
            // Stop waiting, keep the interrupt flag set, and still send the final result.
            Thread.currentThread().interrupt();
        }

        if (debugEnabled()) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result"));
        }
        regionContext.getResultSender().lastResult(FlushStatus.last());
    }

    /**
     * Looks up the HDFS bucket queue for the given bucket through the region's HDFS
     * event queue sender.
     *
     * @return the bucket's queue, or {@code null} if the sender has no event processor
     * @throws ForceReattemptException propagated from the bucket queue lookup
     */
    private HDFSBucketRegionQueue getQueue(PartitionedRegion partitionedRegion, int i) throws ForceReattemptException {
        AbstractGatewaySender sender = (AbstractGatewaySender) partitionedRegion.getHDFSEventQueue().getSender();
        AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor();
        if (eventProcessor == null) {
            return null;
        }
        ConcurrentParallelGatewaySenderQueue senderQueue = (ConcurrentParallelGatewaySenderQueue) eventProcessor.getQueue();
        return senderQueue.getBucketRegionQueue(partitionedRegion, i);
    }

    /** @return the function id, which is this class's fully qualified name */
    @Override
    public String getId() {
        return ID;
    }

    /** @return {@code true}; this function sends results */
    @Override
    public boolean hasResult() {
        return true;
    }

    /** @return {@code true} */
    @Override
    public boolean optimizeForWrite() {
        return true;
    }

    /** @return {@code false} */
    @Override
    public boolean isHA() {
        return false;
    }
}
