package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest.class */
public class CheckpointedInputGateTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointedInputGateTest$ResumeCountingConnectionManager.class */
    public static class ResumeCountingConnectionManager extends TestingConnectionManager {
        private int numResumed;

        private ResumeCountingConnectionManager() {
        }

        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) {
            return new TestingPartitionRequestClient() { // from class: org.apache.flink.streaming.runtime.io.CheckpointedInputGateTest.ResumeCountingConnectionManager.1
                public void resumeConsumption(RemoteInputChannel remoteInputChannel) {
                    ResumeCountingConnectionManager.access$208(ResumeCountingConnectionManager.this);
                    super.resumeConsumption(remoteInputChannel);
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumResumed() {
            return this.numResumed;
        }

        static /* synthetic */ int access$208(ResumeCountingConnectionManager resumeCountingConnectionManager) {
            int i = resumeCountingConnectionManager.numResumed;
            resumeCountingConnectionManager.numResumed = i + 1;
            return i;
        }
    }

    @Test
    public void testUpstreamResumedUponEndOfRecovery() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(11 * 3, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        try {
            CheckpointedInputGate checkpointedInputGate = setupInputGate(11, networkBufferPool, new ResumeCountingConnectionManager());
            Assert.assertFalse(checkpointedInputGate.pollNext().isPresent());
            for (int i = 0; i < 11 - 1; i++) {
                emitEndOfState(checkpointedInputGate, i);
                Assert.assertFalse("should align (block all channels)", checkpointedInputGate.pollNext().isPresent());
            }
            emitEndOfState(checkpointedInputGate, 11 - 1);
            Optional pollNext = checkpointedInputGate.pollNext();
            Assert.assertTrue(pollNext.isPresent());
            Assert.assertTrue(((BufferOrEvent) pollNext.get()).isEvent());
            Assert.assertEquals(EndOfChannelStateEvent.INSTANCE, ((BufferOrEvent) pollNext.get()).getEvent());
            Assert.assertEquals(11, r0.getNumResumed());
            Assert.assertFalse("should only be a single event no matter of what is the number of channels", checkpointedInputGate.pollNext().isPresent());
            networkBufferPool.destroy();
        } catch (Throwable th) {
            networkBufferPool.destroy();
            throw th;
        }
    }

    private void emitEndOfState(CheckpointedInputGate checkpointedInputGate, int i) throws IOException {
        checkpointedInputGate.getChannel(i).onBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false), 0, 0);
    }

    private CheckpointedInputGate setupInputGate(int i, NetworkBufferPool networkBufferPool, ResumeCountingConnectionManager resumeCountingConnectionManager) throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(i, Integer.MAX_VALUE)).setSegmentProvider(networkBufferPool).setChannelFactory((inputChannelBuilder, singleInputGate) -> {
            return inputChannelBuilder.setConnectionManager(resumeCountingConnectionManager).buildRemoteChannel(singleInputGate);
        }).setNumberOfChannels(i).build();
        build.setup();
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(build, new CheckpointBarrierTracker(i, new AbstractInvokable(new DummyEnvironment()) { // from class: org.apache.flink.streaming.runtime.io.CheckpointedInputGateTest.1
            public void invoke() {
            }
        }), new MailboxExecutorImpl(new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE), UpstreamRecoveryTracker.forInputGate(build));
        for (int i2 = 0; i2 < i; i2++) {
            checkpointedInputGate.getChannel(i2).requestSubpartition(0);
        }
        return checkpointedInputGate;
    }
}
