package com.linkedin.r2.filter.compression.streaming;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.stream.entitystream.EntityStream;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.message.stream.entitystream.WriteHandle;
import com.linkedin.r2.message.stream.entitystream.Writer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/r2/filter/compression/streaming/StreamingDeflater.class */
/**
 * Bridges an underlying {@link EntityStream} to a downstream {@link WriteHandle}, passing every
 * chunk it reads through the {@link OutputStream} returned by
 * {@link #createOutputStream(OutputStream)}.
 *
 * <p>The instance acts as the {@link Reader} of the underlying stream and as a {@link Writer}
 * towards the write handle it receives in {@link #onInit(WriteHandle)}. Bytes emitted by the
 * subclass-provided stream are collected into fixed-size chunks and forwarded to the write handle
 * whenever it reports remaining capacity.
 */
public abstract class StreamingDeflater implements Reader, Writer {
    private ReadHandle _rh;
    private WriteHandle _wh;
    private OutputStream _out;
    private BufferedWriterOutputStream _writerOutputStream;
    private volatile boolean _readCancelled = false;
    private final EntityStream _underlying;

    /**
     * Sink handed to {@link #createOutputStream(OutputStream)}. Accumulates bytes in a
     * {@value #BUF_SIZE}-byte buffer, queues each full buffer as a {@link ByteString}, and drains
     * the queue into the enclosing deflater's write handle.
     */
    private class BufferedWriterOutputStream extends OutputStream {
        private static final int BUF_SIZE = 8192;

        /** Completed chunks waiting to be handed to the write handle. */
        private final Queue<ByteString> _data = new ConcurrentLinkedQueue<>();
        private final byte[] _buffer = new byte[BUF_SIZE];
        /** Index of the next free slot in {@link #_buffer}. */
        private int _writeIndex = 0;
        /** Set by {@link #close()}; once the queue is drained the write handle is completed. */
        private boolean _done = false;

        /**
         * Appends one byte to the buffer; when the buffer becomes full it is copied into the
         * queue, reset, and a drain is attempted.
         */
        @Override
        public void write(int b) throws IOException {
            this._buffer[this._writeIndex++] = (byte) b;
            if (this._writeIndex == BUF_SIZE) {
                this._data.add(ByteString.copy(this._buffer));
                this._writeIndex = 0;
                writeIfPossible();
            }
        }

        /**
         * Queues whatever is left in the partially filled buffer, marks the stream as done and
         * attempts a final drain.
         */
        @Override
        public void close() throws IOException {
            if (this._writeIndex > 0) {
                this._data.add(ByteString.copy(this._buffer, 0, this._writeIndex));
            }
            this._done = true;
            writeIfPossible();
        }

        /**
         * Writes queued chunks to the write handle for as long as it reports remaining capacity.
         * When the queue runs empty, either completes the write handle (if {@link #close()} has
         * been called) or requests one more chunk from the read handle.
         */
        public synchronized void writeIfPossible() {
            while (StreamingDeflater.this._wh.remaining() > 0) {
                if (this._data.isEmpty()) {
                    if (this._done) {
                        StreamingDeflater.this._wh.done();
                        return;
                    } else {
                        StreamingDeflater.this._rh.request(1);
                        return;
                    }
                }
                StreamingDeflater.this._wh.write(this._data.poll());
            }
        }

        /**
         * @return {@code true} when the write handle has remaining capacity and no chunk is queued
         */
        public boolean needMore() {
            return StreamingDeflater.this._wh.remaining() > 0 && this._data.isEmpty();
        }
    }

    /**
     * @param entityStream the stream this instance will read from once a write handle is supplied
     */
    public StreamingDeflater(EntityStream entityStream) {
        this._underlying = entityStream;
    }

    /** Stores the read handle used to request or cancel input. */
    @Override
    public void onInit(ReadHandle readHandle) {
        this._rh = readHandle;
    }

    /**
     * Feeds the received chunk into the output stream created by the subclass and requests one
     * more chunk if the sink has nothing queued while the write handle still has capacity.
     * Chunks arriving after cancellation are ignored. An {@link IOException} is reported to the
     * write handle and cancels further reading.
     */
    @Override
    public void onDataAvailable(ByteString byteString) {
        if (this._readCancelled) {
            return;
        }
        try {
            byteString.write(this._out);
            if (this._writerOutputStream.needMore()) {
                this._rh.request(1);
            }
        } catch (IOException e) {
            this._wh.error(e);
            cancel();
        }
    }

    /** Closes the output stream; a failure while closing is reported to the write handle. */
    @Override
    public void onDone() {
        try {
            this._out.close();
        } catch (IOException e) {
            this._wh.error(e);
        }
    }

    /** Forwards a read-side error to the write handle. */
    @Override
    public void onError(Throwable th) {
        this._wh.error(th);
    }

    /**
     * Stores the write handle, builds the output stream chain and registers this instance as the
     * reader of the underlying stream. If the output stream cannot be created the error is
     * reported to the write handle.
     */
    @Override
    public void onInit(WriteHandle writeHandle) {
        try {
            this._wh = writeHandle;
            this._writerOutputStream = new BufferedWriterOutputStream();
            this._out = createOutputStream(this._writerOutputStream);
            this._underlying.setReader(this);
        } catch (IOException e) {
            this._wh.error(e);
            // No read handle may exist yet on this path (setReader comes after
            // createOutputStream); cancel() tolerates that.
            // NOTE(review): if createOutputStream failed, the underlying stream is never given
            // a reader here - confirm that is acceptable for the caller.
            cancel();
        }
    }

    /** Attempts to drain queued output now that the write handle can accept more data. */
    @Override
    public void onWritePossible() {
        this._writerOutputStream.writeIfPossible();
    }

    /** Stops reading from the underlying stream when the downstream side aborts. */
    @Override
    public void onAbort(Throwable th) {
        cancel();
    }

    /**
     * Marks reading as cancelled and, if a read handle has been received, cancels it. The handle
     * is null-checked because this can be reached before {@link #onInit(ReadHandle)} has run.
     */
    private void cancel() {
        this._readCancelled = true;
        final ReadHandle readHandle = this._rh;
        if (readHandle != null) {
            readHandle.cancel();
        }
    }

    /**
     * Wraps the given sink in the stream that transforms the data.
     *
     * @param outputStream sink that receives the transformed bytes
     * @return the stream that incoming chunks are written to
     * @throws IOException if the stream cannot be created
     */
    protected abstract OutputStream createOutputStream(OutputStream outputStream) throws IOException;
}
