package co.cask.cdap.data.stream;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.io.BinaryEncoder;
import co.cask.cdap.common.io.BufferedEncoder;
import co.cask.cdap.common.io.Encoder;
import co.cask.cdap.common.stream.StreamEventDataCodec;
import co.cask.cdap.data.file.FileWriter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Longs;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.fs.Syncable;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/StreamDataFileWriter.class */
public final class StreamDataFileWriter implements Closeable, Flushable, FileWriter<StreamEvent> {
    private static final byte[] MAGIC_HEADER = {69, 49};
    private static final byte[] INDEX_MAGIC_HEADER = {73, 49};
    private static final int BUFFER_SIZE = 262144;
    private final OutputStream eventOutput;
    private final OutputStream indexOutput;
    private final long indexInterval;
    private final BufferedEncoder encoder;
    private final BufferedEncoder lengthEncoder;
    private long currentTimestamp;
    private long position;
    private long nextIndexTime;
    private boolean synced;
    private boolean closed;

    public StreamDataFileWriter(OutputSupplier<? extends OutputStream> outputSupplier, OutputSupplier<? extends OutputStream> outputSupplier2, long j) throws IOException {
        this.eventOutput = (OutputStream) outputSupplier.getOutput();
        try {
            this.indexOutput = (OutputStream) outputSupplier2.getOutput();
            this.indexInterval = j;
            this.currentTimestamp = -1L;
            Function<OutputStream, Encoder> createEncoderFactory = createEncoderFactory();
            this.encoder = new BufferedEncoder(BUFFER_SIZE, createEncoderFactory);
            this.lengthEncoder = new BufferedEncoder(5, createEncoderFactory);
            try {
                init();
            } catch (IOException e) {
                Closeables.closeQuietly(this.eventOutput);
                Closeables.closeQuietly(this.indexOutput);
                throw e;
            }
        } catch (IOException e2) {
            Closeables.closeQuietly(this.eventOutput);
            throw e2;
        }
    }

    @Override // co.cask.cdap.data.file.FileWriter
    public void append(StreamEvent streamEvent) throws IOException {
        if (this.closed) {
            throw new IOException("Writer already closed.");
        }
        this.synced = false;
        long timestamp = streamEvent.getTimestamp();
        if (timestamp < this.currentTimestamp) {
            throw closeWithException(new IOException("Out of order events written."));
        }
        try {
            if (timestamp > this.currentTimestamp) {
                flushBlock(false);
                this.currentTimestamp = timestamp;
                this.eventOutput.write(Bytes.toBytes(this.currentTimestamp));
                this.position += 8;
            }
            StreamEventDataCodec.encode(streamEvent, this.encoder);
            if (this.encoder.size() >= BUFFER_SIZE) {
                flushBlock(false);
            }
        } catch (IOException e) {
            throw closeWithException(e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            flushBlock(false);
            this.eventOutput.write(Longs.toByteArray(-1L));
            this.closed = true;
            try {
                this.eventOutput.close();
                this.indexOutput.close();
            } finally {
            }
        } catch (Throwable th) {
            this.closed = true;
            try {
                this.eventOutput.close();
                this.indexOutput.close();
                throw th;
            } finally {
            }
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        try {
            flushBlock(true);
        } catch (IOException e) {
            throw closeWithException(e);
        }
    }

    private void init() throws IOException {
        this.encoder.writeRaw(MAGIC_HEADER);
        StreamUtils.encodeMap(ImmutableMap.of("stream.schema", StreamEventDataCodec.STREAM_DATA_SCHEMA.toString()), this.encoder);
        long size = this.encoder.size();
        this.encoder.writeTo(this.eventOutput);
        sync(this.eventOutput);
        this.position = size;
        this.encoder.writeRaw(INDEX_MAGIC_HEADER);
        StreamUtils.encodeMap(ImmutableMap.of(), this.encoder);
        this.encoder.writeTo(this.indexOutput);
        sync(this.indexOutput);
    }

    private void flushBlock(boolean z) throws IOException {
        if (this.encoder.size() == 0) {
            if (!z || this.synced) {
                return;
            }
            sync(this.eventOutput);
            sync(this.indexOutput);
            this.synced = true;
            return;
        }
        long j = -1;
        if (this.currentTimestamp >= this.nextIndexTime) {
            j = this.position - 8;
        }
        this.lengthEncoder.writeInt(this.encoder.size());
        int size = this.lengthEncoder.size();
        this.lengthEncoder.writeTo(this.eventOutput);
        this.position += size;
        int size2 = this.encoder.size();
        this.encoder.writeTo(this.eventOutput);
        this.position += size2;
        if (z) {
            sync(this.eventOutput);
        }
        if (j >= 0) {
            this.encoder.writeRaw(Bytes.toBytes(this.currentTimestamp));
            this.encoder.writeRaw(Bytes.toBytes(j));
            this.encoder.writeTo(this.indexOutput);
            if (z) {
                sync(this.indexOutput);
            }
            this.nextIndexTime = this.currentTimestamp + this.indexInterval;
        } else if (z) {
            sync(this.indexOutput);
        }
        this.currentTimestamp = -1L;
        this.synced = z;
    }

    private void sync(OutputStream outputStream) throws IOException {
        if (outputStream instanceof Syncable) {
            ((Syncable) outputStream).hsync();
        } else {
            outputStream.flush();
        }
    }

    private IOException closeWithException(IOException iOException) throws IOException {
        this.closed = true;
        Closeables.closeQuietly(this.eventOutput);
        Closeables.closeQuietly(this.indexOutput);
        throw iOException;
    }

    private static Function<OutputStream, Encoder> createEncoderFactory() {
        return new Function<OutputStream, Encoder>() { // from class: co.cask.cdap.data.stream.StreamDataFileWriter.1
            public Encoder apply(OutputStream outputStream) {
                return new BinaryEncoder(outputStream);
            }
        };
    }
}
