package co.cask.cdap.data.stream;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.io.BinaryDecoder;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.common.io.Decoder;
import co.cask.cdap.common.io.SeekableInputStream;
import co.cask.cdap.common.stream.StreamEventDataCodec;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.stream.StreamDataFileConstants;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import com.google.gson.JsonSyntaxException;
import com.google.gson.stream.JsonReader;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/StreamDataFileReader.class */
public final class StreamDataFileReader implements FileReader<PositionStreamEvent, Long> {
    // Supplies the seekable stream of the event file; a stream is obtained from it in doOpen().
    private final InputSupplier<? extends SeekableInputStream> eventInputSupplier;
    // Supplies the index file; null when the reader is built through create().
    private final InputSupplier<? extends InputStream> indexInputSupplier;
    // When > 0 (and offset is not > 0), init() positions the reader through initByTime().
    private final long startTime;
    // When > 0, init() positions the reader through initByOffset().
    private final long offset;
    // Lazily created by getIndex(); stays null when there is no index input supplier.
    private StreamDataFileIndex index;
    // The currently open event stream; null until doOpen() succeeds and reset to null after a read failure.
    private SeekableInputStream eventInput;
    // The value reported by getPosition(); updated while the header, data blocks and events are read.
    private long position;
    // Set by close(); read() rejects calls once this is true.
    private boolean closed;
    // Set when a negative timestamp is read, or when a file-wide timestamp is rejected by the filter.
    private boolean eof;
    // Binary decoder over eventInput; used to read the length that follows a block timestamp.
    private Decoder decoder;
    // Built by readHeader(); supplies the event headers and, when its timestamp is >= 0, the timestamp used for events.
    private StreamEvent eventTemplate;
    // Holds the content of the data block most recently loaded by readDataBlock().
    private final StreamEventBuffer streamEventBuffer = new StreamEventBuffer();
    // Reusable 8-byte scratch buffer for readTimestamp().
    private final byte[] timestampBuffer = new byte[8];
    // Timestamp associated with the data block currently held in streamEventBuffer; -1 until a block is accepted.
    private long timestamp = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/StreamDataFileReader$SkipCondition.class */
    /**
     * Condition evaluated by {@code skipUntil} while it moves forward through the event file.
     * <p>
     * Within this class the first argument is compared against a target file offset
     * (see {@code initByOffset}) and the second against a target timestamp (see {@code initByTime}).
     */
    public interface SkipCondition {
        /**
         * @param j  a position in the event file
         * @param j2 a timestamp
         * @return {@code true} when the given position/timestamp pair satisfies the condition,
         *         which makes {@code skipUntil} stop advancing
         */
        boolean apply(long j, long j2);
    }

    /**
     * Creates a reader without an index, a start time or a start offset.
     *
     * @param inputSupplier supplies the seekable input stream of the event file
     * @return a new reader; no stream is opened until the reader is initialized or read from
     */
    public static StreamDataFileReader create(InputSupplier<? extends SeekableInputStream> inputSupplier) {
        final InputSupplier<? extends InputStream> noIndexSupplier = null;
        final long noStartTime = 0L;
        final long noOffset = 0L;
        return new StreamDataFileReader(inputSupplier, noIndexSupplier, noStartTime, noOffset);
    }

    /**
     * Creates a reader that is positioned by timestamp when it is first opened.
     *
     * @param inputSupplier  supplies the seekable input stream of the event file
     * @param inputSupplier2 supplies the input stream of the index file
     * @param j              the start time; only applied when it is greater than zero
     * @return a new reader; no stream is opened until the reader is initialized or read from
     */
    public static StreamDataFileReader createByStartTime(InputSupplier<? extends SeekableInputStream> inputSupplier, InputSupplier<? extends InputStream> inputSupplier2, long j) {
        final long startTime = j;
        final long noOffset = 0L;
        return new StreamDataFileReader(inputSupplier, inputSupplier2, startTime, noOffset);
    }

    /**
     * Creates a reader that is positioned by file offset when it is first opened.
     *
     * @param inputSupplier  supplies the seekable input stream of the event file
     * @param inputSupplier2 supplies the input stream of the index file
     * @param j              the start offset; only applied when it is greater than zero
     * @return a new reader; no stream is opened until the reader is initialized or read from
     */
    public static StreamDataFileReader createWithOffset(InputSupplier<? extends SeekableInputStream> inputSupplier, InputSupplier<? extends InputStream> inputSupplier2, long j) {
        final long noStartTime = 0L;
        final long startOffset = j;
        return new StreamDataFileReader(inputSupplier, inputSupplier2, noStartTime, startOffset);
    }

