package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.Shard;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.class */
/**
 * Reads records from a single Kinesis shard, batch by batch, while tracking a checkpoint that
 * callers advance via {@link #ackRecord(KinesisRecord)}.
 *
 * <p>The current shard iterator is refreshed from the latest checkpoint whenever the Kinesis
 * client reports it as expired.
 */
public class ShardRecordsIterator {
    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);

    private final SimplifiedKinesisClient kinesis;
    private final RecordFilter filter;
    private final String streamName;
    private final String shardId;
    private final AtomicReference<ShardCheckpoint> checkpoint;
    // Starts at Long.MAX_VALUE so isUpToDate() is false until a fetched batch reports zero lag.
    private final AtomicLong millisBehindLatest;
    // Becomes null once getRecords returns no next iterator; readNextBatch then throws
    // KinesisShardClosedException.
    private String shardIterator;

    /**
     * Creates an iterator that uses a freshly constructed {@link RecordFilter}.
     *
     * @param shardCheckpoint checkpoint to start from; must not be null
     * @param simplifiedKinesisClient client used to talk to Kinesis; must not be null
     * @throws TransientKinesisException if obtaining the initial shard iterator fails
     */
    public ShardRecordsIterator(ShardCheckpoint shardCheckpoint, SimplifiedKinesisClient simplifiedKinesisClient) throws TransientKinesisException {
        this(shardCheckpoint, simplifiedKinesisClient, new RecordFilter());
    }

    /**
     * Creates an iterator with an explicit record filter.
     *
     * @param shardCheckpoint checkpoint to start from; must not be null
     * @param simplifiedKinesisClient client used to talk to Kinesis; must not be null
     * @param recordFilter filter applied to every fetched batch; must not be null
     * @throws TransientKinesisException if obtaining the initial shard iterator fails
     */
    ShardRecordsIterator(ShardCheckpoint shardCheckpoint, SimplifiedKinesisClient simplifiedKinesisClient, RecordFilter recordFilter) throws TransientKinesisException {
        this.millisBehindLatest = new AtomicLong(Long.MAX_VALUE);
        this.checkpoint = new AtomicReference<>(Preconditions.checkNotNull(shardCheckpoint, "initialCheckpoint"));
        this.filter = Preconditions.checkNotNull(recordFilter, "filter");
        this.kinesis = Preconditions.checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
        this.streamName = shardCheckpoint.getStreamName();
        this.shardId = shardCheckpoint.getShardId();
        this.shardIterator = shardCheckpoint.getShardIterator(this.kinesis);
    }

    /**
     * Fetches the next batch of records and passes it through the filter together with the
     * current checkpoint.
     *
     * @return the filtered records of the fetched batch
     * @throws TransientKinesisException if the underlying Kinesis call fails
     * @throws KinesisShardClosedException if there is no shard iterator left to read from
     */
    public List<KinesisRecord> readNextBatch() throws TransientKinesisException, KinesisShardClosedException {
        if (shardIterator == null) {
            throw new KinesisShardClosedException(String.format("Shard iterator reached end of the shard: streamName=%s, shardId=%s", streamName, shardId));
        }
        GetKinesisRecordsResult response = fetchRecords();
        LOG.debug("Fetched {} new records", response.getRecords().size());

        List<KinesisRecord> filteredRecords = filter.apply(response.getRecords(), checkpoint.get());
        // Lag is recorded only after filtering succeeded, matching the established ordering.
        millisBehindLatest.set(response.getMillisBehindLatest());
        return filteredRecords;
    }

    /**
     * Fetches one batch with the current shard iterator and advances the iterator to the one
     * returned with the batch. When the iterator has expired, a new one is derived from the
     * current checkpoint and the fetch is retried.
     *
     * <p>Implemented as a loop rather than recursion so that repeated expirations cannot grow
     * the call stack.
     */
    private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
        while (true) {
            try {
                GetKinesisRecordsResult response = kinesis.getRecords(shardIterator, streamName, shardId);
                shardIterator = response.getNextShardIterator();
                return response;
            } catch (ExpiredIteratorException e) {
                LOG.info("Refreshing expired iterator", e);
                // Not guarded by the try above: a failure to refresh propagates to the caller.
                shardIterator = checkpoint.get().getShardIterator(kinesis);
            }
        }
    }

    /** Returns the current checkpoint. */
    public ShardCheckpoint getCheckpoint() {
        return checkpoint.get();
    }

    /** Returns true when the most recently processed batch reported zero milliseconds of lag. */
    public boolean isUpToDate() {
        return millisBehindLatest.get() == 0;
    }

    /**
     * Advances the checkpoint to the position after the given record.
     *
     * @param kinesisRecord the record that has been processed
     */
    public void ackRecord(KinesisRecord kinesisRecord) {
        checkpoint.set(checkpoint.get().moveAfter(kinesisRecord));
    }

    /** Returns the id of the shard this iterator reads from. */
    public String getShardId() {
        return shardId;
    }

    /**
     * Lists the shards of the stream and creates an iterator for each shard whose parent shard
     * id equals this iterator's shard id. Each new iterator starts at {@code TRIM_HORIZON}.
     *
     * @return iterators for the matching shards; empty if there are none
     * @throws TransientKinesisException if listing shards or creating an iterator fails
     */
    public List<ShardRecordsIterator> findSuccessiveShardRecordIterators() throws TransientKinesisException {
        List<Shard> shards = kinesis.listShards(streamName);
        List<ShardRecordsIterator> successors = new ArrayList<>();
        for (Shard shard : shards) {
            if (shardId.equals(shard.getParentShardId())) {
                ShardCheckpoint successorCheckpoint = new ShardCheckpoint(streamName, shard.getShardId(), new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
                successors.add(new ShardRecordsIterator(successorCheckpoint, kinesis));
            }
        }
        return successors;
    }
}
