package com.gemstone.gemfire.internal.cache;

import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.persistence.query.mock.ByteComparator;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.concurrent.Atomics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/BucketRegionQueue.class */
/**
 * Bucket region queue that, in addition to the region entries themselves, keeps an ordered queue
 * of entry keys ({@code eventSeqNumQueue}) and per-region conflation indexes ({@code indexes}).
 */
public class BucketRegionQueue extends AbstractBucketRegionQueue {
    private static final Logger logger = LogService.getLogger();
    // Conflation indexes: region name -> (key to conflate -> Long queue key of the entry last put for it).
    // Populated in conflateOldEntry and trimmed in removeIndex.
    private final Map indexes;
    // Entry keys in queue order. Holds EventID keys when the partitioned region is not colocated,
    // Long keys otherwise (see initialize).
    private final BlockingQueue<Object> eventSeqNumQueue;
    // Highest Long key found while recovering keys in initialize; only assigned on the colocated path.
    private long lastKeyRecovered;

    /**
     * Builds the queue by delegating to the superclass constructor and then creating the empty
     * key queue and the empty conflation index map.
     *
     * @param str passed through to the superclass constructor
     * @param regionAttributes passed through to the superclass constructor
     * @param localRegion passed through to the superclass constructor
     * @param gemFireCacheImpl passed through to the superclass constructor
     * @param internalRegionArguments passed through to the superclass constructor
     */
    public BucketRegionQueue(String str, RegionAttributes regionAttributes, LocalRegion localRegion, GemFireCacheImpl gemFireCacheImpl, InternalRegionArguments internalRegionArguments) {
        super(str, regionAttributes, localRegion, gemFireCacheImpl, internalRegionArguments);
        this.eventSeqNumQueue = new LinkedBlockingQueue<Object>();
        // The returned key set is discarded.
        // NOTE(review): presumably invoked for a side effect in the superclass - confirm before removing.
        keySet();
        this.indexes = new ConcurrentHashMap<Object, Object>();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initializes the bucket and rebuilds {@code eventSeqNumQueue} from the keys already present
     * in the region.
     *
     * <p>After the superclass initialization and {@code loadEventsFromTempQueue()}, the
     * initialization write lock is taken and, if the region has keys:
     * <ul>
     *   <li>when the partitioned region is not colocated, keys are treated as {@link EventID}s,
     *       sorted by membership id bytes, then thread id, then sequence id, and queued in that
     *       order;</li>
     *   <li>otherwise keys are treated as {@link Long}s, queued in ascending order, the largest one
     *       is remembered in {@code lastKeyRecovered} and, when an event sequence number holder
     *       exists, it is raised to at least that value.</li>
     * </ul>
     * The queue is then marked initialized and any keys recorded by failed batch-removal messages
     * are destroyed. The event processor is notified and the lock released on every exit path.
     *
     * @param inputStream passed through to the superclass
     * @param internalDistributedMember passed through to the superclass
     * @param internalRegionArguments passed through to the superclass
     * @throws TimeoutException propagated from the superclass initialization
     * @throws IOException propagated from the superclass initialization
     * @throws ClassNotFoundException propagated from the superclass initialization
     */
    @Override // com.gemstone.gemfire.internal.cache.BucketRegion, com.gemstone.gemfire.internal.cache.DistributedRegion, com.gemstone.gemfire.internal.cache.LocalRegion
    public void initialize(InputStream inputStream, InternalDistributedMember internalDistributedMember, InternalRegionArguments internalRegionArguments) throws TimeoutException, IOException, ClassNotFoundException {
        super.initialize(inputStream, internalDistributedMember, internalRegionArguments);
        loadEventsFromTempQueue();
        getInitializationLock().writeLock().lock();
        try {
            if (!keySet().isEmpty()) {
                if (getPartitionedRegion().getColocatedWith() == null) {
                    ArrayList<EventID> recoveredIds = new ArrayList<EventID>(keySet());
                    Collections.sort(recoveredIds, new Comparator<EventID>() { // from class: com.gemstone.gemfire.internal.cache.BucketRegionQueue.1
                        // One comparator for the whole sort instead of one allocation per comparison.
                        private final ByteComparator membershipIdOrder = new ByteComparator();

                        @Override // java.util.Comparator
                        public int compare(EventID eventID, EventID eventID2) {
                            // Only the sign of a comparator result is meaningful; testing for exactly
                            // 1 / -1 would silently treat any other magnitude as "equal members".
                            int byMember = this.membershipIdOrder.compare(eventID.getMembershipID(), eventID2.getMembershipID());
                            if (byMember != 0) {
                                return byMember > 0 ? 1 : -1;
                            }
                            if (eventID.getThreadID() != eventID2.getThreadID()) {
                                return eventID.getThreadID() > eventID2.getThreadID() ? 1 : -1;
                            }
                            if (eventID.getSequenceID() == eventID2.getSequenceID()) {
                                return 0;
                            }
                            return eventID.getSequenceID() > eventID2.getSequenceID() ? 1 : -1;
                        }
                    });
                    for (EventID recoveredId : recoveredIds) {
                        this.eventSeqNumQueue.add(recoveredId);
                    }
                } else {
                    TreeSet<Long> recoveredKeys = new TreeSet<Long>(keySet());
                    if (!recoveredKeys.isEmpty()) {
                        for (Long recoveredKey : recoveredKeys) {
                            this.eventSeqNumQueue.add(recoveredKey);
                        }
                        this.lastKeyRecovered = recoveredKeys.last().longValue();
                        if (getEventSeqNum() != null) {
                            Atomics.setIfGreater(getEventSeqNum(), this.lastKeyRecovered);
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    // Four placeholders for four arguments; the seqNo value used to be dropped.
                    logger.debug("For bucket {} ,total keys recovered are : {} last key recovered is : {} and the seqNo is {}", new Object[]{Integer.valueOf(getId()), Integer.valueOf(this.eventSeqNumQueue.size()), Long.valueOf(this.lastKeyRecovered), getEventSeqNum()});
                }
            }
            this.initialized = true;
            destroyFailedBatchRemovalMessageKeys();
        } finally {
            try {
                notifyEventProcessor();
            } finally {
                // Release the write lock even if notifying the event processor fails.
                getInitializationLock().writeLock().unlock();
            }
        }
    }

    /**
     * Drains the collection returned by {@code getFailedBatchRemovalMessageKeys()} and destroys
     * every drained key that is still present in this bucket.
     *
     * <p>A {@link ForceReattemptException} from {@code destroyKey} is only logged at debug level;
     * processing continues with the next key.
     */
    private void destroyFailedBatchRemovalMessageKeys() {
        final boolean isDebugEnabled = logger.isDebugEnabled();
        Iterator<Object> pending = getFailedBatchRemovalMessageKeys().iterator();
        while (pending.hasNext()) {
            Object key = pending.next();
            // Remove first so the key is dropped from the pending collection whatever happens below.
            pending.remove();
            if (isDebugEnabled) {
                logger.debug("key from failedBatchRemovalMessageKeys is: {}", new Object[]{key});
            }
            if (!containsKey(key)) {
                continue;
            }
            try {
                destroyKey(key);
                if (isDebugEnabled) {
                    // Second placeholder added: the bucket id argument used to be silently dropped.
                    logger.debug("Destroyed {} from bucket: {}", new Object[]{key, Integer.valueOf(getId())});
                }
            } catch (ForceReattemptException e) {
                if (isDebugEnabled) {
                    logger.debug("Bucket :{} moved to other member", new Object[]{Integer.valueOf(getId())});
                }
            }
        }
    }

    /**
     * Hands the parallel gateway sender's batch size and an iterator over the queued keys to
     * {@code markEventsAsDuplicate}.
     */
    @Override // com.gemstone.gemfire.internal.cache.BucketRegion
    public void beforeAcquiringPrimaryState() {
        final int batchSize = getPartitionedRegion().getParallelGatewaySender().getBatchSize();
        final Iterator<Object> queuedKeys = this.eventSeqNumQueue.iterator();
        markEventsAsDuplicate(batchSize, queuedKeys);
    }

    /**
     * Runs the superclass {@code closeEntries()} through
     * {@code OffHeapRegionEntryHelper.doWithOffHeapClear}, then empties the conflation indexes and
     * the key queue.
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue, com.gemstone.gemfire.internal.cache.LocalRegion
    public void closeEntries() {
        final Runnable closeInSuper = new Runnable() { // from class: com.gemstone.gemfire.internal.cache.BucketRegionQueue.2
            @Override // java.lang.Runnable
            public void run() {
                BucketRegionQueue.super.closeEntries();
            }
        };
        OffHeapRegionEntryHelper.doWithOffHeapClear(closeInSuper);
        this.indexes.clear();
        this.eventSeqNumQueue.clear();
    }

    /**
     * Runs the superclass {@code clearEntries} through
     * {@code OffHeapRegionEntryHelper.doWithOffHeapClear}, empties the key queue and returns
     * whatever the superclass call produced.
     *
     * <p>Unlike {@code closeEntries()} and {@code clearQueues()}, the conflation indexes are left
     * untouched here.
     *
     * @param regionVersionVector forwarded to the superclass
     * @return the set returned by the superclass {@code clearEntries}
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue, com.gemstone.gemfire.internal.cache.LocalRegion
    public Set<VersionSource> clearEntries(final RegionVersionVector regionVersionVector) {
        // Holder that carries the result out of the anonymous Runnable.
        final AtomicReference<Set<VersionSource>> cleared = new AtomicReference<Set<VersionSource>>();
        final Runnable clearInSuper = new Runnable() { // from class: com.gemstone.gemfire.internal.cache.BucketRegionQueue.3
            @Override // java.lang.Runnable
            public void run() {
                cleared.set(BucketRegionQueue.super.clearEntries(regionVersionVector));
            }
        };
        OffHeapRegionEntryHelper.doWithOffHeapClear(clearInSuper);
        this.eventSeqNumQueue.clear();
        return cleared.get();
    }

    /**
     * Overrides the inherited hook with an empty body: this queue performs no work here.
     *
     * @param entryEventImpl ignored
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue, com.gemstone.gemfire.internal.cache.BucketRegion
    public void forceSerialized(EntryEventImpl entryEventImpl) {
    }

    /**
     * Empties the conflation indexes and the key queue while holding the initialization write
     * lock, so the two are cleared together with respect to readers such as {@code peek()} that
     * take the read lock.
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue
    protected void clearQueues() {
        getInitializationLock().writeLock().lock();
        try {
            this.indexes.clear();
            this.eventSeqNumQueue.clear();
        } finally {
            // Always hand the lock back, even if one of the clears throws.
            getInitializationLock().writeLock().unlock();
        }
    }

    /**
     * Performs the superclass put and, when it succeeds, releases a replaced
     * {@link GatewaySenderEventImpl} and, for colocated regions with conflation enabled on the
     * primary, conflates against the previously indexed entry.
     *
     * @return the result of the superclass {@code virtualPut}
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue, com.gemstone.gemfire.internal.cache.BucketRegion, com.gemstone.gemfire.internal.cache.DistributedRegion, com.gemstone.gemfire.internal.cache.LocalRegion
    protected boolean virtualPut(EntryEventImpl entryEventImpl, boolean z, boolean z2, Object obj, boolean z3, long j, boolean z4) throws TimeoutException, CacheWriterException {
        final boolean stored = super.virtualPut(entryEventImpl, z, z2, obj, z3, j, z4);
        if (!stored) {
            return false;
        }

        // The value that was overwritten may need an explicit release.
        final Object replaced = entryEventImpl.getRawOldValue();
        if (replaced instanceof GatewaySenderEventImpl) {
            ((GatewaySenderEventImpl) replaced).release();
        }

        // No conflation bookkeeping for non-colocated regions.
        if (getPartitionedRegion().getColocatedWith() == null) {
            return true;
        }
        if (!getPartitionedRegion().isConflationEnabled() || !getBucketAdvisor().isPrimary()) {
            return true;
        }

        final Object incoming = entryEventImpl.getNewValue();
        final Long queueKey = (Long) entryEventImpl.getKey();
        if (!(incoming instanceof Conflatable)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Object : {} is not conflatable", new Object[]{incoming});
            }
            return true;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Key :{} , Object : {} is conflatable", new Object[]{queueKey, incoming});
        }
        conflateOldEntry((Conflatable) incoming, queueKey);
        return true;
    }

    /**
     * Records {@code l} as the latest queue key for the conflatable's (region, key) pair and, if
     * an earlier queue key was recorded for the same pair, asks the parallel gateway sender queue
     * to conflate that earlier event.
     *
     * <p>Nothing is done when conflation is disabled on the partitioned region or the object
     * reports it should not be conflated. When the pair is indexed for the first time only the
     * conflation-index statistic is incremented.
     *
     * @param conflatable the newly put value
     * @param l the queue key under which {@code conflatable} was put
     */
    private void conflateOldEntry(Conflatable conflatable, Long l) {
        PartitionedRegion partitionedRegion = getPartitionedRegion();
        if (!partitionedRegion.isConflationEnabled() || !conflatable.shouldBeConflated()) {
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Not conflating {}", new Object[]{this, conflatable});
            }
            return;
        }
        Object keyToConflate = conflatable.getKeyToConflate();
        String regionToConflate = conflatable.getRegionToConflate();
        if (logger.isDebugEnabled()) {
            logger.debug(" The region name is : {}", new Object[]{regionToConflate});
        }
        // NOTE(review): this get-then-put on the outer map is not atomic; confirm callers never
        // reach here concurrently for the same region name.
        Map latestIndexesForRegion = (Map) this.indexes.get(regionToConflate);
        if (latestIndexesForRegion == null) {
            latestIndexesForRegion = new ConcurrentHashMap();
            this.indexes.put(regionToConflate, latestIndexesForRegion);
        }
        Long previousTailKey = (Long) latestIndexesForRegion.put(keyToConflate, l);
        if (previousTailKey == null) {
            // First entry for this key: the index grew, nothing to conflate.
            partitionedRegion.getParallelGatewaySender().getStatistics().incConflationIndexesMapSize();
            return;
        }
        if (logger.isDebugEnabled()) {
            // Fourth placeholder added: previousTailKey was passed but never rendered.
            logger.debug("{}: Conflating {} at queue index={} and previousTailKey: {}", new Object[]{this, conflatable, l, previousTailKey});
        }
        AbstractGatewaySenderEventProcessor eventProcessor = partitionedRegion.getParallelGatewaySender().getEventProcessor();
        if (eventProcessor == null) {
            return;
        }
        ((ConcurrentParallelGatewaySenderQueue) eventProcessor.getQueue()).conflateEvent(conflatable, getId(), previousTailKey);
    }

    /**
     * Drops the conflation index entry that belongs to the value stored under queue key
     * {@code l}, if that value is a {@link Conflatable} that should be conflated and an index
     * entry exists for it. The conflation-index statistic is decremented when an entry is removed.
     *
     * @param l queue key of the entry about to be destroyed
     */
    private void removeIndex(Long l) {
        final Object stored = getNoLRU(l, true, false, false);
        if (!(stored instanceof Conflatable)) {
            return;
        }
        final Conflatable event = (Conflatable) stored;
        if (!event.shouldBeConflated()) {
            return;
        }
        final String regionName = event.getRegionToConflate();
        final Object conflationKey = event.getKeyToConflate();
        final Map regionIndex = (Map) this.indexes.get(regionName);
        if (regionIndex == null) {
            return;
        }
        final Long removedIndex = (Long) regionIndex.remove(conflationKey);
        if (removedIndex == null) {
            return;
        }
        getPartitionedRegion().getParallelGatewaySender().getStatistics().decConflationIndexesMapSize();
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Removed index {} for {}", new Object[]{this, removedIndex, event});
        }
    }

    /**
     * Destroys the entry through the superclass, first dropping its conflation index when
     * conflation is enabled, then releasing a destroyed {@link GatewaySenderEventImpl} value.
     * On a non-primary bucket the key is additionally removed from the key queue.
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue, com.gemstone.gemfire.internal.cache.BucketRegion, com.gemstone.gemfire.internal.cache.DistributedRegion
    protected void basicDestroy(EntryEventImpl entryEventImpl, boolean z, Object obj) throws EntryNotFoundException, CacheWriterException, TimeoutException {
        if (getPartitionedRegion().isConflationEnabled()) {
            removeIndex((Long) entryEventImpl.getKey());
        }
        super.basicDestroy(entryEventImpl, z, obj);

        final Object destroyedValue = entryEventImpl.getRawOldValue();
        if (destroyedValue instanceof GatewaySenderEventImpl) {
            ((GatewaySenderEventImpl) destroyedValue).release();
        }

        // Only the non-primary path removes the key from the queue here.
        if (!getBucketAdvisor().isPrimary()) {
            if (logger.isDebugEnabled()) {
                logger.debug(" removing the key {} from eventSeqNumQueue", new Object[]{entryEventImpl.getKey()});
            }
            this.eventSeqNumQueue.remove(entryEventImpl.getKey());
        }
    }

    /**
     * Looks up the value for {@code obj} via {@code getValueInVMOrDiskWithoutFaultIn} and
     * deserializes it when it is a {@link CachedDeserializable}.
     *
     * @param obj the key to look up
     * @return the value, or {@code null} when the lookup yields nothing, throws
     *         {@link EntryNotFoundException} before a value was obtained, or yields
     *         {@code Token.TOMBSTONE}
     */
    private Object optimalGet(Object obj) {
        Object value = null;
        try {
            value = getValueInVMOrDiskWithoutFaultIn(obj);
            // instanceof is false for null, so no separate null test is needed.
            if (value instanceof CachedDeserializable) {
                value = ((CachedDeserializable) value).getDeserializedValue(this, getRegionEntry(obj));
            }
        } catch (EntryNotFoundException ignored) {
            // Deliberately ignored: keep whatever was assigned to value so far.
        }
        return value == Token.TOMBSTONE ? null : value;
    }

    /**
     * Returns the value stored under the key at the head of the key queue and removes that key
     * from the key queue; the region entry itself is not destroyed here. Runs under the
     * initialization read lock.
     *
     * @return the head value, or {@code null} when the key queue is empty or no value is found
     *         for the head key
     * @throws BucketRegionQueueUnavailableException if the partitioned region is destroyed
     */
    public Object peek() {
        getInitializationLock().readLock().lock();
        try {
            if (getPartitionedRegion().isDestroyed()) {
                throw new BucketRegionQueueUnavailableException();
            }
            final Object headKey = this.eventSeqNumQueue.peek();
            if (headKey == null) {
                return null;
            }
            final Object headValue = optimalGet(headKey);
            if (headValue == null && !getPartitionedRegion().isConflationEnabled() && logger.isDebugEnabled()) {
                logger.debug("The value against key {} in the bucket region queue with id {} is NULL for the GatewaySender {}", new Object[]{headKey, Integer.valueOf(getId()), getPartitionedRegion().getParallelGatewaySender()});
            }
            // The key leaves the key queue whether or not a value was found.
            this.eventSeqNumQueue.remove(headKey);
            return headValue;
        } finally {
            getInitializationLock().readLock().unlock();
        }
    }

    /**
     * Appends {@code obj} to the key queue after a successful put, provided the queue has been
     * initialized, and bumps the queue size by one when this bucket is primary.
     *
     * @param obj the key that was put
     * @param z whether the put succeeded
     * @param entryEventImpl the put event; only used for debug logging
     * @param i unused in this implementation
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue
    protected void addToEventQueue(Object obj, boolean z, EntryEventImpl entryEventImpl, int i) {
        if (z) {
            // Before initialization completes, keys are not queued here.
            if (this.initialized) {
                this.eventSeqNumQueue.add(obj);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Put successfully in the queue : {} was initialized: {}", new Object[]{entryEventImpl.getRawNewValue(), Boolean.valueOf(this.initialized)});
            }
        }
        final boolean primary = getBucketAdvisor().isPrimary();
        if (primary) {
            incQueueSize(1);
        }
    }

    /**
     * Removes the head key from the key queue and destroys the corresponding entry.
     *
     * <p>The head is taken with {@code Queue.remove()}, which throws
     * {@link java.util.NoSuchElementException} on an empty queue rather than returning
     * {@code null}.
     *
     * @return the key that was removed
     * @throws ForceReattemptException propagated from {@code destroyKey}
     */
    public Object remove() throws ForceReattemptException {
        final Object headKey = this.eventSeqNumQueue.remove();
        if (headKey == null) {
            return null;
        }
        destroyKey(headKey);
        return headKey;
    }

    /**
     * Not supported by this queue.
     *
     * @throws UnsupportedOperationException always
     */
    public Object take() throws InterruptedException, ForceReattemptException {
        throw new UnsupportedOperationException();
    }

    /**
     * Destroys the entry stored under {@code obj} using a freshly created destroy event, then
     * calls {@code notifyEntriesRemoved()}.
     *
     * <p>The destroy event is released exactly once on every exit path, including failures while
     * the event is still being set up.
     *
     * @param obj key of the entry to destroy
     * @throws ForceReattemptException if the destroy failed with {@link EntryNotFoundException} or
     *         {@link RegionDestroyedException} and the bucket turns out to be destroyed
     * @throws EntryNotFoundException if the entry does not exist and the bucket is still usable
     */
    @Override // com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue
    public void destroyKey(Object obj) throws ForceReattemptException {
        if (logger.isDebugEnabled()) {
            logger.debug(" destroying primary key {}", new Object[]{obj});
        }
        EntryEventImpl destroyEvent = getPartitionedRegion().newDestroyEntryEvent(obj, null);
        try {
            // Event setup sits inside the try so a failure here still releases the event.
            destroyEvent.setEventId(new EventID(this.cache.getSystem()));
            destroyEvent.setRegion(this);
            basicDestroy(destroyEvent, true, null);
            checkReadiness();
        } catch (EntryNotFoundException e) {
            if (getPartitionedRegion().isDestroyed()) {
                getPartitionedRegion().checkReadiness();
                if (isBucketDestroyed()) {
                    throw new ForceReattemptException("Bucket moved", new RegionDestroyedException(LocalizedStrings.PartitionedRegionDataStore_REGION_HAS_BEEN_DESTROYED.toLocalizedString(), getPartitionedRegion().getFullPath()));
                }
            }
            throw e;
        } catch (RegionDestroyedException e2) {
            getPartitionedRegion().checkReadiness();
            if (isBucketDestroyed()) {
                throw new ForceReattemptException("Bucket moved while destroying key " + obj, e2);
            }
            // Bucket still exists: fall through and notify as on the success path.
        } finally {
            destroyEvent.release();
        }
        // Outside the try so a failure here cannot trigger a second release of the event.
        notifyEntriesRemoved();
    }

    /**
     * Tells whether a {@code peek()} is worthwhile right now.
     *
     * @return {@code true} only when the partitioned region is not destroyed, this region and the
     *         key queue are both non-empty, and this bucket is primary
     */
    public boolean isReadyForPeek() {
        return !getPartitionedRegion().isDestroyed()
                && !isEmpty()
                && !this.eventSeqNumQueue.isEmpty()
                && getBucketAdvisor().isPrimary();
    }
}