    /**
     * Stores the construction parameters; nothing is opened here.
     *
     * @param eventSupplier supplies the seekable input stream of the event file
     * @param indexSupplier supplies the input stream of the index file, or {@code null}
     * @param startTime     start time used by {@code init()} when greater than zero
     * @param startOffset   start offset used by {@code init()} when greater than zero
     */
    private StreamDataFileReader(InputSupplier<? extends SeekableInputStream> eventSupplier, InputSupplier<? extends InputStream> indexSupplier, long startTime, long startOffset) {
        this.offset = startOffset;
        this.startTime = startTime;
        this.indexInputSupplier = indexSupplier;
        this.eventInputSupplier = eventSupplier;
    }

    /**
     * @return the current value of the reader's position field, boxed as a {@link Long}
     */
    @Override // co.cask.cdap.data.file.PositionReporter
    public Long getPosition() {
        final long current = this.position;
        // Autoboxing performs the same Long.valueOf conversion.
        return current;
    }

    /**
     * Opens the event file if it is not open yet.
     * <p>
     * An {@link EOFException} or {@link FileNotFoundException} raised while opening is ignored;
     * {@code read} attempts to open the file again when no stream is open.
     *
     * @throws IOException for any other I/O failure while opening
     */
    @Override // co.cask.cdap.data.file.FileReader
    public void initialize() throws IOException {
        if (this.eventInput != null) {
            // Already open, nothing to do.
            return;
        }
        try {
            doOpen();
        } catch (EOFException ignored) {
            // Tolerated: opening is attempted again by read().
        } catch (FileNotFoundException ignored) {
            // Tolerated: opening is attempted again by read().
        }
    }

    /**
     * Closes the underlying event stream, if one is open, and marks this reader as closed.
     * Calling it again after the reader has been marked closed does nothing.
     *
     * @throws IOException if closing the event stream fails; the reader is still marked as closed
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.closed) {
            try {
                final SeekableInputStream input = this.eventInput;
                if (input != null) {
                    input.close();
                }
            } finally {
                this.closed = true;
            }
        }
    }

    /**
     * Reads events using {@code ReadFilter.ALWAYS_ACCEPT} as the filter.
     *
     * @param collection receives the events that were read
     * @param i          maximum number of events to read
     * @param j          timeout, expressed in {@code timeUnit}
     * @param timeUnit   unit of the timeout
     * @return the value returned by the five-argument {@code read}
     * @throws IOException          if reading fails
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // co.cask.cdap.data.file.FileReader
    public int read(Collection<? super PositionStreamEvent> collection, int i, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        final ReadFilter defaultFilter = ReadFilter.ALWAYS_ACCEPT;
        return read(collection, i, j, timeUnit, defaultFilter);
    }

    /**
     * Reads up to {@code i} events into {@code collection}.
     * <p>
     * When an {@link EOFException} or {@link FileNotFoundException} occurs, the event stream is closed
     * and, as long as the timeout has not elapsed, the read is retried after a short sleep.
     * Any other {@link IOException} closes this reader and is rethrown.
     *
     * @param collection receives the events that were read
     * @param i          maximum number of events to read
     * @param j          timeout, expressed in {@code timeUnit}; a value {@code <= 0} disables retrying
     * @param timeUnit   unit of the timeout
     * @param readFilter filter applied while reading
     * @return the number of events added, or {@code -1} if none were added and the end of file was reached
     * @throws IOException          if the reader is already closed or reading fails
     * @throws InterruptedException if interrupted while sleeping between retries
     */
    @Override // co.cask.cdap.data.file.FileReader
    public int read(Collection<? super PositionStreamEvent> collection, int i, long j, TimeUnit timeUnit, ReadFilter readFilter) throws IOException, InterruptedException {
        if (this.closed) {
            throw new IOException("Reader already closed.");
        }
        final int maxEvents = i;
        final long timeout = j;
        final long sleepNanos = computeSleepNano(timeout, timeUnit);
        int eventCount = 0;
        try {
            final Stopwatch timer = new Stopwatch();
            timer.start();
            while (!this.eof && eventCount < maxEvents) {
                try {
                    if (this.eventInput == null) {
                        doOpen();
                    }
                    final PositionStreamEvent event = nextStreamEvent(readFilter);
                    if (event != null) {
                        collection.add(event);
                        eventCount++;
                    } else if (this.eof) {
                        break;
                    }
                } catch (IOException e) {
                    // Drop the stream so the next iteration re-opens it.
                    if (this.eventInput != null) {
                        this.eventInput.close();
                        this.eventInput = null;
                    }
                    final boolean retryable = e instanceof EOFException || e instanceof FileNotFoundException;
                    if (!retryable) {
                        throw e;
                    }
                    // Stop on end of file, when no waiting is allowed, or when the timeout has elapsed.
                    if (this.eof || timeout <= 0 || timer.elapsedTime(timeUnit) >= timeout) {
                        break;
                    }
                    TimeUnit.NANOSECONDS.sleep(sleepNanos);
                    if (timer.elapsedTime(timeUnit) >= timeout) {
                        break;
                    }
                }
            }
            return (eventCount == 0 && this.eof) ? -1 : eventCount;
        } catch (IOException fatal) {
            close();
            throw fatal;
        }
    }

