package software.amazon.kinesis.retrieval.polling;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.DiagnosticUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
/* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.3.jar:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.class */
public class PrefetchRecordsPublisher implements RecordsPublisher {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PrefetchRecordsPublisher.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";

    @VisibleForTesting
    LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
    private int maxPendingProcessRecordsInput;
    private int maxByteSize;
    private int maxRecordsCount;
    private final int maxRecordsPerCall;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
    private final ExecutorService executorService;
    private final MetricsFactory metricsFactory;
    private final long idleMillisBetweenCalls;
    private Instant lastSuccessfulCall;
    private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
    private PrefetchCounters prefetchCounters;
    private final String operation;
    private final KinesisDataFetcher dataFetcher;
    private final String shardId;
    private Subscriber<? super RecordsRetrieved> subscriber;
    private String highestSequenceNumber;
    private InitialPositionInStreamExtended initialPositionInStreamExtended;
    private boolean started = false;
    private final AtomicLong requestedResponses = new AtomicLong(0);
    private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
    private boolean wasReset = false;
    private Instant lastEventDeliveryTime = Instant.EPOCH;
    private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);

    /* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.3.jar:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.class */
    private class DefaultGetRecordsCacheDaemon implements Runnable {
        volatile boolean isShutdown;

        private DefaultGetRecordsCacheDaemon() {
            this.isShutdown = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (this.isShutdown) {
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    PrefetchRecordsPublisher.log.warn("{} : Prefetch thread was interrupted.", PrefetchRecordsPublisher.this.shardId);
                    break;
                }
                PrefetchRecordsPublisher.this.resetLock.readLock().lock();
                try {
                    makeRetrievalAttempt();
                } catch (PositionResetException e) {
                    PrefetchRecordsPublisher.log.debug("{} : Position was reset while attempting to add item to queue.", PrefetchRecordsPublisher.this.shardId);
                } finally {
                    PrefetchRecordsPublisher.this.resetLock.readLock().unlock();
                }
            }
            callShutdownOnStrategy();
        }

        private void makeRetrievalAttempt() {
            MetricsScope createMetricsWithOperation = MetricsUtil.createMetricsWithOperation(PrefetchRecordsPublisher.this.metricsFactory, PrefetchRecordsPublisher.this.operation);
            try {
                if (!PrefetchRecordsPublisher.this.prefetchCounters.shouldGetNewRecords()) {
                    try {
                        PrefetchRecordsPublisher.this.prefetchCounters.waitForConsumer();
                        return;
                    } catch (InterruptedException e) {
                        PrefetchRecordsPublisher.log.info("{} :  Thread was interrupted while waiting for the consumer.  Shutdown has probably been started", PrefetchRecordsPublisher.this.shardId);
                        return;
                    }
                }
                try {
                    try {
                        try {
                            try {
                                sleepBeforeNextCall();
                                GetRecordsResponse records = PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getRecords(PrefetchRecordsPublisher.this.maxRecordsPerCall);
                                PrefetchRecordsPublisher.this.lastSuccessfulCall = Instant.now();
                                ProcessRecordsInput build = ProcessRecordsInput.builder().records((List) records.records().stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList())).millisBehindLatest(records.millisBehindLatest()).cacheEntryTime(PrefetchRecordsPublisher.this.lastSuccessfulCall).isAtShardEnd(PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()).build();
                                PrefetchRecordsPublisher.this.highestSequenceNumber = PrefetchRecordsPublisher.this.calculateHighestSequenceNumber(build);
                                PrefetchRecordsRetrieved prefetchRecordsRetrieved = new PrefetchRecordsRetrieved(build, PrefetchRecordsPublisher.this.highestSequenceNumber, records.nextShardIterator(), PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
                                PrefetchRecordsPublisher.this.highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber;
                                PrefetchRecordsPublisher.this.addArrivedRecordsInput(prefetchRecordsRetrieved);
                                PrefetchRecordsPublisher.this.drainQueueForRequestsIfAllowed();
                                MetricsUtil.endScope(createMetricsWithOperation);
                            } catch (InterruptedException e2) {
                                PrefetchRecordsPublisher.log.info("{} :  Thread was interrupted, indicating shutdown was called on the cache.", PrefetchRecordsPublisher.this.shardId);
                                MetricsUtil.endScope(createMetricsWithOperation);
                            }
                        } catch (RetryableRetrievalException e3) {
                            PrefetchRecordsPublisher.log.info("{} :  Timeout occurred while waiting for response from Kinesis.  Will retry the request.", PrefetchRecordsPublisher.this.shardId);
                            MetricsUtil.endScope(createMetricsWithOperation);
                        }
                    } catch (ExpiredIteratorException e4) {
                        PrefetchRecordsPublisher.log.info("{} :  records threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", PrefetchRecordsPublisher.this.shardId, e4);
                        createMetricsWithOperation.addData(PrefetchRecordsPublisher.EXPIRED_ITERATOR_METRIC, 1.0d, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                        PrefetchRecordsPublisher.this.dataFetcher.restartIterator();
                        MetricsUtil.endScope(createMetricsWithOperation);
                    } catch (PositionResetException e5) {
                        throw e5;
                    }
                } catch (SdkException e6) {
                    PrefetchRecordsPublisher.log.error("{} :  Exception thrown while fetching records from Kinesis", PrefetchRecordsPublisher.this.shardId, e6);
                    MetricsUtil.endScope(createMetricsWithOperation);
                } catch (Throwable th) {
                    PrefetchRecordsPublisher.log.error("{} :  Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.", PrefetchRecordsPublisher.this.shardId, th);
                    MetricsUtil.endScope(createMetricsWithOperation);
                }
            } catch (Throwable th2) {
                MetricsUtil.endScope(createMetricsWithOperation);
                throw th2;
            }
        }

        private void callShutdownOnStrategy() {
            if (PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.isShutdown()) {
                return;
            }
            PrefetchRecordsPublisher.this.getRecordsRetrievalStrategy.shutdown();
        }

        private void sleepBeforeNextCall() throws InterruptedException {
            if (PrefetchRecordsPublisher.this.lastSuccessfulCall == null) {
                return;
            }
            long millis = Duration.between(PrefetchRecordsPublisher.this.lastSuccessfulCall, Instant.now()).abs().toMillis();
            if (millis < PrefetchRecordsPublisher.this.idleMillisBetweenCalls) {
                Thread.sleep(PrefetchRecordsPublisher.this.idleMillisBetweenCalls - millis);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.3.jar:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$PositionResetException.class */
    public static class PositionResetException extends RuntimeException {
        private PositionResetException() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.3.jar:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$PrefetchCounters.class */
    public class PrefetchCounters {
        private long size;
        private long byteSize;

        private PrefetchCounters() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        public synchronized void added(ProcessRecordsInput processRecordsInput) {
            this.size += getSize(processRecordsInput);
            this.byteSize += getByteSize(processRecordsInput);
        }

        public synchronized void removed(ProcessRecordsInput processRecordsInput) {
            this.size -= getSize(processRecordsInput);
            this.byteSize -= getByteSize(processRecordsInput);
            notifyAll();
        }

        private long getSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.records().size();
        }

        private long getByteSize(ProcessRecordsInput processRecordsInput) {
            return processRecordsInput.records().stream().mapToLong(kinesisClientRecord -> {
                return kinesisClientRecord.data().limit();
            }).sum();
        }

        public synchronized void waitForConsumer() throws InterruptedException {
            if (shouldGetNewRecords()) {
                return;
            }
            PrefetchRecordsPublisher.log.debug("{} : Queue is full waiting for consumer for {} ms", PrefetchRecordsPublisher.this.shardId, Long.valueOf(PrefetchRecordsPublisher.this.idleMillisBetweenCalls));
            wait(PrefetchRecordsPublisher.this.idleMillisBetweenCalls);
        }

        public synchronized boolean shouldGetNewRecords() {
            if (PrefetchRecordsPublisher.log.isDebugEnabled()) {
                PrefetchRecordsPublisher.log.debug("{} : Current Prefetch Counter States: {}", PrefetchRecordsPublisher.this.shardId, toString());
            }
            return this.size < ((long) PrefetchRecordsPublisher.this.maxRecordsCount) && this.byteSize < ((long) PrefetchRecordsPublisher.this.maxByteSize);
        }

        void reset() {
            this.size = 0L;
            this.byteSize = 0L;
        }

        public String toString() {
            return String.format("{ Requests: %d, Records: %d, Bytes: %d }", Integer.valueOf(PrefetchRecordsPublisher.this.getRecordsResultQueue.size()), Long.valueOf(this.size), Long.valueOf(this.byteSize));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.3.jar:software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher$PrefetchRecordsRetrieved.class */
    public static class PrefetchRecordsRetrieved implements RecordsRetrieved {
        final ProcessRecordsInput processRecordsInput;
        final String lastBatchSequenceNumber;
        final String shardIterator;
        final BatchUniqueIdentifier batchUniqueIdentifier;

        PrefetchRecordsRetrieved prepareForPublish() {
            return new PrefetchRecordsRetrieved(this.processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(), this.lastBatchSequenceNumber, this.shardIterator, this.batchUniqueIdentifier);
        }

        @Override // software.amazon.kinesis.retrieval.RecordsRetrieved
        public BatchUniqueIdentifier batchUniqueIdentifier() {
            return this.batchUniqueIdentifier;
        }

        public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
            return new BatchUniqueIdentifier(UUID.randomUUID().toString(), "");
        }

        public PrefetchRecordsRetrieved(ProcessRecordsInput processRecordsInput, String str, String str2, BatchUniqueIdentifier batchUniqueIdentifier) {
            this.processRecordsInput = processRecordsInput;
            this.lastBatchSequenceNumber = str;
            this.shardIterator = str2;
            this.batchUniqueIdentifier = batchUniqueIdentifier;
        }

        @Override // software.amazon.kinesis.retrieval.RecordsRetrieved
        public ProcessRecordsInput processRecordsInput() {
            return this.processRecordsInput;
        }

        public String lastBatchSequenceNumber() {
            return this.lastBatchSequenceNumber;
        }

        public String shardIterator() {
            return this.shardIterator;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof PrefetchRecordsRetrieved)) {
                return false;
            }
            PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) obj;
            if (!prefetchRecordsRetrieved.canEqual(this)) {
                return false;
            }
            ProcessRecordsInput processRecordsInput = processRecordsInput();
            ProcessRecordsInput processRecordsInput2 = prefetchRecordsRetrieved.processRecordsInput();
            if (processRecordsInput == null) {
                if (processRecordsInput2 != null) {
                    return false;
                }
            } else if (!processRecordsInput.equals(processRecordsInput2)) {
                return false;
            }
            String lastBatchSequenceNumber = lastBatchSequenceNumber();
            String lastBatchSequenceNumber2 = prefetchRecordsRetrieved.lastBatchSequenceNumber();
            if (lastBatchSequenceNumber == null) {
                if (lastBatchSequenceNumber2 != null) {
                    return false;
                }
            } else if (!lastBatchSequenceNumber.equals(lastBatchSequenceNumber2)) {
                return false;
            }
            String shardIterator = shardIterator();
            String shardIterator2 = prefetchRecordsRetrieved.shardIterator();
            if (shardIterator == null) {
                if (shardIterator2 != null) {
                    return false;
                }
            } else if (!shardIterator.equals(shardIterator2)) {
                return false;
            }
            BatchUniqueIdentifier batchUniqueIdentifier = batchUniqueIdentifier();
            BatchUniqueIdentifier batchUniqueIdentifier2 = prefetchRecordsRetrieved.batchUniqueIdentifier();
            return batchUniqueIdentifier == null ? batchUniqueIdentifier2 == null : batchUniqueIdentifier.equals(batchUniqueIdentifier2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof PrefetchRecordsRetrieved;
        }

        public int hashCode() {
            ProcessRecordsInput processRecordsInput = processRecordsInput();
            int hashCode = (1 * 59) + (processRecordsInput == null ? 43 : processRecordsInput.hashCode());
            String lastBatchSequenceNumber = lastBatchSequenceNumber();
            int hashCode2 = (hashCode * 59) + (lastBatchSequenceNumber == null ? 43 : lastBatchSequenceNumber.hashCode());
            String shardIterator = shardIterator();
            int hashCode3 = (hashCode2 * 59) + (shardIterator == null ? 43 : shardIterator.hashCode());
            BatchUniqueIdentifier batchUniqueIdentifier = batchUniqueIdentifier();
            return (hashCode3 * 59) + (batchUniqueIdentifier == null ? 43 : batchUniqueIdentifier.hashCode());
        }

        public String toString() {
            return "PrefetchRecordsPublisher.PrefetchRecordsRetrieved(processRecordsInput=" + processRecordsInput() + ", lastBatchSequenceNumber=" + lastBatchSequenceNumber() + ", shardIterator=" + shardIterator() + ", batchUniqueIdentifier=" + batchUniqueIdentifier() + ")";
        }
    }

    public PrefetchRecordsPublisher(int i, int i2, int i3, int i4, @NonNull GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull ExecutorService executorService, long j, @NonNull MetricsFactory metricsFactory, @NonNull String str, @NonNull String str2) {
        if (getRecordsRetrievalStrategy == null) {
            throw new NullPointerException("getRecordsRetrievalStrategy");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        if (str == null) {
            throw new NullPointerException("operation");
        }
        if (str2 == null) {
            throw new NullPointerException("shardId");
        }
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
        this.maxRecordsPerCall = i4;
        this.maxPendingProcessRecordsInput = i;
        this.maxByteSize = i2;
        this.maxRecordsCount = i3;
        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
        this.prefetchCounters = new PrefetchCounters();
        this.executorService = executorService;
        this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
        this.idleMillisBetweenCalls = j;
        this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
        Validate.notEmpty(str, "Operation cannot be empty", new Object[0]);
        this.operation = str;
        this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
        this.shardId = str2;
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("ExecutorService has been shutdown.");
        }
        this.initialPositionInStreamExtended = initialPositionInStreamExtended;
        this.highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
        this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
        if (!this.started) {
            log.info("{} : Starting prefetching thread.", this.shardId);
            this.executorService.execute(this.defaultGetRecordsCacheDaemon);
        }
        this.started = true;
    }

    private void throwOnIllegalState() {
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
        }
        if (!this.started) {
            throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
        }
    }

    private RecordsRetrieved peekNextResult() {
        throwOnIllegalState();
        PrefetchRecordsRetrieved peek = this.getRecordsResultQueue.peek();
        return peek == null ? peek : peek.prepareForPublish();
    }

    @VisibleForTesting
    RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
        throwOnIllegalState();
        PrefetchRecordsRetrieved poll = this.getRecordsResultQueue.poll();
        if (poll != null) {
            this.prefetchCounters.removed(poll.processRecordsInput);
            this.requestedResponses.decrementAndGet();
        } else {
            log.info("{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch bufferwas reset.", this.shardId);
        }
        return poll;
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public void shutdown() {
        this.defaultGetRecordsCacheDaemon.isShutdown = true;
        this.executorService.shutdownNow();
        this.started = false;
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public void restartFrom(RecordsRetrieved recordsRetrieved) {
        if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) {
            throw new IllegalArgumentException("Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
        }
        PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
        this.resetLock.writeLock().lock();
        try {
            this.getRecordsResultQueue.clear();
            log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", this.shardId, Integer.valueOf(this.getRecordsResultQueue.size()), Long.valueOf(this.requestedResponses.get()));
            this.shouldDrainEventOnlyOnAck.set(false);
            this.prefetchCounters.reset();
            this.highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
            this.dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), this.highestSequenceNumber, this.initialPositionInStreamExtended);
            this.wasReset = true;
        } finally {
            this.resetLock.writeLock().unlock();
        }
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super RecordsRetrieved> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new Subscription() { // from class: software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.1
            @Override // org.reactivestreams.Subscription
            public void request(long j) {
                PrefetchRecordsPublisher.this.requestedResponses.addAndGet(j);
                PrefetchRecordsPublisher.this.drainQueueForRequestsIfAllowed();
            }

            @Override // org.reactivestreams.Subscription
            public void cancel() {
                PrefetchRecordsPublisher.this.requestedResponses.set(0L);
            }
        });
    }

    @Override // software.amazon.kinesis.retrieval.RecordsPublisher
    public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
        RecordsRetrieved peekNextResult = peekNextResult();
        if (peekNextResult == null || !peekNextResult.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) {
            log.info("{} :  Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", this.shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekNextResult == null ? null : peekNextResult.batchUniqueIdentifier(), Instant.now());
        } else {
            pollNextResultAndUpdatePrefetchCounters();
            if (this.getRecordsResultQueue.isEmpty()) {
                log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", this.shardId, Integer.valueOf(this.getRecordsResultQueue.size()), Long.valueOf(this.requestedResponses.get()));
                this.shouldDrainEventOnlyOnAck.set(false);
            } else {
                drainQueueForRequests();
            }
        }
        DiagnosticUtils.takeDelayedDeliveryActionIfRequired(this.shardId, this.lastEventDeliveryTime, log);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addArrivedRecordsInput(PrefetchRecordsRetrieved prefetchRecordsRetrieved) throws InterruptedException {
        this.wasReset = false;
        while (!this.getRecordsResultQueue.offer(prefetchRecordsRetrieved, this.idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
            this.resetLock.readLock().unlock();
            this.resetLock.readLock().lock();
            if (this.wasReset) {
                throw new PositionResetException();
            }
        }
        this.prefetchCounters.added(prefetchRecordsRetrieved.processRecordsInput);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void drainQueueForRequestsIfAllowed() {
        if (this.shouldDrainEventOnlyOnAck.get()) {
            return;
        }
        drainQueueForRequests();
    }

    private synchronized void drainQueueForRequests() {
        RecordsRetrieved peekNextResult = peekNextResult();
        if (this.requestedResponses.get() <= 0 || peekNextResult == null) {
            if (this.shouldDrainEventOnlyOnAck.get()) {
                log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", this.shardId, Integer.valueOf(this.getRecordsResultQueue.size()), Long.valueOf(this.requestedResponses.get()));
                this.shouldDrainEventOnlyOnAck.set(false);
                return;
            }
            return;
        }
        this.lastEventDeliveryTime = Instant.now();
        this.subscriber.onNext(peekNextResult);
        if (this.shouldDrainEventOnlyOnAck.get()) {
            return;
        }
        log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", this.shardId, Integer.valueOf(this.getRecordsResultQueue.size()), Long.valueOf(this.requestedResponses.get()));
        this.shouldDrainEventOnlyOnAck.set(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
        String str = this.highestSequenceNumber;
        if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) {
            str = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber();
        }
        return str;
    }
}
