package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.class */
public class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PartitionRequestQueue.class);
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();
    private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap();
    private final Set<InputChannelID> released = Sets.newHashSet();
    private boolean fatalError;
    private ChannelHandlerContext ctx;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.class */
    private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        @Override // org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            try {
                if (channelFuture.isSuccess()) {
                    PartitionRequestQueue.this.writeAndFlushNextMessageIfPossible(channelFuture.channel());
                } else if (channelFuture.cause() != null) {
                    PartitionRequestQueue.this.handleException(channelFuture.channel(), channelFuture.cause());
                } else {
                    PartitionRequestQueue.this.handleException(channelFuture.channel(), new IllegalStateException("Sending cancelled by user."));
                }
            } catch (Throwable th) {
                PartitionRequestQueue.this.handleException(channelFuture.channel(), th);
            }
        }
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ctx == null) {
            this.ctx = channelHandlerContext;
        }
        super.channelRegistered(channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyReaderNonEmpty(NetworkSequenceViewReader networkSequenceViewReader) {
        this.ctx.executor().execute(() -> {
            this.ctx.pipeline().fireUserEventTriggered((Object) networkSequenceViewReader);
        });
    }

    private void enqueueAvailableReader(NetworkSequenceViewReader networkSequenceViewReader) throws Exception {
        if (networkSequenceViewReader.isRegisteredAsAvailable() || !networkSequenceViewReader.isAvailable()) {
            return;
        }
        boolean isEmpty = this.availableReaders.isEmpty();
        registerAvailableReader(networkSequenceViewReader);
        if (isEmpty) {
            writeAndFlushNextMessageIfPossible(this.ctx.channel());
        }
    }

    @VisibleForTesting
    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
        return this.availableReaders;
    }

    public void notifyReaderCreated(NetworkSequenceViewReader networkSequenceViewReader) {
        this.allReaders.put(networkSequenceViewReader.getReceiverId(), networkSequenceViewReader);
    }

    public void cancel(InputChannelID inputChannelID) {
        this.ctx.pipeline().fireUserEventTriggered((Object) inputChannelID);
    }

    public void close() {
        if (this.ctx != null) {
            this.ctx.channel().close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addCredit(InputChannelID inputChannelID, int i) throws Exception {
        if (this.fatalError) {
            return;
        }
        NetworkSequenceViewReader networkSequenceViewReader = this.allReaders.get(inputChannelID);
        if (networkSequenceViewReader == null) {
            throw new IllegalStateException("No reader for receiverId = " + inputChannelID + " exists.");
        }
        networkSequenceViewReader.addCredit(i);
        enqueueAvailableReader(networkSequenceViewReader);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof NetworkSequenceViewReader) {
            enqueueAvailableReader((NetworkSequenceViewReader) obj);
            return;
        }
        if (obj.getClass() != InputChannelID.class) {
            channelHandlerContext.fireUserEventTriggered(obj);
            return;
        }
        InputChannelID inputChannelID = (InputChannelID) obj;
        if (this.released.contains(inputChannelID)) {
            return;
        }
        int size = this.availableReaders.size();
        for (int i = 0; i < size; i++) {
            NetworkSequenceViewReader pollAvailableReader = pollAvailableReader();
            if (pollAvailableReader.getReceiverId().equals(inputChannelID)) {
                pollAvailableReader.releaseAllResources();
                markAsReleased(pollAvailableReader.getReceiverId());
            } else {
                registerAvailableReader(pollAvailableReader);
            }
        }
        this.allReaders.remove(inputChannelID);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        writeAndFlushNextMessageIfPossible(channelHandlerContext.channel());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x0076, code lost:
    
        if (r9.moreAvailable() == false) goto L23;
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x0079, code lost:
    
        registerAvailableReader(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x007e, code lost:
    
        r0 = new org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse(r9.buffer(), r0.getSequenceNumber(), r0.getReceiverId(), r9.buffersInBacklog());
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x00a3, code lost:
    
        if (isEndOfPartitionEvent(r9.buffer()) == false) goto L26;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00a6, code lost:
    
        r0.notifySubpartitionConsumed();
        r0.releaseAllResources();
        markAsReleased(r0.getReceiverId());
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x00bc, code lost:
    
        r8.writeAndFlush(r0).addListener2((org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener<? extends org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future<? super java.lang.Void>>) r7.writeListener);
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x00ce, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void writeAndFlushNextMessageIfPossible(org.apache.flink.shaded.netty4.io.netty.channel.Channel r8) throws java.io.IOException {
        /*
            r7 = this;
            r0 = r7
            boolean r0 = r0.fatalError
            if (r0 != 0) goto L10
            r0 = r8
            boolean r0 = r0.isWritable()
            if (r0 != 0) goto L11
        L10:
            return
        L11:
            r0 = 0
            r9 = r0
        L13:
            r0 = r7
            org.apache.flink.runtime.io.network.NetworkSequenceViewReader r0 = r0.pollAvailableReader()     // Catch: java.lang.Throwable -> Ld2
            r10 = r0
            r0 = r10
            if (r0 != 0) goto L1d
            return
        L1d:
            r0 = r10
            org.apache.flink.runtime.io.network.partition.consumer.InputChannel$BufferAndAvailability r0 = r0.getNextBuffer()     // Catch: java.lang.Throwable -> Ld2
            r9 = r0
            r0 = r9
            if (r0 != 0) goto L72
            r0 = r10
            boolean r0 = r0.isReleased()     // Catch: java.lang.Throwable -> Ld2
            if (r0 != 0) goto L34
            goto L13
        L34:
            r0 = r7
            r1 = r10
            org.apache.flink.runtime.io.network.partition.consumer.InputChannelID r1 = r1.getReceiverId()     // Catch: java.lang.Throwable -> Ld2
            r0.markAsReleased(r1)     // Catch: java.lang.Throwable -> Ld2
            r0 = r10
            java.lang.Throwable r0 = r0.getFailureCause()     // Catch: java.lang.Throwable -> Ld2
            r11 = r0
            r0 = r11
            if (r0 == 0) goto L6f
            org.apache.flink.runtime.io.network.netty.NettyMessage$ErrorResponse r0 = new org.apache.flink.runtime.io.network.netty.NettyMessage$ErrorResponse     // Catch: java.lang.Throwable -> Ld2
            r1 = r0
            org.apache.flink.runtime.io.network.partition.ProducerFailedException r2 = new org.apache.flink.runtime.io.network.partition.ProducerFailedException     // Catch: java.lang.Throwable -> Ld2
            r3 = r2
            r4 = r11
            r3.<init>(r4)     // Catch: java.lang.Throwable -> Ld2
            r3 = r10
            org.apache.flink.runtime.io.network.partition.consumer.InputChannelID r3 = r3.getReceiverId()     // Catch: java.lang.Throwable -> Ld2
            r1.<init>(r2, r3)     // Catch: java.lang.Throwable -> Ld2
            r12 = r0
            r0 = r7
            org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext r0 = r0.ctx     // Catch: java.lang.Throwable -> Ld2
            r1 = r12
            org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture r0 = r0.writeAndFlush(r1)     // Catch: java.lang.Throwable -> Ld2
        L6f:
            goto Lcf
        L72:
            r0 = r9
            boolean r0 = r0.moreAvailable()     // Catch: java.lang.Throwable -> Ld2
            if (r0 == 0) goto L7e
            r0 = r7
            r1 = r10
            r0.registerAvailableReader(r1)     // Catch: java.lang.Throwable -> Ld2
        L7e:
            org.apache.flink.runtime.io.network.netty.NettyMessage$BufferResponse r0 = new org.apache.flink.runtime.io.network.netty.NettyMessage$BufferResponse     // Catch: java.lang.Throwable -> Ld2
            r1 = r0
            r2 = r9
            org.apache.flink.runtime.io.network.buffer.Buffer r2 = r2.buffer()     // Catch: java.lang.Throwable -> Ld2
            r3 = r10
            int r3 = r3.getSequenceNumber()     // Catch: java.lang.Throwable -> Ld2
            r4 = r10
            org.apache.flink.runtime.io.network.partition.consumer.InputChannelID r4 = r4.getReceiverId()     // Catch: java.lang.Throwable -> Ld2
            r5 = r9
            int r5 = r5.buffersInBacklog()     // Catch: java.lang.Throwable -> Ld2
            r1.<init>(r2, r3, r4, r5)     // Catch: java.lang.Throwable -> Ld2
            r11 = r0
            r0 = r7
            r1 = r9
            org.apache.flink.runtime.io.network.buffer.Buffer r1 = r1.buffer()     // Catch: java.lang.Throwable -> Ld2
            boolean r0 = r0.isEndOfPartitionEvent(r1)     // Catch: java.lang.Throwable -> Ld2
            if (r0 == 0) goto Lbc
            r0 = r10
            r0.notifySubpartitionConsumed()     // Catch: java.lang.Throwable -> Ld2
            r0 = r10
            r0.releaseAllResources()     // Catch: java.lang.Throwable -> Ld2
            r0 = r7
            r1 = r10
            org.apache.flink.runtime.io.network.partition.consumer.InputChannelID r1 = r1.getReceiverId()     // Catch: java.lang.Throwable -> Ld2
            r0.markAsReleased(r1)     // Catch: java.lang.Throwable -> Ld2
        Lbc:
            r0 = r8
            r1 = r11
            org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture r0 = r0.writeAndFlush(r1)     // Catch: java.lang.Throwable -> Ld2
            r1 = r7
            org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener r1 = r1.writeListener     // Catch: java.lang.Throwable -> Ld2
            org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture r0 = r0.addListener2(r1)     // Catch: java.lang.Throwable -> Ld2
            return
        Lcf:
            goto L13
        Ld2:
            r10 = move-exception
            r0 = r9
            if (r0 == 0) goto Le0
            r0 = r9
            org.apache.flink.runtime.io.network.buffer.Buffer r0 = r0.buffer()
            r0.recycleBuffer()
        Le0:
            java.io.IOException r0 = new java.io.IOException
            r1 = r0
            r2 = r10
            java.lang.String r2 = r2.getMessage()
            r3 = r10
            r1.<init>(r2, r3)
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(org.apache.flink.shaded.netty4.io.netty.channel.Channel):void");
    }

    private void registerAvailableReader(NetworkSequenceViewReader networkSequenceViewReader) {
        this.availableReaders.add(networkSequenceViewReader);
        networkSequenceViewReader.setRegisteredAsAvailable(true);
    }

    @Nullable
    private NetworkSequenceViewReader pollAvailableReader() {
        NetworkSequenceViewReader poll = this.availableReaders.poll();
        if (poll != null) {
            poll.setRegisteredAsAvailable(false);
        }
        return poll;
    }

    private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
        return EventSerializer.isEvent(buffer, (Class<?>) EndOfPartitionEvent.class);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        releaseAllResources();
        channelHandlerContext.fireChannelInactive();
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        handleException(channelHandlerContext.channel(), th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleException(Channel channel, Throwable th) throws IOException {
        LOG.error("Encountered error while consuming partitions", th);
        this.fatalError = true;
        releaseAllResources();
        if (channel.isActive()) {
            channel.writeAndFlush(new NettyMessage.ErrorResponse(th)).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
        }
    }

    private void releaseAllResources() throws IOException {
        for (NetworkSequenceViewReader networkSequenceViewReader : this.allReaders.values()) {
            networkSequenceViewReader.releaseAllResources();
            markAsReleased(networkSequenceViewReader.getReceiverId());
        }
        this.availableReaders.clear();
        this.allReaders.clear();
    }

    private void markAsReleased(InputChannelID inputChannelID) {
        this.released.add(inputChannelID);
    }
}