    /**
     * Returns the file index, creating it on first use.
     *
     * @return the index, or {@code null} when no index input supplier was provided
     */
    private StreamDataFileIndex getIndex() {
        if (this.index != null) {
            return this.index;
        }
        if (this.indexInputSupplier == null) {
            return null;
        }
        this.index = new StreamDataFileIndex(this.indexInputSupplier);
        return this.index;
    }

    /**
     * Opens the event stream and positions it.
     * <p>
     * When {@code position <= 0} the header is read and the initial position is established through
     * {@code init()}. Otherwise the stream is moved to the end position of the event buffer if the
     * buffer still has an event, or to the last recorded position if it does not.
     *
     * @throws IOException if opening or positioning fails; the stream is closed and cleared first
     */
    private void doOpen() throws IOException {
        try {
            final SeekableInputStream input = this.eventInputSupplier.getInput();
            this.eventInput = input;
            this.decoder = new BinaryDecoder(input);
            if (this.position <= 0) {
                init();
                return;
            }
            final long resumeAt = this.streamEventBuffer.hasEvent()
                ? this.streamEventBuffer.getEndPosition()
                : this.position;
            input.seek(resumeAt);
        } catch (IOException e) {
            if (this.eventInput != null) {
                this.eventInput.close();
                this.eventInput = null;
            }
            throw e;
        }
    }

    /**
     * Computes how long to sleep between read retries: one tenth of the timeout, but at least
     * one nanosecond.
     *
     * @param timeout the timeout value
     * @param unit    unit of {@code timeout}
     * @return the sleep duration in nanoseconds, always {@code >= 1}
     */
    private long computeSleepNano(long timeout, TimeUnit unit) {
        final long tenthOfTimeout = unit.toNanos(timeout) / 10;
        return Math.max(1L, tenthOfTimeout);
    }

    /**
     * Reads the file header, then applies the initial positioning: by offset when an offset
     * greater than zero was given, otherwise by time when a start time greater than zero was given.
     *
     * @throws IOException if reading the header or positioning fails
     */
    private void init() throws IOException {
        readHeader();
        if (this.offset > 0) {
            // The offset takes precedence over the start time.
            initByOffset(this.offset);
            return;
        }
        if (this.startTime > 0) {
            initByTime(this.startTime);
        }
    }

    /**
     * Reads the file header: two magic bytes that identify the file version, followed by a
     * property map. The schema in the properties is verified and the event template is built.
     * Afterwards {@code position} is set to the stream position right after the header.
     *
     * @throws IOException if the header cannot be read, the version is unknown or the schema is invalid
     */
    private void readHeader() throws IOException {
        final byte[] magic = new byte[2];
        ByteStreams.readFully(this.eventInput, magic);
        final int version = decodeFileVersion(magic);

        final Map<String, String> properties = StreamUtils.decodeMap(new BinaryDecoder(this.eventInput));
        verifySchema(properties);

        // Only files of version 2 or above derive the template from the header properties.
        this.eventTemplate = version >= 2
            ? createEventTemplate(properties)
            : new StreamEvent(ImmutableMap.<String, String>of(), ByteBuffers.EMPTY_BUFFER, -1L);

        this.position = this.eventInput.getPos();
    }

