package co.cask.cdap.data.stream;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.common.io.Locations;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/* loaded from: input_file:co/cask/cdap/data/stream/StreamRecordReader.class */
final class StreamRecordReader<K, V> extends RecordReader<K, V> {
    private final StreamEventDecoder<K, V> decoder;
    private StreamDataFileReader reader;
    private StreamInputSplit inputSplit;
    private final List<StreamEvent> events = Lists.newArrayListWithCapacity(1);
    private StreamEventDecoder.DecodeResult<K, V> currentEntry = new StreamEventDecoder.DecodeResult<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamRecordReader(StreamEventDecoder<K, V> streamEventDecoder) {
        this.decoder = streamEventDecoder;
    }

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.inputSplit = (StreamInputSplit) inputSplit;
        this.reader = createReader(FileSystem.get(taskAttemptContext.getConfiguration()), this.inputSplit);
        this.reader.initialize();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (this.reader.getPosition().longValue() < this.inputSplit.getStart() + this.inputSplit.getLength()) {
            this.events.clear();
            if (this.reader.read(this.events, 1, 0L, TimeUnit.SECONDS) <= 0) {
                return false;
            }
            StreamEvent streamEvent = this.events.get(0);
            if (streamEvent.getTimestamp() >= this.inputSplit.getStartTime()) {
                if (streamEvent.getTimestamp() >= this.inputSplit.getEndTime()) {
                    return false;
                }
                this.currentEntry = this.decoder.decode(streamEvent, this.currentEntry);
                return true;
            }
        }
        return false;
    }

    public K getCurrentKey() throws IOException, InterruptedException {
        return (K) this.currentEntry.getKey();
    }

    public V getCurrentValue() throws IOException, InterruptedException {
        return (V) this.currentEntry.getValue();
    }

    public float getProgress() throws IOException, InterruptedException {
        if (this.reader == null) {
            return 0.0f;
        }
        return Math.min(((float) (this.reader.getPosition().longValue() - this.inputSplit.getStart())) / ((float) this.inputSplit.getLength()), 1.0f);
    }

    public void close() throws IOException {
        if (this.reader != null) {
            this.reader.close();
        }
    }

    private StreamDataFileReader createReader(FileSystem fileSystem, StreamInputSplit streamInputSplit) {
        return StreamDataFileReader.createWithOffset(Locations.newInputSupplier(fileSystem, streamInputSplit.getPath()), Locations.newInputSupplier(fileSystem, streamInputSplit.getIndexPath()), streamInputSplit.getStart());
    }
}
