package org.apache.hadoop.hbase.io.asyncfs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.PromiseCombiner;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.util.DataChecksum;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.class */
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    /** 12 MiB; the constructor rounds this down to a multiple of the checksum chunk size to get {@link #maxDataLen}. */
    private static final int MAX_DATA_LEN = 12582912;
    /** Configuration; read for the socket timeout and handed to lease recovery. */
    private final Configuration conf;
    /** Used by {@link #recoverAndClose} to recover the file lease. */
    private final FSUtils fsUtils;
    /** File system passed to lease recovery. */
    private final DistributedFileSystem dfs;
    /** DFS client passed to the helper when ending the lease and completing the file. */
    private final DFSClient client;
    /** Namenode protocol handle passed to the helper when completing the file. */
    private final ClientProtocol namenode;
    /** Client name passed to the helper when completing the file. */
    private final String clientName;
    /** Path of the file being written, as a string. */
    private final String src;
    /** File id passed to the helper when ending the lease and completing the file. */
    private final long fileId;
    /** The single block this output writes to; its byte count is updated as packets are acked. */
    private final LocatedBlock locatedBlock;
    /** Optional encryptor; when non-null, buffered data is encrypted in {@code flush0} before being sent. */
    private final Encryptor encryptor;
    /** Event loop on which all buffer and queue mutations are executed. */
    private final EventLoop eventLoop;
    /** One channel per datanode; every packet is written to all of them. */
    private final List<Channel> datanodeList;
    /** Checksum calculator used to produce per-chunk checksums for each packet. */
    private final DataChecksum summer;
    /** Largest payload sent in a single packet; a multiple of {@code summer.getBytesPerChecksum()}. */
    private final int maxDataLen;
    /** Allocator for the data, header and checksum buffers. */
    private final ByteBufAllocator alloc;
    /** Buffer collecting written-but-not-yet-flushed bytes; set to {@code null} by {@code endBlock}. */
    private ByteBuf buf;
    /** 128 MiB; {@link #guess(int)} never grows {@link #capacity} beyond this value. */
    private static final int LIMIT = 134217728;
    /** Decompiled form of the compiler's assertion switch; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Callbacks for packets (and piggy-backed flush requests) that are still waiting for acks, oldest first. */
    private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
    /** Block offset at which the next packet's data starts. */
    private long nextPacketOffsetInBlock = 0;
    /** Sequence number assigned to the next data packet. */
    private long nextPacketSeqno = 0;
    /** Current size estimate used when allocating a fresh write buffer; adapted by {@link #guess(int)}. */
    private int capacity = 4096;
    /** Lifecycle state; starts out as {@link State#STREAMING}. */
    private State state = State.STREAMING;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Inbound handler installed on every datanode channel (one shared instance, see
     * {@code setupReceiver}). It translates pipeline acks, connection loss, exceptions and idle
     * events into calls to {@code completed} / {@code failed} on the enclosing output.
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$AckHandler.class */
    public final class AckHandler extends SimpleChannelInboundHandler<DataTransferProtos.PipelineAckProto> {
        /** Read timeout in milliseconds; only used to build the timeout error message. */
        private final int timeoutMs;

        /**
         * @param i the read timeout in milliseconds reported in timeout errors
         */
        public AckHandler(int i) {
            this.timeoutMs = i;
        }

        /**
         * Handles one ack from a datanode.
         *
         * <p>The restart (out-of-band) status is tested before the generic non-success check.
         * A restart status is never {@code SUCCESS}, so testing {@code status != SUCCESS} first
         * made the restart branch unreachable and reported restarts as "Bad response".
         *
         * @param channelHandlerContext context of the channel the ack arrived on
         * @param pipelineAckProto the decoded ack
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler
        public void channelRead0(ChannelHandlerContext channelHandlerContext, DataTransferProtos.PipelineAckProto pipelineAckProto) throws Exception {
            DataTransferProtos.Status status = FanOutOneBlockAsyncDFSOutputHelper.getStatus(pipelineAckProto);
            if (PipelineAck.isRestartOOBStatus(status)) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Restart response " + status + " for block " + FanOutOneBlockAsyncDFSOutput.this.locatedBlock.getBlock() + " from datanode " + channelHandlerContext.channel().remoteAddress());
                });
                return;
            }
            if (status != DataTransferProtos.Status.SUCCESS) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Bad response " + status + " for block " + FanOutOneBlockAsyncDFSOutput.this.locatedBlock.getBlock() + " from datanode " + channelHandlerContext.channel().remoteAddress());
                });
                return;
            }
            // -1 is the sequence number this handler puts on the packet it sends when the writer
            // is idle (see userEventTriggered); such acks do not complete any pending callback.
            if (pipelineAckProto.getSeqno() == -1) {
                return;
            }
            FanOutOneBlockAsyncDFSOutput.this.completed(channelHandlerContext.channel());
        }

        /** Fails the output when a datanode connection is closed. */
        @Override // org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandler
        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                return new IOException("Connection to " + channelHandlerContext.channel().remoteAddress() + " closed");
            });
        }

        /** Fails the output with the exception raised in the channel pipeline. */
        @Override // org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerAdapter, org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler, org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                return th;
            });
        }

        /**
         * Reacts to idle events: a reader-idle event fails the output with a timeout error, a
         * writer-idle event sends an empty packet with sequence number -1 to the datanode.
         * Any other user event is passed on to the superclass.
         */
        @Override // org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInboundHandler
        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if (!(obj instanceof IdleStateEvent)) {
                super.userEventTriggered(channelHandlerContext, obj);
                return;
            }
            IdleStateEvent idleStateEvent = (IdleStateEvent) obj;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Timeout(" + this.timeoutMs + "ms) waiting for response");
                });
                return;
            }
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                // Header-only packet: no data, no checksums, sequence number -1.
                PacketHeader packetHeader = new PacketHeader(4, 0L, -1L, false, 0, false);
                int serializedSize = packetHeader.getSerializedSize();
                ByteBuf buffer = FanOutOneBlockAsyncDFSOutput.this.alloc.buffer(serializedSize);
                // putInBuffer writes through the NIO view, so the writer index is advanced by hand.
                packetHeader.putInBuffer(buffer.nioBuffer(0, serializedSize));
                buffer.writerIndex(serializedSize);
                channelHandlerContext.channel().writeAndFlush(buffer);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$Callback.class */
    /**
     * Entry of the waiting-ack queue: a promise to complete once every replica listed in
     * {@code unfinishedReplicas} has acked, together with the block length that ack represents.
     */
    public static final class Callback {
        /** Completed (or failed) by the enclosing output when the acks arrive or the stream breaks. */
        private final Promise<Void> promise;
        /** Block length that is acknowledged once this callback completes. */
        private final long ackedLength;
        /** Channels that still owe an ack; compared by identity. */
        private final Set<Channel> unfinishedReplicas;

        /**
         * @param promise promise tied to this entry
         * @param j the acked length this entry stands for
         * @param collection channels whose acks are awaited; an empty collection yields an
         *        immutable empty set
         */
        public Callback(Promise<Void> promise, long j, Collection<Channel> collection) {
            this.promise = promise;
            this.ackedLength = j;
            final Set<Channel> pending;
            if (!collection.isEmpty()) {
                pending = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(collection.size()));
                pending.addAll(collection);
            } else {
                pending = Collections.emptySet();
            }
            this.unfinishedReplicas = pending;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$State.class */
    /** Lifecycle states of the output stream. */
    public enum State {
        // Initial state; flush0 and endBlock only proceed while the stream is in this state.
        STREAMING,
        // Set by endBlock once the final (last-in-block) packet is being sent.
        CLOSING,
        // Set by failed(); all pending callbacks have been failed and the channels closed.
        BROKEN,
        // Treated like BROKEN by failed(); not assigned anywhere in the code visible in this file.
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records an ack from {@code channel}. The first queued callback still waiting on that channel
     * is updated; if it was the last outstanding replica, the callback's promise is completed, the
     * head of the queue is removed, and any directly following callbacks with the same acked
     * length are completed and removed as well.
     *
     * <p>Removal is done with {@code removeFirst()}, which presumably relies on the callback that
     * just finished always being the head of the queue — verify before reordering queue operations.
     *
     * @param channel the datanode channel the ack arrived on
     */
    public void completed(Channel channel) {
        for (Callback entry : this.waitingAckQueue) {
            if (!entry.unfinishedReplicas.remove(channel)) {
                // This entry was not waiting on the channel; look at the next one.
                continue;
            }
            if (entry.unfinishedReplicas.isEmpty()) {
                entry.promise.trySuccess(null);
                this.waitingAckQueue.removeFirst();
                // Wake up the queued requests that were waiting for the very same length.
                Callback head = this.waitingAckQueue.peekFirst();
                while (head != null && head.ackedLength == entry.ackedLength) {
                    head.promise.trySuccess(null);
                    this.waitingAckQueue.removeFirst();
                    head = this.waitingAckQueue.peekFirst();
                }
            }
            // Only one entry is touched per ack; also stops iterating after the queue was modified.
            return;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the stream as broken, fails every pending callback with the supplied error and closes
     * all datanode channels.
     *
     * <p>Nothing happens when the stream is already {@code BROKEN} or {@code CLOSED}, or when it is
     * {@code CLOSING} and the head of the queue is absent or no longer waiting on {@code channel}.
     *
     * @param channel the channel on which the failure was observed
     * @param supplier lazily creates the error; only invoked when the failure is acted upon
     */
    public void failed(Channel channel, Supplier<Throwable> supplier) {
        if (this.state == State.BROKEN || this.state == State.CLOSED) {
            return;
        }
        if (this.state == State.CLOSING) {
            Callback head = this.waitingAckQueue.peekFirst();
            if (head == null || !head.unfinishedReplicas.contains(channel)) {
                return;
            }
        }
        this.state = State.BROKEN;
        final Throwable cause = supplier.get();
        for (Callback pending : this.waitingAckQueue) {
            pending.promise.tryFailure(cause);
        }
        this.waitingAckQueue.clear();
        for (Channel datanode : this.datanodeList) {
            datanode.close();
        }
    }

    /**
     * Installs the ack-receiving handlers on every datanode channel and turns auto-read on.
     * Each channel gets its own idle-state handler and decoders; the {@code AckHandler} instance
     * is shared by all channels.
     *
     * @param i timeout in milliseconds; used as the reader-idle time, half of it as the
     *        writer-idle time
     */
    private void setupReceiver(int i) {
        final AckHandler sharedAckHandler = new AckHandler(i);
        final int writerIdleMs = i / 2;
        this.datanodeList.forEach(datanode -> {
            datanode.pipeline().addLast(
                new IdleStateHandler(i, writerIdleMs, 0L, TimeUnit.MILLISECONDS),
                new ProtobufVarint32FrameDecoder(),
                new ProtobufDecoder(DataTransferProtos.PipelineAckProto.getDefaultInstance()),
                sharedAckHandler);
            datanode.config().setAutoRead(true);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FanOutOneBlockAsyncDFSOutput(Configuration configuration, FSUtils fSUtils, DistributedFileSystem distributedFileSystem, DFSClient dFSClient, ClientProtocol clientProtocol, String str, String str2, long j, LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop, List<Channel> list, DataChecksum dataChecksum, ByteBufAllocator byteBufAllocator) {
        this.conf = configuration;
        this.fsUtils = fSUtils;
        this.dfs = distributedFileSystem;
        this.client = dFSClient;
        this.namenode = clientProtocol;
        this.fileId = j;
        this.clientName = str;
        this.src = str2;
        this.locatedBlock = locatedBlock;
        this.encryptor = encryptor;
        this.eventLoop = eventLoop;
        this.datanodeList = list;
        this.summer = dataChecksum;
        this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % dataChecksum.getBytesPerChecksum());
        this.alloc = byteBufAllocator;
        this.buf = byteBufAllocator.directBuffer(this.capacity);
        setupReceiver(configuration.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000));
    }

    /**
     * Appends a 4-byte integer to the write buffer, growing the buffer first if needed.
     *
     * @param i the value to append
     */
    private void writeInt0(int i) {
        this.buf.ensureWritable(Integer.BYTES).writeInt(i);
    }

    /**
     * Buffers an integer. Runs directly when called on the event loop, otherwise the append is
     * submitted to the event loop and this method returns without waiting for it.
     *
     * @param i the value to buffer
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void writeInt(int i) {
        final Runnable append = () -> writeInt0(i);
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.submit(append);
            return;
        }
        append.run();
    }

    /**
     * Copies the remaining bytes of {@code byteBuffer} into the write buffer, growing it first
     * if needed.
     *
     * @param byteBuffer source of the bytes
     */
    private void write0(ByteBuffer byteBuffer) {
        final int needed = byteBuffer.remaining();
        this.buf.ensureWritable(needed).writeBytes(byteBuffer);
    }

    /**
     * Buffers the remaining bytes of {@code byteBuffer}. Runs directly when called on the event
     * loop; otherwise the copy is submitted to the event loop and this method returns without
     * waiting for it.
     *
     * <p>NOTE(review): unlike {@code write(byte[], int, int)} the off-loop path does not block
     * until the copy has happened, so the caller presumably must not touch {@code byteBuffer}
     * until then — confirm against the callers.
     *
     * @param byteBuffer source of the bytes
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(ByteBuffer byteBuffer) {
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.submit(() -> write0(byteBuffer));
            return;
        }
        write0(byteBuffer);
    }

    /**
     * Buffers the whole array; shorthand for {@code write(bArr, 0, bArr.length)}.
     *
     * @param bArr the bytes to buffer
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(byte[] bArr) {
        final int length = bArr.length;
        write(bArr, 0, length);
    }

    /**
     * Copies {@code i2} bytes of {@code bArr}, starting at index {@code i}, into the write buffer,
     * growing it first if needed.
     *
     * @param bArr source array
     * @param i offset of the first byte to copy
     * @param i2 number of bytes to copy
     */
    private void write0(byte[] bArr, int i, int i2) {
        this.buf.ensureWritable(i2).writeBytes(bArr, i, i2);
    }

    /**
     * Buffers a slice of a byte array. Runs directly when called on the event loop; otherwise the
     * copy is submitted to the event loop and this method blocks (uninterruptibly) until it is done.
     *
     * @param bArr source array
     * @param i offset of the first byte to buffer
     * @param i2 number of bytes to buffer
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(byte[] bArr, int i, int i2) {
        if (this.eventLoop.inEventLoop()) {
            write0(bArr, i, i2);
        } else {
            // The Netty Future method is syncUninterruptibly(); the "2" suffix was a decompiler
            // artefact and names no real method.
            this.eventLoop.submit(() -> {
                write0(bArr, i, i2);
            }).syncUninterruptibly();
        }
    }

    /**
     * Returns the number of readable bytes currently held in the write buffer. When called from
     * outside the event loop the value is read on the event loop and this method blocks
     * (uninterruptibly) for the result.
     *
     * @return the number of readable bytes in the buffer
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public int buffered() {
        if (this.eventLoop.inEventLoop()) {
            return this.buf.readableBytes();
        }
        // The Netty Future method is syncUninterruptibly(); the "2" suffix was a decompiler
        // artefact and names no real method.
        return this.eventLoop.submit(() -> {
            return Integer.valueOf(this.buf.readableBytes());
        }).syncUninterruptibly().getNow().intValue();
    }

    /**
     * @return the datanode locations recorded in the block this output writes to
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public DatanodeInfo[] getPipeline() {
        final LocatedBlock block = this.locatedBlock;
        return block.getLocations();
    }

    /**
     * Sends one data packet (header, checksums, data) to every datanode and queues a callback
     * that waits for all of their acks.
     *
     * <p>Ownership: this method releases {@code byteBuf} once; each channel write holds its own
     * retained duplicate.
     *
     * @param byteBuf the packet payload; its readable bytes are sent
     * @param j block offset of the first payload byte
     * @param z sync-block flag passed on to the packet header
     * @return promise that succeeds once every datanode has acked the packet; on success the
     *         block's byte count is set to {@code j + payload length}
     */
    private Promise<Void> flushBuffer(ByteBuf byteBuf, long j, boolean z) {
        int dataLen = byteBuf.readableBytes();
        int bytesPerChecksum = this.summer.getBytesPerChecksum();
        // One checksum per chunk, plus one for a trailing partial chunk.
        int checksumCount = (dataLen / bytesPerChecksum) + (dataLen % bytesPerChecksum != 0 ? 1 : 0);
        int checksumLen = checksumCount * this.summer.getChecksumSize();
        ByteBuf checksumBuf = this.alloc.directBuffer(checksumLen);
        // The checksums are written through the NIO view, so the writer index is advanced by hand.
        this.summer.calculateChunkedSums(byteBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
        checksumBuf.writerIndex(checksumLen);

        PacketHeader packetHeader = new PacketHeader(4 + checksumLen + dataLen, j, this.nextPacketSeqno, false, dataLen, z);
        int headerLen = packetHeader.getSerializedSize();
        ByteBuf headerBuf = this.alloc.buffer(headerLen);
        packetHeader.putInBuffer(headerBuf.nioBuffer(0, headerLen));
        headerBuf.writerIndex(headerLen);

        final long ackedLength = j + dataLen;
        // The explicit <Void> witness is required: without it the chained call is typed as
        // Promise<Object>. addListener is the real Netty method name (the "2" suffix was a
        // decompiler artefact).
        Promise<Void> promise = this.eventLoop.<Void>newPromise().addListener(future -> {
            if (future.isSuccess()) {
                this.locatedBlock.getBlock().setNumBytes(ackedLength);
            }
        });
        this.waitingAckQueue.addLast(new Callback(promise, ackedLength, this.datanodeList));
        for (Channel channel : this.datanodeList) {
            channel.write(headerBuf.duplicate().retain());
            channel.write(checksumBuf.duplicate().retain());
            channel.writeAndFlush(byteBuf.duplicate().retain());
        }
        // Drop this method's own references; the channels keep theirs until the writes finish.
        checksumBuf.release();
        headerBuf.release();
        byteBuf.release();
        this.nextPacketSeqno++;
        return promise;
    }

    /**
     * Flushes the buffered bytes to the datanodes; must run on the event loop.
     *
     * <p>Steps: reject if the stream is not {@code STREAMING}; encrypt the buffer if an encryptor
     * is set; complete immediately if the resulting length is already the block's length;
     * piggy-back on the last queued callback if it already covers the same length; otherwise send
     * the data (split into several packets when it exceeds {@code maxDataLen}) and start a new
     * buffer that begins with the trailing partial checksum chunk.
     *
     * <p>NOTE(review): with an encryptor configured, the trailing partial chunk carried into the
     * new buffer is read from the already-encrypted buffer and will go through {@code encrypt}
     * again on the next flush; the early-return paths also leave the encrypted buffer in place.
     * Confirm this is intended.
     *
     * @param completableFuture completed with the acked length, or exceptionally on failure
     * @param z sync-block flag passed on to the packet headers
     */
    private void flush0(CompletableFuture<Long> completableFuture, boolean z) {
        if (this.state != State.STREAMING) {
            completableFuture.completeExceptionally(new IOException("stream already broken"));
            return;
        }
        int dataLen = this.buf.readableBytes();
        if (this.encryptor != null) {
            ByteBuf encryptBuf = this.alloc.directBuffer(dataLen);
            try {
                this.encryptor.encrypt(this.buf.nioBuffer(this.buf.readerIndex(), dataLen), encryptBuf.nioBuffer(0, dataLen));
                encryptBuf.writerIndex(dataLen);
                this.buf.release();
                this.buf = encryptBuf;
            } catch (IOException e) {
                encryptBuf.release();
                completableFuture.completeExceptionally(e);
                return;
            }
        }
        final long lengthAfterFlush = this.nextPacketOffsetInBlock + dataLen;
        if (lengthAfterFlush == this.locatedBlock.getBlock().getNumBytes()) {
            // Nothing new since the last acked flush.
            completableFuture.complete(Long.valueOf(this.locatedBlock.getBlock().getNumBytes()));
            return;
        }
        Callback last = this.waitingAckQueue.peekLast();
        if (last != null && lengthAfterFlush == last.ackedLength) {
            // An in-flight packet already covers this length: queue a callback that waits on no
            // replica; completed() wakes it up together with the entry of the same length.
            // <Void> witness and addListener: see the decompiler-artefact fix in this method's history.
            this.waitingAckQueue.addLast(new Callback(this.eventLoop.<Void>newPromise().addListener(future -> {
                if (future.isSuccess()) {
                    completableFuture.complete(Long.valueOf(lengthAfterFlush));
                } else {
                    completableFuture.completeExceptionally(future.cause());
                }
            }), lengthAfterFlush, Collections.emptyList()));
            return;
        }
        Promise<Void> promise;
        if (dataLen > this.maxDataLen) {
            // Too large for one packet: send maxDataLen-sized slices and combine their promises.
            PromiseCombiner combiner = new PromiseCombiner();
            long subPacketOffset = this.nextPacketOffsetInBlock;
            int remaining = dataLen;
            while (remaining > 0) {
                int toWrite = Math.min(remaining, this.maxDataLen);
                combiner.add((Future) flushBuffer(this.buf.readRetainedSlice(toWrite), subPacketOffset, z));
                subPacketOffset += toWrite;
                remaining -= toWrite;
            }
            promise = this.eventLoop.newPromise();
            combiner.finish(promise);
        } else {
            // flushBuffer releases its argument once, hence the extra retain.
            promise = flushBuffer(this.buf.retain(), this.nextPacketOffsetInBlock, z);
        }
        promise.addListener(future2 -> {
            if (future2.isSuccess()) {
                completableFuture.complete(Long.valueOf(lengthAfterFlush));
            } else {
                completableFuture.completeExceptionally(future2.cause());
            }
        });
        // Bytes past the last full checksum chunk are copied to the head of the new buffer, and
        // the next packet offset only advances over the full chunks.
        int trailingPartialChunkLen = dataLen % this.summer.getBytesPerChecksum();
        ByteBuf newBuf = this.alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
        if (trailingPartialChunkLen != 0) {
            this.buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
        }
        this.buf.release();
        this.buf = newBuf;
        this.nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
    }

    /**
     * Requests a flush of the buffered data. The work is done by {@code flush0}, either directly
     * (when already on the event loop) or as a task executed on the event loop.
     *
     * @param z sync-block flag passed on to {@code flush0}
     * @return future completed by {@code flush0} with the acked length, or exceptionally on failure
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public CompletableFuture<Long> flush(boolean z) {
        final CompletableFuture<Long> result = new CompletableFuture<>();
        final Runnable task = () -> flush0(result, z);
        if (this.eventLoop.inEventLoop()) {
            task.run();
        } else {
            this.eventLoop.execute(task);
        }
        return result;
    }

    /**
     * Sends the final, data-less packet of the block to every datanode and queues {@code promise}
     * to be completed once all of them have acked it. Switches the state to {@code CLOSING} and
     * gives up the write buffer.
     *
     * <p>The promise is failed right away when the stream is not {@code STREAMING} or when acks
     * are still outstanding.
     *
     * @param promise completed when the final packet is acked, or failed
     * @param j block offset written into the final packet header and used as the acked length
     */
    private void endBlock(Promise<Void> promise, long j) {
        if (this.state != State.STREAMING) {
            promise.tryFailure(new IOException("stream already broken"));
            return;
        }
        if (!this.waitingAckQueue.isEmpty()) {
            promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
            return;
        }
        this.state = State.CLOSING;
        // Fourth argument true: this is the only place a header is built with that flag set.
        final PacketHeader trailer = new PacketHeader(4, j, this.nextPacketSeqno, true, 0, false);
        this.buf.release();
        this.buf = null;
        final int trailerLen = trailer.getSerializedSize();
        final ByteBuf trailerBuf = this.alloc.directBuffer(trailerLen);
        // Written through the NIO view, so the writer index is advanced by hand.
        trailer.putInBuffer(trailerBuf.nioBuffer(0, trailerLen));
        trailerBuf.writerIndex(trailerLen);
        this.waitingAckQueue.addLast(new Callback(promise, j, this.datanodeList));
        for (Channel datanode : this.datanodeList) {
            datanode.writeAndFlush(trailerBuf.duplicate().retain());
        }
        trailerBuf.release();
    }

    /**
     * Waits for all datanode channels to be closed, ends the file lease and then recovers the
     * lease on the file. Must not be called from the event loop (checked by an assertion).
     *
     * @param cancelableProgressable progress reporter for lease recovery; when {@code null} a
     *        {@code CancelOnClose} bound to the DFS client is used
     * @throws IOException if ending or recovering the lease fails
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void recoverAndClose(CancelableProgressable cancelableProgressable) throws IOException {
        if (!$assertionsDisabled && this.eventLoop.inEventLoop()) {
            throw new AssertionError();
        }
        // The Netty ChannelFuture method is awaitUninterruptibly(); the "2" suffix was a
        // decompiler artefact and names no real method.
        this.datanodeList.forEach(channel -> {
            channel.closeFuture().awaitUninterruptibly();
        });
        FanOutOneBlockAsyncDFSOutputHelper.endFileLease(this.client, this.fileId);
        this.fsUtils.recoverFileLease(this.dfs, new Path(this.src), this.conf, cancelableProgressable == null ? new FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose(this.client) : cancelableProgressable);
    }

    /**
     * Ends the block, waits for the final ack, closes all datanode channels and completes the file.
     * Must not be called from the event loop (checked by an assertion).
     *
     * <p>{@code endBlock} is run on the event loop; the channels are closed from a listener once
     * its promise finishes (successfully or not), and this thread blocks until then.
     *
     * @throws IOException if completing the file fails
     */
    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!$assertionsDisabled && this.eventLoop.inEventLoop()) {
            throw new AssertionError();
        }
        // Typed promise instead of the raw Promise the decompiler produced.
        Promise<Void> endBlockPromise = this.eventLoop.newPromise();
        this.eventLoop.execute(() -> {
            endBlock(endBlockPromise, this.nextPacketOffsetInBlock + this.buf.readableBytes());
        });
        // addListener / syncUninterruptibly / awaitUninterruptibly are the real Netty method
        // names; the "2" suffixes were decompiler artefacts.
        endBlockPromise.addListener(future -> {
            this.datanodeList.forEach(channel -> {
                channel.close();
            });
        }).syncUninterruptibly();
        this.datanodeList.forEach(channel -> {
            channel.closeFuture().awaitUninterruptibly();
        });
        FanOutOneBlockAsyncDFSOutputHelper.completeFile(this.client, this.namenode, this.src, this.clientName, this.locatedBlock.getBlock(), this.fileId);
    }

    /**
     * Adapts the buffer-size estimate to the size of the data just flushed and returns it.
     * The estimate doubles when {@code i} exceeds it (unless that would pass {@code LIMIT}) and
     * halves when {@code i} fits into half of it; otherwise it stays unchanged.
     *
     * @param i number of bytes in the flush that just happened
     * @return the (possibly updated) capacity to allocate for the next buffer
     */
    @VisibleForTesting
    int guess(int i) {
        final int current = this.capacity;
        if (i > current) {
            final int doubled = current << 1;
            if (doubled <= LIMIT) {
                this.capacity = doubled;
            }
        } else {
            final int halved = current >> 1;
            if (halved >= i) {
                this.capacity = halved;
            }
        }
        return this.capacity;
    }

    static {
        $assertionsDisabled = !FanOutOneBlockAsyncDFSOutput.class.desiredAssertionStatus();
    }
}