    /**
     * Maps the magic header bytes to a file version number.
     *
     * @param magic the first two bytes of the file
     * @return {@code 1} or {@code 2}, depending on which magic header matches
     * @throws IOException if the bytes match neither known magic header
     */
    private int decodeFileVersion(byte[] magic) throws IOException {
        final byte[] headerV1 = StreamDataFileConstants.MAGIC_HEADER_V1;
        final byte[] headerV2 = StreamDataFileConstants.MAGIC_HEADER_V2;
        if (Arrays.equals(magic, headerV1)) {
            return 1;
        }
        if (Arrays.equals(magic, headerV2)) {
            return 2;
        }
        final String message = String.format("Unsupported stream file format. First two bytes must be %s or %s", Bytes.toStringBinary(headerV1), Bytes.toStringBinary(headerV2));
        throw new IOException(message);
    }

    /**
     * Builds the event template from the header properties.
     * <p>
     * The template timestamp is {@code -1} unless the uni-timestamp property is present. If that
     * property equals the close-timestamp marker, the timestamp is the absolute value of the last
     * eight bytes of the file; otherwise the property value itself is parsed as a long.
     * Properties whose key starts with the event header prefix become template headers, with the
     * prefix removed from the key.
     *
     * @param properties the header properties
     * @return the event template, with an empty body
     * @throws IOException if seeking or reading the trailing timestamp fails
     */
    private StreamEvent createEventTemplate(Map<String, String> properties) throws IOException {
        final String uniTimestamp = properties.get(StreamDataFileConstants.Property.Key.UNI_TIMESTAMP);
        long templateTimestamp = -1;
        if (StreamDataFileConstants.Property.Value.CLOSE_TIMESTAMP.equals(uniTimestamp)) {
            // Read the timestamp stored in the last 8 bytes, then return to where we were.
            final long resumeAt = this.eventInput.getPos();
            this.eventInput.seek(this.eventInput.size() - 8);
            templateTimestamp = Math.abs(readTimestamp());
            this.eventInput.seek(resumeAt);
        } else if (uniTimestamp != null) {
            templateTimestamp = Long.parseLong(uniTimestamp);
        }

        final String headerPrefix = StreamDataFileConstants.Property.Key.EVENT_HEADER_PREFIX;
        final ImmutableMap.Builder<String, String> headers = ImmutableMap.builder();
        for (Map.Entry<String, String> property : properties.entrySet()) {
            final String key = property.getKey();
            if (!key.startsWith(headerPrefix)) {
                continue;
            }
            headers.put(key.substring(headerPrefix.length()), property.getValue());
        }
        return new StreamEvent(headers.build(), ByteBuffers.EMPTY_BUFFER, templateTimestamp);
    }

    /**
     * Positions the reader by file offset. If an index is available and yields a floor position
     * greater than zero, the stream first jumps there; then {@code skipUntil} advances until the
     * position reaches {@code targetOffset}.
     *
     * @param targetOffset the file offset to reach
     * @throws IOException if seeking or skipping fails
     */
    private void initByOffset(final long targetOffset) throws IOException {
        final StreamDataFileIndex fileIndex = getIndex();
        if (fileIndex != null) {
            final long indexedPosition = fileIndex.floorPosition(targetOffset);
            if (indexedPosition > 0) {
                this.eventInput.seek(indexedPosition);
            }
        }
        final SkipCondition reachedOffset = new SkipCondition() { // from class: co.cask.cdap.data.stream.StreamDataFileReader.1
            @Override // co.cask.cdap.data.stream.StreamDataFileReader.SkipCondition
            public boolean apply(long currentPosition, long currentTimestamp) {
                return currentPosition >= targetOffset;
            }
        };
        skipUntil(reachedOffset);
    }

