package co.cask.cdap.data.stream;

import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.LiveFileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.tephra.TxConstants;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link LiveFileReader} over stream event files.
 *
 * <p>Reading starts at the offset given at construction time. Each call to {@link #renewReader()}
 * then tries to move on to the next sequence file of the current partition or, once the current
 * partition has ended, to the first file of the following partition. Look-ups for a new file are
 * rate-limited with an exponential back-off that is capped by the configured maximum file check
 * interval.
 */
public final class LiveStreamFileReader extends LiveFileReader<PositionStreamEvent, StreamFileOffset> {
    private static final Logger LOG = LoggerFactory.getLogger(LiveStreamFileReader.class);

    /** Upper bound of the retry counter that drives the back-off. */
    private static final int MAX_RETRIES = 32;

    /** First back-off step in milliseconds; it is doubled on every unsuccessful check. */
    private static final long BASE_CHECK_INTERVAL_MS = 100L;

    private final StreamFileOffset beginOffset;
    private final StreamConfig streamConfig;
    private final long maxFileCheckInterval;
    private StreamPositionTransformFileReader reader;
    /** Number of consecutive unsuccessful checks for a new file, capped at {@link #MAX_RETRIES}. */
    private int retries;
    /** Wall-clock time (epoch millis) before which no new-file check is made; starts at 0. */
    private long nextCheckTime;

    /**
     * Adapts a {@link FileReader} that reports positions as {@code Long} into one that reports
     * {@link StreamFileOffset}: after {@link #initialize()} and after every read, a new offset is
     * built from the previous offset and the delegate's current position.
     *
     * <p>NOTE(review): the decompiler reported "Access modifiers changed from: private" for this
     * class, so the {@code public} modifier is most likely an artifact; it is kept as found.
     */
    @NotThreadSafe
    public static final class StreamPositionTransformFileReader implements FileReader<PositionStreamEvent, StreamFileOffset> {
        private final FileReader<PositionStreamEvent, Long> reader;
        private final Location partitionLocation;
        private StreamFileOffset offset;

        /**
         * Creates a reader for the event file (and its index file) referenced by the given offset.
         * The partition location is taken to be the parent of the event file location.
         *
         * @param streamFileOffset offset describing the file to read and where to start in it
         * @throws IOException declared by the constructor; which of the calls below can raise it
         *     is not visible from this file
         */
        private StreamPositionTransformFileReader(StreamFileOffset streamFileOffset) throws IOException {
            Location eventLocation = streamFileOffset.getEventLocation();
            this.reader = StreamDataFileReader.createWithOffset(
                Locations.newInputSupplier(eventLocation),
                Locations.newInputSupplier(streamFileOffset.getIndexLocation()),
                streamFileOffset.getOffset());
            // Keep a private copy so later updates never touch the caller's instance.
            this.offset = new StreamFileOffset(streamFileOffset);
            this.partitionLocation = Locations.getParent(eventLocation);
            LOG.trace("Stream reader created for {}", eventLocation);
        }

        /**
         * Initializes the delegate reader and refreshes the tracked offset from its position.
         *
         * @throws IOException if the delegate fails to initialize
         */
        @Override // co.cask.cdap.data.file.FileReader
        public void initialize() throws IOException {
            LOG.trace("Initialize stream reader {}", this.offset);
            this.reader.initialize();
            this.offset = new StreamFileOffset(this.offset, this.reader.getPosition().longValue());
            LOG.trace("Stream reader initialized {}", this.offset);
        }

        /**
         * Reads with {@link ReadFilter#ALWAYS_ACCEPT}; see
         * {@link #read(Collection, int, long, TimeUnit, ReadFilter)}.
         */
        @Override // co.cask.cdap.data.file.FileReader
        public int read(Collection<? super PositionStreamEvent> events, int maxEvents, long timeout, TimeUnit unit) throws IOException, InterruptedException {
            return read(events, maxEvents, timeout, unit, ReadFilter.ALWAYS_ACCEPT);
        }

        /**
         * Forwards the read to the delegate unchanged, then refreshes the tracked offset from the
         * delegate's position.
         *
         * @param events collection handed to the delegate to receive the events
         * @param maxEvents passed through to the delegate (presumably the maximum number of events
         *     to read; semantics are defined by {@link FileReader})
         * @param timeout passed through to the delegate together with {@code unit}
         * @param unit time unit of {@code timeout}
         * @param readFilter filter passed through to the delegate
         * @return whatever the delegate's read returns
         * @throws IOException if the delegate read fails
         * @throws InterruptedException if the delegate read is interrupted
         */
        @Override // co.cask.cdap.data.file.FileReader
        public int read(Collection<? super PositionStreamEvent> events, int maxEvents, long timeout, TimeUnit unit, ReadFilter readFilter) throws IOException, InterruptedException {
            int result = this.reader.read(events, maxEvents, timeout, unit, readFilter);
            this.offset = new StreamFileOffset(this.offset, this.reader.getPosition().longValue());
            return result;
        }

        /** Closes the delegate reader. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.reader.close();
        }

        /** Returns the offset as of the last {@code initialize()} or {@code read(...)} call. */
        @Override // co.cask.cdap.data.file.PositionReporter
        public StreamFileOffset getPosition() {
            return this.offset;
        }

        /** Returns the parent location of the event file this reader was created for. */
        Location getPartitionLocation() {
            return this.partitionLocation;
        }
    }

    /**
     * Creates a live reader using {@code Constants.Stream.NEW_FILE_CHECK_INTERVAL} as the maximum
     * interval between checks for a new file.
     *
     * @param streamConfig configuration of the stream; its partition duration is used to locate
     *     the next partition
     * @param streamFileOffset offset at which reading begins
     */
    public LiveStreamFileReader(StreamConfig streamConfig, StreamFileOffset streamFileOffset) {
        this(streamConfig, streamFileOffset, Constants.Stream.NEW_FILE_CHECK_INTERVAL);
    }

    /**
     * Creates a live reader with an explicit cap on the new-file check interval.
     *
     * @param streamConfig configuration of the stream; its partition duration is used to locate
     *     the next partition
     * @param streamFileOffset offset at which reading begins
     * @param maxFileCheckInterval upper bound, in milliseconds, of the back-off between checks for
     *     a new file; a value {@code <= 0} selects {@code Constants.Stream.NEW_FILE_CHECK_INTERVAL}
     */
    public LiveStreamFileReader(StreamConfig streamConfig, StreamFileOffset streamFileOffset, long maxFileCheckInterval) {
        this.streamConfig = streamConfig;
        this.beginOffset = streamFileOffset;
        this.maxFileCheckInterval = maxFileCheckInterval <= 0
            ? Constants.Stream.NEW_FILE_CHECK_INTERVAL
            : maxFileCheckInterval;
    }

    /**
     * Provides the reader to continue with, if there is one.
     *
     * <p>On the first call a reader for the begin offset is created, initialized and returned.
     * On later calls the method looks for the file with the next sequence id in the current
     * partition, and, if that does not exist and the current partition has ended, opens sequence
     * {@code 0} of the partition that starts at the current partition's end.
     *
     * @return the new reader, or {@code null} if it is too early to check again or no new file
     *     was found
     * @throws IOException if checking for or initializing a reader fails
     */
    @Override // co.cask.cdap.data.file.LiveFileReader
    @Nullable
    protected FileReader<PositionStreamEvent, StreamFileOffset> renewReader() throws IOException {
        if (this.reader == null) {
            this.reader = new StreamPositionTransformFileReader(this.beginOffset);
            this.reader.initialize();
            return this.reader;
        }

        StreamFileOffset position = this.reader.getPosition();
        long now = System.currentTimeMillis();

        // Skip the check only while both the back-off window and the current partition are still
        // open; once the partition end has passed, a check is always made.
        if (now < this.nextCheckTime && now < position.getPartitionEnd()) {
            return null;
        }

        // First choice: the next sequence file in the same partition, only if it already exists.
        Location nextSequenceFile = StreamUtils.createStreamLocation(
            this.reader.getPartitionLocation(),
            position.getNamePrefix(),
            position.getSequenceId() + 1,
            StreamFileType.EVENT);
        StreamPositionTransformFileReader nextReader = createReader(nextSequenceFile, true, position.getGeneration());

        // Second choice: the current partition is over, so switch to the first file of the
        // partition starting at its end. No existence check is made for this file.
        if (nextReader == null && now > position.getPartitionEnd()) {
            Location nextPartition = StreamUtils.createPartitionLocation(
                Locations.getParent(this.reader.getPartitionLocation()),
                position.getPartitionEnd(),
                this.streamConfig.getPartitionDuration());
            Location firstFile = StreamUtils.createStreamLocation(
                nextPartition, position.getNamePrefix(), 0, StreamFileType.EVENT);
            nextReader = createReader(firstFile, false, position.getGeneration());
        }

        if (nextReader != null) {
            // Progress was made: switch readers and reset the back-off.
            this.reader = nextReader;
            this.retries = 0;
            this.nextCheckTime = 0L;
        } else {
            this.nextCheckTime = now + getCheckInterval();
        }
        return nextReader;
    }

    /**
     * Advances the retry counter and returns the delay before the next new-file check.
     *
     * <p>The delay is {@code 100ms * 2^(retries - 1)}, capped by the maximum file check interval.
     * The product is computed in {@code long} arithmetic: in {@code int} arithmetic
     * {@code 100 * 2^25} and larger values exceed {@code Integer.MAX_VALUE} and wrap around,
     * which can yield a negative delay and thereby disable the back-off.
     *
     * @return the delay in milliseconds
     */
    private long getCheckInterval() {
        this.retries = Math.min(this.retries + 1, MAX_RETRIES);
        // NOTE(review): the saturation multiplier comes from a transaction-related constant, which
        // looks like a decompiler substitution for Integer.MAX_VALUE - confirm against the
        // original source.
        long multiplier = this.retries >= MAX_RETRIES
            ? TxConstants.Manager.DEFAULT_TX_MAX_TIMEOUT
            : 1L << (this.retries - 1);
        return Math.min(BASE_CHECK_INTERVAL_MS * multiplier, this.maxFileCheckInterval);
    }

    /**
     * Creates and initializes a reader positioned at the start (offset {@code 0}) of a file.
     *
     * @param location location of the event file to read
     * @param checkExists if {@code true}, return {@code null} when the file does not exist instead
     *     of creating a reader
     * @param generation generation to record in the reader's offset
     * @return the initialized reader, or {@code null} if {@code checkExists} is set and the file
     *     is missing
     * @throws IOException if the existence check or the reader initialization fails
     */
    private StreamPositionTransformFileReader createReader(Location location, boolean checkExists, int generation) throws IOException {
        if (checkExists && !location.exists()) {
            return null;
        }
        StreamPositionTransformFileReader newReader =
            new StreamPositionTransformFileReader(new StreamFileOffset(location, 0L, generation));
        newReader.initialize();
        return newReader;
    }
}
