package org.apache.flink.connector.kinesis.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import org.apache.flink.connector.kinesis.sink.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.class */
public class PollingKinesisShardSplitReader implements SplitReader<Record, KinesisShardSplit> {
    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS = new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
    private final StreamProxy kinesis;
    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque();

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader$KinesisRecordsWithSplitIds.class */
    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<Record> {
        private final Iterator<Record> recordsIterator;
        private final String splitId;
        private final boolean isComplete;

        public KinesisRecordsWithSplitIds(Iterator<Record> it, String str, boolean z) {
            this.recordsIterator = it;
            this.splitId = str;
            this.isComplete = z;
        }

        @Nullable
        public String nextSplit() {
            if (this.recordsIterator.hasNext()) {
                return this.splitId;
            }
            return null;
        }

        @Nullable
        /* renamed from: nextRecordFromSplit, reason: merged with bridge method [inline-methods] */
        public Record m1681nextRecordFromSplit() {
            if (this.recordsIterator.hasNext()) {
                return this.recordsIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            if (this.splitId != null && !this.recordsIterator.hasNext() && this.isComplete) {
                return Collections.singleton(this.splitId);
            }
            return Collections.emptySet();
        }
    }

    public PollingKinesisShardSplitReader(StreamProxy streamProxy) {
        this.kinesis = streamProxy;
    }

    public RecordsWithSplitIds<Record> fetch() throws IOException {
        KinesisShardSplitState poll = this.assignedSplits.poll();
        if (poll == null) {
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }
        GetRecordsResponse records = this.kinesis.getRecords(poll.getStreamArn(), poll.getShardId(), poll.getNextStartingPosition());
        boolean z = records.nextShardIterator() == null;
        if (!hasNoRecords(records)) {
            poll.setNextStartingPosition(StartingPosition.continueFromSequenceNumber(records.records().get(records.records().size() - 1).sequenceNumber()));
            this.assignedSplits.add(poll);
            return new KinesisRecordsWithSplitIds(records.records().iterator(), poll.getSplitId(), z);
        }
        if (z) {
            return new KinesisRecordsWithSplitIds(Collections.emptyIterator(), poll.getSplitId(), true);
        }
        this.assignedSplits.add(poll);
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    private boolean hasNoRecords(GetRecordsResponse getRecordsResponse) {
        return !getRecordsResponse.hasRecords() || getRecordsResponse.records().isEmpty();
    }

    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChange) {
        Iterator it = splitsChange.splits().iterator();
        while (it.hasNext()) {
            this.assignedSplits.add(new KinesisShardSplitState((KinesisShardSplit) it.next()));
        }
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.kinesis.close();
    }
}