    /**
     * Positions the reader by timestamp. If an index is available and yields a floor position
     * greater than zero for the time, the stream first jumps there; then {@code skipUntil} advances
     * until the timestamp reaches {@code targetTime}.
     *
     * @param targetTime the timestamp to reach
     * @throws IOException if seeking or skipping fails
     */
    private void initByTime(final long targetTime) throws IOException {
        final StreamDataFileIndex fileIndex = getIndex();
        if (fileIndex != null) {
            final long indexedPosition = fileIndex.floorPositionByTime(targetTime);
            if (indexedPosition > 0) {
                this.eventInput.seek(indexedPosition);
            }
        }
        final SkipCondition reachedTime = new SkipCondition() { // from class: co.cask.cdap.data.stream.StreamDataFileReader.2
            @Override // co.cask.cdap.data.stream.StreamDataFileReader.SkipCondition
            public boolean apply(long currentPosition, long currentTimestamp) {
                return currentTimestamp >= targetTime;
            }
        };
        skipUntil(reachedTime);
    }

    /**
     * Advances the reader until {@code condition} is satisfied or the end of file is reached.
     * <p>
     * This body was reconstructed by hand from the instruction dump that the decompiler emitted
     * after failing on a {@code dup2_x1} instruction; the previous body unconditionally threw
     * {@link UnsupportedOperationException}.
     * <p>
     * The work is done in two phases:
     * <ol>
     *   <li>Block scan: read each data block's timestamp and length and jump over the block body,
     *       keeping {@code position} at the start of the last block visited and
     *       {@code positionBound} at the start of the next one, until the condition holds for
     *       either bound or a negative timestamp marks the end of file.</li>
     *   <li>Event scan: seek back to {@code position}, load that data block and step through its
     *       events until the condition holds for the buffer position or the bound is reached.</li>
     * </ol>
     *
     * @param condition decides, from a file position and a timestamp, when to stop skipping
     * @throws IOException for I/O failures other than {@link EOFException}
     */
    private void skipUntil(SkipCondition condition) throws IOException {
        // Both the field and the local start at the current stream position.
        long positionBound = this.position = this.eventInput.getPos();

        // NOTE(review): the instruction dump does not show where the try region begins; it is assumed
        // to cover everything after the initial assignment - confirm against the class file.
        try {
            // Phase 1: move block by block.
            while (!this.eof) {
                final long blockTimestamp = readTimestamp();

                // A negative timestamp marks the end of the file.
                this.eof = blockTimestamp < 0;
                if (this.eof || condition.apply(positionBound, blockTimestamp)) {
                    break;
                }

                final int length = readLength();
                this.position = positionBound;

                // Jump over the block body to the start of the next block.
                this.eventInput.seek(this.eventInput.getPos() + length);
                positionBound = this.eventInput.getPos();

                // The condition may be satisfied by the new position alone, so check again before looping.
                if (condition.apply(positionBound, blockTimestamp)) {
                    break;
                }
            }

            if (this.eof) {
                this.position = positionBound;
                return;
            }

            // Phase 2: locate the exact event between position and positionBound.
            this.eventInput.seek(this.position);
            readDataBlock(ReadFilter.ALWAYS_ACCEPT);
            while (this.position < positionBound) {
                if (condition.apply(this.streamEventBuffer.getPosition(), this.timestamp)) {
                    break;
                }
                nextStreamEvent(ReadFilter.ALWAYS_REJECT_OFFSET);
            }
        } catch (IOException e) {
            // Running into the end of the stream is tolerated here (presumably the file can still be
            // growing - TODO confirm); every other failure is propagated.
            if (!(e instanceof EOFException)) {
                throw e;
            }
        }
    }

    /**
     * Checks that the header properties carry a schema and that it equals
     * {@code StreamEventDataCodec.STREAM_DATA_SCHEMA}.
     *
     * @param properties the header properties
     * @throws IOException if the schema property is missing, is not valid JSON for a schema, or
     *                     describes a different schema
     */
    private void verifySchema(Map<String, String> properties) throws IOException {
        final String schemaJson = properties.get(StreamDataFileConstants.Property.Key.SCHEMA);
        if (schemaJson == null) {
            throw new IOException("Missing '" + StreamDataFileConstants.Property.Key.SCHEMA + "' property.");
        }
        final boolean supported;
        try {
            final JsonReader reader = new JsonReader(new StringReader(schemaJson));
            supported = StreamEventDataCodec.STREAM_DATA_SCHEMA.equals(new SchemaTypeAdapter().read(reader));
        } catch (JsonSyntaxException e) {
            throw new IOException("Invalid schema.", e);
        }
        if (!supported) {
            throw new IOException("Unsupported schema " + schemaJson);
        }
    }

