package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.class */
/**
 * Netty inbound handler on the consumer side of a partition request connection.
 *
 * <p>Incoming {@link NettyMessage.BufferResponse} and {@link NettyMessage.ErrorResponse} messages
 * are dispatched to the {@link RemoteInputChannel} registered under the message's receiver ID.
 * When a buffer response arrives and the receiver's {@link BufferProvider} has no free buffer,
 * the response is staged in {@link BufferListenerTask}, auto-read is switched off on the channel
 * and every further message is queued in {@code stagedMessages} until the staged response has
 * been handled.
 */
public class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);

    /** Exception message that is interpreted as a lost remote task manager in {@link #exceptionCaught}. */
    private static final String CONNECTION_RESET_MESSAGE = "Connection reset by peer";

    /** Input channels that currently receive data through this handler, keyed by channel ID. */
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();

    /** Flipped to {@code true} exactly once, by the first error that is broadcast to all channels. */
    private final AtomicBoolean channelError = new AtomicBoolean(false);

    /** Holds the single buffer response that is waiting for a free buffer, if any. */
    private final BufferListenerTask bufferListener = new BufferListenerTask();

    /** Messages received while a buffer response is staged; drained by {@link StagedMessagesHandlerTask}. */
    private final Queue<Object> stagedMessages = new ArrayDeque<>();

    private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask();

    /** Receiver IDs for which a cancel request has already been written (used as a concurrent set). */
    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();

    /** Context captured in {@link #channelActive}; {@code null} until the channel became active. */
    private volatile ChannelHandlerContext ctx;

    /** Runnable that broadcasts an error to all input channels; submitted to the channel's event loop. */
    public class AsyncErrorNotificationTask implements Runnable {
        private final Throwable error;

        public AsyncErrorNotificationTask(Throwable th) {
            this.error = th;
        }

        @Override // java.lang.Runnable
        public void run() {
            PartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(this.error);
        }
    }

    /**
     * Waits for a buffer from a {@link BufferProvider} on behalf of one staged buffer response.
     *
     * <p>{@link #onEvent(Buffer)} is the provider callback; it stores the buffer and schedules
     * {@link #run()} on the channel's event loop, where the staged payload is copied and handed
     * to the input channel.
     */
    public class BufferListenerTask implements EventListener<Buffer>, Runnable {
        /** Buffer delivered by the provider and not yet consumed by {@link #run()}. */
        private final AtomicReference<Buffer> availableBuffer;

        /** Response waiting for a buffer, or {@code null} when nothing is staged. */
        private NettyMessage.BufferResponse stagedBufferResponse;

        private BufferListenerTask() {
            this.availableBuffer = new AtomicReference<>();
        }

        /**
         * Stages {@code bufferResponse} and registers this task as listener at {@code bufferProvider}.
         *
         * @return {@code true} if the listener was registered (auto-read is then disabled on the
         *     channel); {@code false} if the provider rejected the listener, in which case nothing
         *     stays staged
         */
        public boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
            this.stagedBufferResponse = bufferResponse;
            if (!bufferProvider.addListener(this)) {
                this.stagedBufferResponse = null;
                return false;
            }
            // Stop reading from the network until the staged response has been handled.
            if (PartitionRequestClientHandler.this.ctx.channel().config().isAutoRead()) {
                PartitionRequestClientHandler.this.ctx.channel().config().setAutoRead(false);
            }
            return true;
        }

        /** Returns whether a buffer response is currently staged. */
        public boolean hasStagedBufferOrEvent() {
            return this.stagedBufferResponse != null;
        }

        /**
         * Buffer provider callback.
         *
         * <p>A non-null {@code buffer} is stored and {@link #run()} is scheduled on the event loop.
         * A {@code null} buffer drops the staged response and resumes message processing
         * (presumably this signals a destroyed buffer pool; confirm against the provider contract).
         * Any failure is reported asynchronously to all input channels. The buffer is recycled
         * exactly once whenever it was not handed over to {@link #run()}.
         */
        @Override // org.apache.flink.runtime.util.event.EventListener
        public void onEvent(Buffer buffer) {
            boolean handedOver = false;
            try {
                if (buffer == null) {
                    // NOTE(review): the staged response is dropped without releaseBuffer() here;
                    // verify whether its Netty buffer is released elsewhere.
                    this.stagedBufferResponse = null;
                    PartitionRequestClientHandler.this.resumeAfterStagedBuffer();
                } else {
                    if (!this.availableBuffer.compareAndSet(null, buffer)) {
                        throw new IllegalStateException("Received a buffer notification,  but the previous one has not been handled yet.");
                    }
                    PartitionRequestClientHandler.this.ctx.channel().eventLoop().execute(this);
                    handedOver = true;
                }
            } catch (Throwable th) {
                PartitionRequestClientHandler.this.ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(th));
            } finally {
                if (!handedOver && buffer != null) {
                    buffer.recycle();
                }
            }
        }

        /**
         * Copies the staged response into the buffer received via {@link #onEvent(Buffer)}, passes
         * it to the receiving input channel and resumes message processing.
         *
         * <p>If the receiver is no longer registered, a cancel request is sent instead. The buffer
         * is recycled whenever {@code onBuffer} did not return normally for it, including the error
         * path, which additionally notifies all input channels and closes the channel.
         */
        @Override // java.lang.Runnable
        public void run() {
            boolean handedOver = false;
            Buffer buffer = null;
            try {
                buffer = this.availableBuffer.getAndSet(null);
                if (buffer == null) {
                    throw new IllegalStateException("Running buffer availability task w/o a buffer.");
                }
                NettyMessage.BufferResponse response = this.stagedBufferResponse;
                buffer.setSize(response.getSize());
                response.getNettyBuffer().readBytes(buffer.getNioBuffer());
                // The payload has been copied, so the Netty buffer is no longer needed.
                response.releaseBuffer();

                RemoteInputChannel receiver = PartitionRequestClientHandler.this.inputChannels.get(response.receiverId);
                if (receiver != null) {
                    receiver.onBuffer(buffer, response.sequenceNumber);
                    handedOver = true;
                } else {
                    PartitionRequestClientHandler.this.cancelRequestFor(response.receiverId);
                }

                this.stagedBufferResponse = null;
                PartitionRequestClientHandler.this.resumeAfterStagedBuffer();
            } catch (Throwable th) {
                PartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(th);
            } finally {
                if (!handedOver && buffer != null) {
                    buffer.recycle();
                }
            }
        }
    }

    /**
     * Drains {@code stagedMessages}. Processing stops early when {@code decodeMsg} returns
     * {@code false}; once the queue is empty, auto-read is switched back on.
     */
    public class StagedMessagesHandlerTask implements Runnable {
        public StagedMessagesHandlerTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Object msg;
                while ((msg = PartitionRequestClientHandler.this.stagedMessages.poll()) != null) {
                    if (!PartitionRequestClientHandler.this.decodeMsg(msg)) {
                        return;
                    }
                }
                PartitionRequestClientHandler.this.ctx.channel().config().setAutoRead(true);
                PartitionRequestClientHandler.this.ctx.channel().read();
            } catch (Throwable th) {
                // decodeMsg may throw; such failures are broadcast like any other channel error.
                PartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(th);
            }
        }
    }

    /**
     * Registers an input channel under its channel ID unless one is already registered for that ID.
     *
     * @throws IllegalStateException if an error has already been broadcast on this handler
     */
    public void addInputChannel(RemoteInputChannel remoteInputChannel) {
        Preconditions.checkState(!this.channelError.get(), "There has been an error in the channel.");
        // Atomic check-and-insert; an existing registration is kept.
        this.inputChannels.putIfAbsent(remoteInputChannel.getInputChannelId(), remoteInputChannel);
    }

    /** Removes the registration for the given input channel's ID, if present. */
    public void removeInputChannel(RemoteInputChannel remoteInputChannel) {
        this.inputChannels.remove(remoteInputChannel.getInputChannelId());
    }

    /**
     * Writes a {@link NettyMessage.CancelPartitionRequest} for the given receiver, at most once per ID.
     *
     * <p>Does nothing when the ID is {@code null} or the channel has not become active yet.
     */
    public void cancelRequestFor(InputChannelID inputChannelID) {
        ChannelHandlerContext context = this.ctx;
        if (inputChannelID == null || context == null) {
            return;
        }
        if (this.cancelled.putIfAbsent(inputChannelID, inputChannelID) == null) {
            context.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelID));
        }
    }

    /** Remembers the first context this handler becomes active with, then delegates to the superclass. */
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ctx == null) {
            this.ctx = channelHandlerContext;
        }
        super.channelActive(channelHandlerContext);
    }

    /** Reports an unexpected close to all input channels if any are still registered. */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (!this.inputChannels.isEmpty()) {
            SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
            notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Connection unexpectedly closed by remote task manager '" + remoteAddress + "'. This might indicate that the remote task manager was lost.", remoteAddress));
        }
        super.channelInactive(channelHandlerContext);
    }

    /**
     * Broadcasts a pipeline exception to all input channels.
     *
     * <p>{@link TransportException}s are forwarded unchanged. An {@link IOException} whose message
     * is exactly "Connection reset by peer" becomes a {@link RemoteTransportException}; everything
     * else is wrapped in a {@link LocalTransportException}.
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (th instanceof TransportException) {
            notifyAllChannelsOfErrorAndClose(th);
            return;
        }
        SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        TransportException wrapped;
        // Constant-first equals: an IOException without a message must not cause an NPE here.
        if ((th instanceof IOException) && CONNECTION_RESET_MESSAGE.equals(th.getMessage())) {
            wrapped = new RemoteTransportException("Lost connection to task manager '" + remoteAddress + "'. This indicates that the remote task manager was lost.", remoteAddress, th);
        } else {
            wrapped = new LocalTransportException(th.getMessage(), channelHandlerContext.channel().localAddress(), th);
        }
        notifyAllChannelsOfErrorAndClose(wrapped);
    }

    /**
     * Decodes an incoming message right away, or queues it when a buffer response is staged or
     * earlier messages are still queued (this keeps the arrival order).
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        try {
            if (this.bufferListener.hasStagedBufferOrEvent() || !this.stagedMessages.isEmpty()) {
                this.stagedMessages.add(obj);
            } else {
                decodeMsg(obj);
            }
        } catch (Throwable th) {
            notifyAllChannelsOfErrorAndClose(th);
        }
    }

    /**
     * Passes {@code th} to every registered input channel, then clears the registrations and
     * closes the channel. Only the first invocation has an effect.
     */
    public void notifyAllChannelsOfErrorAndClose(Throwable th) {
        if (!this.channelError.compareAndSet(false, true)) {
            return;
        }
        try {
            for (RemoteInputChannel channel : this.inputChannels.values()) {
                channel.onError(th);
            }
        } catch (Throwable notificationFailure) {
            LOG.warn("An Exception was thrown during error notification of a remote input channel.", notificationFailure);
        } finally {
            this.inputChannels.clear();
            ChannelHandlerContext context = this.ctx;
            if (context != null) {
                context.close();
            }
        }
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
    }

    /**
     * Dispatches one message received from the producer.
     *
     * @return {@code false} only if a buffer response could not be completed by
     *     {@link #decodeBufferOrEvent}; {@code true} in every other case
     * @throws IllegalStateException if the message is neither a buffer nor an error response
     */
    public boolean decodeMsg(Object obj) throws Throwable {
        Class<?> msgClass = obj.getClass();

        if (msgClass == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferResponse = (NettyMessage.BufferResponse) obj;
            RemoteInputChannel receiver = this.inputChannels.get(bufferResponse.receiverId);
            if (receiver == null) {
                // Unknown receiver: drop the payload and ask the producer to stop sending.
                bufferResponse.releaseBuffer();
                cancelRequestFor(bufferResponse.receiverId);
                return true;
            }
            return decodeBufferOrEvent(receiver, bufferResponse);
        }

        if (msgClass != NettyMessage.ErrorResponse.class) {
            throw new IllegalStateException("Received unknown message from producer: " + obj.getClass());
        }

        NettyMessage.ErrorResponse errorResponse = (NettyMessage.ErrorResponse) obj;
        SocketAddress remoteAddress = this.ctx.channel().remoteAddress();

        if (errorResponse.isFatalError()) {
            notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddress + "'.", remoteAddress, errorResponse.cause));
            return true;
        }

        RemoteInputChannel receiver = this.inputChannels.get(errorResponse.receiverId);
        if (receiver != null) {
            if (errorResponse.cause.getClass() == PartitionNotFoundException.class) {
                receiver.onFailedPartitionRequest();
            } else {
                receiver.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddress + "'.", remoteAddress, errorResponse.cause));
            }
        }
        return true;
    }

    /**
     * Re-enables reading from the network if no messages are queued; otherwise schedules the
     * staged-messages task on the channel's event loop.
     */
    private void resumeAfterStagedBuffer() {
        if (this.stagedMessages.isEmpty()) {
            this.ctx.channel().config().setAutoRead(true);
            this.ctx.channel().read();
        } else {
            this.ctx.channel().eventLoop().execute(this.stagedMessagesHandler);
        }
    }

    /**
     * Hands the payload of a buffer response to its input channel.
     *
     * <p>The response's Netty buffer is released exactly once before returning, except when the
     * response was staged via {@link BufferListenerTask#waitForBuffer}: the staged payload is
     * read and released later by {@link BufferListenerTask#run()}.
     *
     * @return {@code true} if the payload was delivered (or the buffer was empty); {@code false}
     *     if the response was staged, the receiver has no buffer provider, or the provider is
     *     destroyed
     */
    private boolean decodeBufferOrEvent(RemoteInputChannel remoteInputChannel, NettyMessage.BufferResponse bufferResponse) throws Throwable {
        boolean releaseNettyBuffer = true;
        try {
            if (!bufferResponse.isBuffer()) {
                // Non-buffer payload: copy it into a heap array wrapped as a Buffer.
                byte[] bytes = new byte[bufferResponse.getSize()];
                bufferResponse.getNettyBuffer().readBytes(bytes);
                remoteInputChannel.onBuffer(new Buffer(MemorySegmentFactory.wrap(bytes), FreeingBufferRecycler.INSTANCE, false), bufferResponse.sequenceNumber);
                return true;
            }

            if (bufferResponse.getSize() == 0) {
                remoteInputChannel.onEmptyBuffer(bufferResponse.sequenceNumber);
                return true;
            }

            BufferProvider bufferProvider = remoteInputChannel.getBufferProvider();
            if (bufferProvider == null) {
                cancelRequestFor(bufferResponse.receiverId);
                return false;
            }

            while (true) {
                Buffer buffer = bufferProvider.requestBuffer();
                if (buffer != null) {
                    buffer.setSize(bufferResponse.getSize());
                    bufferResponse.getNettyBuffer().readBytes(buffer.getNioBuffer());
                    remoteInputChannel.onBuffer(buffer, bufferResponse.sequenceNumber);
                    return true;
                }
                if (this.bufferListener.waitForBuffer(bufferProvider, bufferResponse)) {
                    // Staged: the listener task still has to read from the Netty buffer.
                    releaseNettyBuffer = false;
                    return false;
                }
                if (bufferProvider.isDestroyed()) {
                    return false;
                }
                // Listener was rejected but the provider is alive: retry the buffer request.
            }
        } finally {
            if (releaseNettyBuffer) {
                bufferResponse.releaseBuffer();
            }
        }
    }
}
