package org.apache.distributedlog.fs;

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/distributedlog/fs/DLOutputStream.class */
public class DLOutputStream extends OutputStream {
    private final DistributedLogManager dlm;
    private final AsyncLogWriter writer;
    private long writePos;
    private static final Logger log = LoggerFactory.getLogger(DLOutputStream.class);
    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(Charsets.UTF_8);
    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater = AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
    private final long[] syncPos = new long[1];
    private volatile Throwable exception = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter asyncLogWriter) {
        this.writePos = 0L;
        this.dlm = distributedLogManager;
        this.writer = asyncLogWriter;
        this.writePos = asyncLogWriter.getLastTxId() < 0 ? 0L : asyncLogWriter.getLastTxId();
        this.syncPos[0] = this.writePos;
    }

    public synchronized long position() {
        return this.syncPos[0];
    }

    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        write(new byte[]{(byte) i});
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr) throws IOException {
        write(Unpooled.wrappedBuffer(bArr));
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        write(Unpooled.wrappedBuffer(bArr, i, i2));
    }

    private synchronized void write(ByteBuf byteBuf) throws IOException {
        Throwable th = exceptionUpdater.get(this);
        if (null != th) {
            if (!(th instanceof IOException)) {
                throw new UnexpectedException("Encountered unknown issue", th);
            }
            throw ((IOException) th);
        }
        this.writePos += byteBuf.readableBytes();
        final LogRecord logRecord = new LogRecord(this.writePos, byteBuf);
        this.writer.write(logRecord).whenComplete((BiConsumer) new FutureEventListener<DLSN>() { // from class: org.apache.distributedlog.fs.DLOutputStream.1
            public void onSuccess(DLSN dlsn) {
                synchronized (DLOutputStream.this.syncPos) {
                    DLOutputStream.this.syncPos[0] = logRecord.getTransactionId();
                }
            }

            public void onFailure(Throwable th2) {
                DLOutputStream.exceptionUpdater.compareAndSet(DLOutputStream.this, null, th2);
            }
        });
    }

    private CompletableFuture<DLSN> writeControlRecord() {
        LogRecord logRecord;
        synchronized (this) {
            logRecord = new LogRecord(this.writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
            logRecord.setControl();
        }
        return this.writer.write(logRecord);
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        try {
            FutureUtils.result(writeControlRecord());
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            log.error("Unexpected exception in DLOutputStream", e2);
            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e2);
        }
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Utils.ioResult(writeControlRecord().thenCompose(dlsn -> {
            return this.writer.asyncClose();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r3 -> {
            return this.dlm.asyncClose();
        }));
    }
}
