package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.class */
/**
 * Unbounded reader that serves {@link KinesisRecord}s obtained from a {@link ShardReadersPool}.
 *
 * <p>The pool is created lazily in {@link #start()}; record access, watermark and checkpoint
 * queries are delegated to it. Backlog estimation is delegated to the
 * {@link SimplifiedKinesisClient} and cached for a configurable interval.
 */
public class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);

    /** Interval between backlog lookups used by the four-argument constructor. */
    private static final Duration DEFAULT_BACKLOG_BYTES_CHECK_THRESHOLD = Duration.standardSeconds(30L);

    private final KinesisIO.Read spec;
    private final SimplifiedKinesisClient kinesis;
    private final KinesisSource source;
    private final KinesisReaderCheckpoint initCheckpoint;
    /** Minimum time that has to pass before the cached backlog size is refreshed. */
    private final Duration backlogBytesCheckThreshold;
    /** Record returned by the most recent {@link #advance()}; absent until one was read. */
    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
    /** Last backlog size successfully fetched from the client (0 until the first fetch). */
    private long lastBacklogBytes;
    /** Time of the last successful backlog fetch; epoch 0 forces a fetch on first use. */
    private Instant backlogBytesLastCheckTime = new Instant(0L);
    /** Created in {@link #start()}; {@code null} before that. */
    private ShardReadersPool shardReadersPool;

    /**
     * Creates a reader that refreshes its backlog estimate at most every 30 seconds.
     *
     * @param read read configuration (must not be null)
     * @param simplifiedKinesisClient client used for backlog lookups (must not be null)
     * @param kinesisReaderCheckpoint checkpoint the shard readers pool is created from (must not be null)
     * @param kinesisSource source returned by {@link #getCurrentSource()}
     */
    public KinesisReader(KinesisIO.Read read, SimplifiedKinesisClient simplifiedKinesisClient, KinesisReaderCheckpoint kinesisReaderCheckpoint, KinesisSource kinesisSource) {
        this(read, simplifiedKinesisClient, kinesisReaderCheckpoint, kinesisSource, DEFAULT_BACKLOG_BYTES_CHECK_THRESHOLD);
    }

    /**
     * Creates a reader with an explicit backlog refresh interval.
     *
     * @param read read configuration (must not be null)
     * @param simplifiedKinesisClient client used for backlog lookups (must not be null)
     * @param kinesisReaderCheckpoint checkpoint the shard readers pool is created from (must not be null)
     * @param kinesisSource source returned by {@link #getCurrentSource()}
     * @param duration minimum interval between two backlog lookups against the client
     */
    KinesisReader(KinesisIO.Read read, SimplifiedKinesisClient simplifiedKinesisClient, KinesisReaderCheckpoint kinesisReaderCheckpoint, KinesisSource kinesisSource, Duration duration) {
        this.spec = Preconditions.checkNotNull(read, "spec");
        this.kinesis = Preconditions.checkNotNull(simplifiedKinesisClient, "kinesis");
        this.initCheckpoint = Preconditions.checkNotNull(kinesisReaderCheckpoint);
        this.source = kinesisSource;
        this.backlogBytesCheckThreshold = duration;
    }

    /**
     * Creates and starts the shard readers pool, then tries to read the first record.
     *
     * @return {@code true} if a record is available
     * @throws IOException wrapping any {@link TransientKinesisException} raised while starting
     */
    public boolean start() throws IOException {
        LOG.info("Starting reader using {}", this.initCheckpoint);
        try {
            this.shardReadersPool = createShardReadersPool();
            this.shardReadersPool.start();
            return advance();
        } catch (TransientKinesisException e) {
            throw new IOException(e);
        }
    }

    /**
     * Fetches the next record from the shard readers pool.
     *
     * @return {@code true} if a record was obtained and is now the current record
     */
    public boolean advance() throws IOException {
        this.currentRecord = this.shardReadersPool.nextRecord();
        return this.currentRecord.isPresent();
    }

    /**
     * Returns the unique id of the current record.
     *
     * @throws NoSuchElementException presumably when no current record is present (raised by
     *     {@code CustomOptional.get()}; verify against that class)
     */
    public byte[] getCurrentRecordId() throws NoSuchElementException {
        return this.currentRecord.get().getUniqueId();
    }

    /**
     * Returns the record obtained by the last successful {@link #advance()}.
     *
     * @throws NoSuchElementException presumably when no current record is present
     */
    public KinesisRecord getCurrent() throws NoSuchElementException {
        return this.currentRecord.get();
    }

    /**
     * Decompiler-generated alias of {@link #getCurrent()}, retained so that callers compiled
     * against the mangled name keep working.
     */
    public KinesisRecord m30getCurrent() throws NoSuchElementException {
        return getCurrent();
    }

    /**
     * Returns the approximate arrival timestamp of the current record.
     *
     * @throws NoSuchElementException presumably when no current record is present
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return this.currentRecord.get().getApproximateArrivalTimestamp();
    }

    /**
     * Stops the shard readers pool (if it was created) and closes the Kinesis client.
     *
     * <p>The client is closed even when stopping the pool fails; an exception thrown while
     * closing is then attached as suppressed to the primary one.
     *
     * @throws IOException wrapping any checked exception raised while stopping or closing;
     *     runtime exceptions are propagated unchanged
     */
    public void close() throws IOException {
        // The pool only exists after start(); tolerate close() on a reader that never started.
        final ShardReadersPool pool = this.shardReadersPool;
        try (SimplifiedKinesisClient client = this.kinesis) {
            if (pool != null) {
                pool.stop();
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /** Returns the watermark reported by the shard readers pool. */
    public Instant getWatermark() {
        return this.shardReadersPool.getWatermark();
    }

    /** Returns the checkpoint mark reported by the shard readers pool. */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        return this.shardReadersPool.getCheckpointMark();
    }

    /** Returns the source this reader was constructed with. */
    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
        return this.source;
    }

    /**
     * Decompiler-generated alias of {@link #getCurrentSource()}, retained so that callers
     * compiled against the mangled name keep working.
     */
    public UnboundedSource<KinesisRecord, ?> m29getCurrentSource() {
        return getCurrentSource();
    }

    /**
     * Estimates the number of bytes not yet read from the stream.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>{@code -1} if the pool does not exist yet or it has no latest record timestamp
     *       (NOTE(review): -1 presumably matches the framework's "backlog unknown" value);</li>
     *   <li>{@code 0} if the latest record timestamp plus the configured up-to-date threshold
     *       is still in the future;</li>
     *   <li>the cached value if the last successful lookup is younger than the check
     *       threshold;</li>
     *   <li>otherwise a fresh value from the Kinesis client.</li>
     * </ol>
     *
     * <p>A {@link TransientKinesisException} during the lookup is logged and the previously
     * cached value is returned; the cache timestamp is not advanced, so the next call retries.
     *
     * @return the backlog size in bytes, or {@code -1} when it cannot be determined
     */
    public long getSplitBacklogBytes() {
        if (this.shardReadersPool == null) {
            return -1L;
        }
        Instant latestRecordTimestamp = this.shardReadersPool.getLatestRecordTimestamp();
        if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            LOG.debug("Split backlog bytes for stream {} unknown", this.spec.getStreamName());
            return -1L;
        }
        if (latestRecordTimestamp.plus(this.spec.getUpToDateThreshold()).isAfterNow()) {
            LOG.debug(
                "Split backlog bytes for stream {} with latest record timestamp {}: 0 (latest record timestamp is up-to-date with threshold of {})",
                this.spec.getStreamName(),
                latestRecordTimestamp,
                this.spec.getUpToDateThreshold());
            return 0L;
        }
        if (this.backlogBytesLastCheckTime.plus(this.backlogBytesCheckThreshold).isAfterNow()) {
            LOG.debug(
                "Split backlog bytes for {} stream with latest record timestamp {}: {} (cached value)",
                this.spec.getStreamName(),
                latestRecordTimestamp,
                this.lastBacklogBytes);
            return this.lastBacklogBytes;
        }
        try {
            this.lastBacklogBytes = this.kinesis.getBacklogBytes(this.spec.getStreamName(), latestRecordTimestamp);
            this.backlogBytesLastCheckTime = Instant.now();
        } catch (TransientKinesisException e) {
            // Best effort: keep serving the previous estimate and retry on the next call.
            LOG.warn("Transient exception occurred during backlog estimation for stream {}.", this.spec.getStreamName(), e);
        }
        LOG.info(
            "Split backlog bytes for {} stream with {} latest record timestamp: {}",
            this.spec.getStreamName(),
            latestRecordTimestamp,
            this.lastBacklogBytes);
        return this.lastBacklogBytes;
    }

    /**
     * Builds the pool of shard readers from the read spec, client and initial checkpoint.
     *
     * <p>NOTE(review): package-private and non-final, presumably so tests can substitute the
     * pool; confirm against the test sources.
     */
    ShardReadersPool createShardReadersPool() throws TransientKinesisException {
        return new ShardReadersPool(this.spec, this.kinesis, this.initCheckpoint);
    }
}
