package org.apache.iotdb.commons.pipe.datastructure.queue.listening;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.queue.serializer.QueueSerializerType;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.class */
/**
 * A serializable listening queue of pipe {@link Event}s that additionally caches one batch of
 * {@link PipeSnapshotEvent}s together with the queue tail index observed when the batch was
 * cached.
 *
 * <p>This class takes a reference (via {@code increaseReferenceCount}) on every enriched event it
 * successfully enqueues and on every cached snapshot event, and gives it back (via
 * {@code decreaseReferenceCount}) when the element is released or the snapshot cache is cleared.
 *
 * <p>All methods that touch the snapshot cache are {@code synchronized} on this instance.
 */
public abstract class AbstractPipeListeningQueue extends AbstractSerializableListeningQueue<Event> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPipeListeningQueue.class);

    /** Holder name passed to every reference count change performed by this class. */
    private static final String HOLDER_NAME = AbstractPipeListeningQueue.class.getName();

    /**
     * Left: queue tail index recorded when the snapshots were cached, or {@link Long#MIN_VALUE}
     * when nothing is cached. Right: the cached snapshot events (never {@code null}).
     */
    private final Pair<Long, List<PipeSnapshotEvent>> queueTailIndex2SnapshotsCache;

    /** Creates a queue using the {@link QueueSerializerType#PLAIN} serializer and an empty snapshot cache. */
    protected AbstractPipeListeningQueue() {
        super(QueueSerializerType.PLAIN);
        this.queueTailIndex2SnapshotsCache = new Pair<>(Long.MIN_VALUE, new ArrayList<>());
    }

    /**
     * Offers a single event to the underlying queue and, if the superclass reports success, takes
     * a reference on the event on behalf of this queue.
     *
     * @param enrichedEvent the event to enqueue
     */
    protected synchronized void tryListen(EnrichedEvent enrichedEvent) {
        // Pass the event itself to the superclass; only count a reference when it was accepted.
        if (super.tryListen(enrichedEvent)) {
            enrichedEvent.increaseReferenceCount(HOLDER_NAME);
        }
    }

    /**
     * Replaces the cached snapshots with the given batch and records the current queue tail index
     * alongside it. Does nothing when the queue is closed.
     *
     * <p>References on the previously cached snapshots are released and a reference is taken on
     * each event of the new batch.
     *
     * @param list the snapshot events to cache; must not be {@code null}
     */
    protected synchronized void tryListen(List<PipeSnapshotEvent> list) {
        if (this.isClosed.get()) {
            return;
        }
        // Copy so that later changes to the caller's list cannot unbalance the reference counts
        // (the events released in clearSnapshots() must be exactly the ones referenced here).
        final List<PipeSnapshotEvent> snapshots = new ArrayList<>(list);
        clearSnapshots();
        final long tailIndex = this.queue.getTailIndex();
        this.queueTailIndex2SnapshotsCache.setLeft(tailIndex);
        snapshots.forEach(snapshot -> snapshot.increaseReferenceCount(HOLDER_NAME));
        this.queueTailIndex2SnapshotsCache.setRight(snapshots);
        LOGGER.info("Pipe listening queue snapshot cache is updated: {}", this.queueTailIndex2SnapshotsCache);
    }

    /**
     * Returns the snapshot cache, first clearing it when the recorded tail index lags behind the
     * current queue tail index by more than the configured transfer snapshot threshold.
     *
     * <p>The returned pair is the internal cache object itself, not a copy.
     *
     * @return the pair of recorded tail index and cached snapshots; the index is
     *     {@link Long#MIN_VALUE} and the list is empty when nothing is cached
     */
    public synchronized Pair<Long, List<PipeSnapshotEvent>> findAvailableSnapshots() {
        if (this.queueTailIndex2SnapshotsCache.getLeft()
                < this.queue.getTailIndex() - PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
            clearSnapshots();
        }
        return this.queueTailIndex2SnapshotsCache;
    }

    /**
     * Delegates to the superclass and clears the snapshot cache when its recorded tail index is
     * lower than the value the superclass returns.
     *
     * @param j the index forwarded to {@code super.removeBefore}
     * @return the value returned by {@code super.removeBefore}
     */
    @Override
    public synchronized long removeBefore(long j) {
        final long result = super.removeBefore(j);
        if (this.queueTailIndex2SnapshotsCache.getLeft() < result) {
            clearSnapshots();
        }
        return result;
    }

    /** Resets the cache to "nothing cached" and releases the reference held on each cached snapshot. */
    private synchronized void clearSnapshots() {
        this.queueTailIndex2SnapshotsCache.setLeft(Long.MIN_VALUE);
        this.queueTailIndex2SnapshotsCache
                .getRight()
                .forEach(snapshot -> snapshot.decreaseReferenceCount(HOLDER_NAME, false));
        this.queueTailIndex2SnapshotsCache.setRight(new ArrayList<>());
    }

    /**
     * Releases the cached snapshots and then closes the underlying queue.
     *
     * @throws IOException if the superclass fails to close
     */
    @Override
    public synchronized void close() throws IOException {
        try {
            clearSnapshots();
        } finally {
            // Always close the underlying queue, even if releasing a snapshot failed.
            super.close();
        }
    }

    /**
     * Gives back the reference this queue holds on the given event. Events that are not
     * {@link EnrichedEvent}s are ignored.
     *
     * @param event the element being released
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override
    public void releaseResource(Event event) {
        if (event instanceof EnrichedEvent) {
            ((EnrichedEvent) event).decreaseReferenceCount(HOLDER_NAME, false);
        }
    }
}