    /**
     * Reads the next eight bytes from the event stream into the shared scratch buffer and decodes
     * them as a long.
     *
     * @return the decoded value
     * @throws IOException if eight bytes cannot be read
     */
    private long readTimestamp() throws IOException {
        final byte[] raw = this.timestampBuffer;
        ByteStreams.readFully(this.eventInput, raw);
        return Bytes.toLong(raw);
    }

    /**
     * Reads an int through the decoder. Callers use it as the number of bytes of block data that
     * follow a block timestamp.
     *
     * @return the decoded int
     * @throws IOException if decoding fails
     */
    private int readLength() throws IOException {
        final int length = this.decoder.readInt();
        return length;
    }

    /**
     * Reads the next data block header and either loads the block into the event buffer or skips it.
     * <ul>
     *   <li>A negative block timestamp marks the end of file.</li>
     *   <li>The effective timestamp is the template timestamp when that is {@code >= 0}, otherwise
     *       the block timestamp.</li>
     *   <li>If the filter accepts the effective timestamp, the block is buffered.</li>
     *   <li>If it is rejected and the template carries a timestamp, the file is treated as finished.</li>
     *   <li>If it is rejected and the filter hints at a later timestamp, the reader is repositioned
     *       by time; otherwise the block body is skipped.</li>
     * </ul>
     *
     * @param filter the filter used to accept or reject the block timestamp
     * @throws IOException if reading fails, or fewer bytes than the block length could be skipped
     */
    private void readDataBlock(ReadFilter filter) throws IOException {
        this.position = this.eventInput.getPos();
        final long blockTimestamp = readTimestamp();
        if (blockTimestamp < 0) {
            this.eof = true;
            return;
        }

        final boolean hasTemplateTimestamp = this.eventTemplate.getTimestamp() >= 0;
        final long effectiveTimestamp = hasTemplateTimestamp ? this.eventTemplate.getTimestamp() : blockTimestamp;

        if (acceptTimestamp(filter, effectiveTimestamp)) {
            final int bufferedLength = readLength();
            this.streamEventBuffer.fillBuffer(this.eventInput, bufferedLength);
            this.timestamp = effectiveTimestamp;
            return;
        }

        if (hasTemplateTimestamp) {
            // Every block shares the template timestamp, so nothing further can be accepted.
            this.eof = true;
            return;
        }

        final long hint = filter.getNextTimestampHint();
        if (hint > effectiveTimestamp) {
            // Rewind to the start of this block and reposition by time.
            this.eventInput.seek(this.position);
            initByTime(hint);
            return;
        }

        final int skipLength = readLength();
        final long skipped = this.eventInput.skip(skipLength);
        if (skipped != skipLength) {
            throw new EOFException("Expected to skip " + skipLength + " but only " + skipped + " was skipped.");
        }
        this.position = this.eventInput.getPos();
    }

    /**
     * Returns the next event from the event buffer, loading data blocks until the buffer holds an
     * event whose block timestamp the filter accepts. Afterwards {@code position} is set to the
     * buffer's position.
     *
     * @param filter the filter applied to block timestamps and passed on to the event buffer
     * @return the value returned by the event buffer, or {@code null} when the end of file was reached
     * @throws IOException if reading fails
     */
    private PositionStreamEvent nextStreamEvent(ReadFilter filter) throws IOException {
        // Keep loading blocks while there is no buffered event from an accepted block.
        while (!this.eof && !(this.streamEventBuffer.hasEvent() && acceptTimestamp(filter, this.timestamp))) {
            readDataBlock(filter);
        }
        if (this.eof) {
            return null;
        }
        final PositionStreamEvent event = this.streamEventBuffer.nextEvent(this.timestamp, this.eventTemplate.getHeaders(), filter);
        this.position = this.streamEventBuffer.getPosition();
        return event;
    }

    /**
     * Resets the filter and then asks it whether the given timestamp is accepted.
     *
     * @param filter         the filter to consult
     * @param blockTimestamp the timestamp to test
     * @return the filter's answer
     */
    private boolean acceptTimestamp(ReadFilter filter, long blockTimestamp) {
        filter.reset();
        final boolean accepted = filter.acceptTimestamp(blockTimestamp);
        return accepted;
    }
}
