package org.apache.nemo.runtime.executor.bytetransfer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.DefaultFileRegion;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.annotation.Nullable;
import org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext;
import org.apache.nemo.runtime.executor.bytetransfer.DataFrameEncoder;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.FileArea;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.class */
public final class ByteOutputContext extends ByteTransferContext implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ByteOutputContext.class.getName());
    private final Channel channel;
    private volatile ByteOutputStream currentByteOutputStream;
    private volatile boolean closed;

    /* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext$ByteOutputStream.class */
    public final class ByteOutputStream extends OutputStream {
        private volatile boolean newSubStream = true;
        private volatile boolean closed = false;

        public ByteOutputStream() {
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            ByteBuf ioBuffer = ByteOutputContext.this.channel.alloc().ioBuffer(1, 1);
            ioBuffer.writeByte(i);
            writeByteBuf(ioBuffer);
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            ByteBuf ioBuffer = ByteOutputContext.this.channel.alloc().ioBuffer(i2, i2);
            ioBuffer.writeBytes(bArr, i, i2);
            writeByteBuf(ioBuffer);
        }

        public ByteOutputStream writeSerializedPartition(SerializedPartition serializedPartition) throws IOException {
            write(serializedPartition.getData(), 0, serializedPartition.getLength());
            return this;
        }

        public ByteOutputStream writeFileArea(FileArea fileArea) throws IOException {
            Path path = Paths.get(fileArea.getPath(), new String[0]);
            long position = fileArea.getPosition();
            long count = fileArea.getCount();
            while (true) {
                long j = count;
                if (j <= 0) {
                    return this;
                }
                long min = Math.min(j, 4294967295L);
                writeDataFrame(new DefaultFileRegion(FileChannel.open(path, new OpenOption[0]), position, min), min);
                position += min;
                count = j - min;
            }
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            if (this.newSubStream) {
                writeDataFrame(null, 0L);
            }
            this.closed = true;
        }

        private void writeByteBuf(ByteBuf byteBuf) throws IOException {
            if (byteBuf.readableBytes() > 0) {
                writeDataFrame(byteBuf, byteBuf.readableBytes());
            }
        }

        public void writeElement(Object obj, Serializer serializer) {
            ByteBuf ioBuffer = ByteOutputContext.this.channel.alloc().ioBuffer();
            try {
                OutputStream buildOutputStream = DataUtil.buildOutputStream(new ByteBufOutputStream(ioBuffer), serializer.getEncodeStreamChainers());
                serializer.getEncoderFactory().create(buildOutputStream).encode(obj);
                buildOutputStream.close();
                writeByteBuf(ioBuffer);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private void writeDataFrame(Object obj, long j) throws IOException {
            ByteOutputContext.this.ensureNoException();
            if (this.closed) {
                throw new IOException("Stream already closed.");
            }
            ByteOutputContext.this.channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(ByteOutputContext.this.getContextId(), obj, j, this.newSubStream)).addListener(ByteOutputContext.this.getChannelWriteListener());
            this.newSubStream = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ByteOutputContext(String str, ByteTransferContext.ContextId contextId, byte[] bArr, ContextManager contextManager) {
        super(str, contextId, bArr, contextManager);
        this.currentByteOutputStream = null;
        this.closed = false;
        this.channel = contextManager.getChannel();
    }

    public ByteOutputStream newOutputStream() throws IOException {
        ensureNoException();
        if (this.closed) {
            throw new IOException("Context already closed.");
        }
        if (this.currentByteOutputStream != null) {
            this.currentByteOutputStream.close();
        }
        this.currentByteOutputStream = new ByteOutputStream();
        return this.currentByteOutputStream;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        ensureNoException();
        if (this.closed) {
            return;
        }
        if (this.currentByteOutputStream != null) {
            this.currentByteOutputStream.close();
        }
        this.channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId())).addListener(getChannelWriteListener());
        deregister();
        this.closed = true;
    }

    @Override // org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext
    public void onChannelError(@Nullable Throwable th) {
        setChannelError(th);
        this.channel.close();
    }

    void ensureNoException() throws IOException {
        if (hasException()) {
            if (getException() != null) {
                throw new IOException(getException());
            }
            throw new IOException();
        }
    }
}
