package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.MockChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.clock.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.class */
public class CheckpointedInputGateTest {
    private final HashMap<Integer, Integer> channelIndexToSequenceNumber = new HashMap<>();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest$ResumeCountingConnectionManager.class */
    private static class ResumeCountingConnectionManager extends TestingConnectionManager {
        private int numResumed;

        private ResumeCountingConnectionManager() {
        }

        @Override // org.apache.flink.runtime.io.network.TestingConnectionManager
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) {
            return new TestingPartitionRequestClient() { // from class: org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGateTest.ResumeCountingConnectionManager.1
                @Override // org.apache.flink.runtime.io.network.TestingPartitionRequestClient
                public void resumeConsumption(RemoteInputChannel remoteInputChannel) {
                    ResumeCountingConnectionManager.access$208(ResumeCountingConnectionManager.this);
                    super.resumeConsumption(remoteInputChannel);
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumResumed() {
            return this.numResumed;
        }

        static /* synthetic */ int access$208(ResumeCountingConnectionManager resumeCountingConnectionManager) {
            int i = resumeCountingConnectionManager.numResumed;
            resumeCountingConnectionManager.numResumed = i + 1;
            return i;
        }
    }

    @Before
    public void setUp() {
        this.channelIndexToSequenceNumber.clear();
    }

    @Test
    public void testUpstreamResumedUponEndOfRecovery() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(11 * 3, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        try {
            CheckpointedInputGate checkpointedInputGate = setupInputGate(11, networkBufferPool, new ResumeCountingConnectionManager());
            Assert.assertFalse(checkpointedInputGate.pollNext().isPresent());
            for (int i = 0; i < 11 - 1; i++) {
                enqueueEndOfState(checkpointedInputGate, i);
                Optional pollNext = checkpointedInputGate.pollNext();
                while (pollNext.isPresent() && (((BufferOrEvent) pollNext.get()).getEvent() instanceof EndOfChannelStateEvent) && !checkpointedInputGate.allChannelsRecovered()) {
                    pollNext = checkpointedInputGate.pollNext();
                }
                Assert.assertFalse("should align (block all channels)", pollNext.isPresent());
            }
            enqueueEndOfState(checkpointedInputGate, 11 - 1);
            Optional pollNext2 = checkpointedInputGate.pollNext();
            Assert.assertTrue(pollNext2.isPresent());
            Assert.assertTrue(((BufferOrEvent) pollNext2.get()).isEvent());
            Assert.assertEquals(EndOfChannelStateEvent.INSTANCE, ((BufferOrEvent) pollNext2.get()).getEvent());
            Assert.assertEquals(11, r0.getNumResumed());
            Assert.assertFalse("should only be a single event no matter of what is the number of channels", checkpointedInputGate.pollNext().isPresent());
            networkBufferPool.destroy();
        } catch (Throwable th) {
            networkBufferPool.destroy();
            throw th;
        }
    }

    @Test
    public void testPersisting() throws Exception {
        testPersisting(false);
    }

    @Test
    public void testPersistingWithDrainingTheGate() throws Exception {
        testPersisting(true);
    }

    public void testPersisting(boolean z) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(3 * 3, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        try {
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler(2L);
            RecordingChannelStateWriter recordingChannelStateWriter = new RecordingChannelStateWriter();
            CheckpointedInputGate checkpointedInputGate = setupInputGateWithAlternatingController(3, networkBufferPool, validatingCheckpointHandler, recordingChannelStateWriter);
            enqueue(checkpointedInputGate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 0, barrier(2L));
            enqueue(checkpointedInputGate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 1, barrier(1L));
            enqueue(checkpointedInputGate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
            checkpointedInputGate.pollNext();
            Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
            assertAddedInputSize(recordingChannelStateWriter, 0, 1);
            assertAddedInputSize(recordingChannelStateWriter, 1, 2);
            assertAddedInputSize(recordingChannelStateWriter, 2, 1);
            enqueue(checkpointedInputGate, 0, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (z && checkpointedInputGate.pollNext().isPresent()) {
            }
            assertAddedInputSize(recordingChannelStateWriter, 0, 1);
            assertAddedInputSize(recordingChannelStateWriter, 1, 3);
            assertAddedInputSize(recordingChannelStateWriter, 2, 2);
            enqueue(checkpointedInputGate, 1, barrier(2L));
            enqueue(checkpointedInputGate, 1, BufferBuilderTestUtils.buildSomeBuffer());
            enqueue(checkpointedInputGate, 2, barrier(1L));
            enqueue(checkpointedInputGate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (z && checkpointedInputGate.pollNext().isPresent()) {
            }
            assertAddedInputSize(recordingChannelStateWriter, 0, 1);
            assertAddedInputSize(recordingChannelStateWriter, 1, 3);
            assertAddedInputSize(recordingChannelStateWriter, 2, 3);
            enqueue(checkpointedInputGate, 2, barrier(2L));
            enqueue(checkpointedInputGate, 2, BufferBuilderTestUtils.buildSomeBuffer());
            while (z && checkpointedInputGate.pollNext().isPresent()) {
            }
            assertAddedInputSize(recordingChannelStateWriter, 0, 1);
            assertAddedInputSize(recordingChannelStateWriter, 1, 3);
            assertAddedInputSize(recordingChannelStateWriter, 2, 3);
            networkBufferPool.destroy();
        } catch (Throwable th) {
            networkBufferPool.destroy();
            throw th;
        }
    }

    @Test
    public void testPriorityBeforeClose() throws IOException, InterruptedException {
        MemorySegmentProvider networkBufferPool = new NetworkBufferPool(10, StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        Closer create = Closer.create();
        Throwable th = null;
        try {
            try {
                networkBufferPool.getClass();
                create.register(networkBufferPool::destroy);
                for (int i = 0; i < 100; i++) {
                    setUp();
                    final SingleInputGate build = new SingleInputGateBuilder().setNumberOfChannels(2).setBufferPoolFactory(networkBufferPool.createBufferPool(2, Integer.MAX_VALUE)).setSegmentProvider(networkBufferPool).setChannelFactory((v0, v1) -> {
                        return v0.buildRemoteChannel(v1);
                    }).build();
                    build.setup();
                    build.getChannel(0).requestSubpartition();
                    TaskMailboxImpl taskMailboxImpl = new TaskMailboxImpl();
                    MailboxExecutorImpl mailboxExecutorImpl = new MailboxExecutorImpl(taskMailboxImpl, 0, StreamTaskActionExecutor.IMMEDIATE);
                    ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler(1L);
                    CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(build, TestBarrierHandlerFactory.forTarget(validatingCheckpointHandler).create(build, new MockChannelStateWriter()), mailboxExecutorImpl, UpstreamRecoveryTracker.forInputGate(build));
                    int size = taskMailboxImpl.size();
                    enqueue(checkpointedInputGate, 0, (AbstractEvent) barrier(1L));
                    Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(1L));
                    while (fromNow.hasTimeLeft() && size >= taskMailboxImpl.size()) {
                        Thread.sleep(1L);
                    }
                    final CountDownLatch countDownLatch = new CountDownLatch(2);
                    CheckedThread checkedThread = new CheckedThread("Canceler") { // from class: org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGateTest.1
                        public void go() throws IOException {
                            countDownLatch.countDown();
                            build.close();
                        }
                    };
                    checkedThread.start();
                    countDownLatch.countDown();
                    do {
                    } while (mailboxExecutorImpl.tryYield());
                    Assert.assertEquals(1L, validatingCheckpointHandler.triggeredCheckpointCounter);
                    checkedThread.join();
                }
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    private static CheckpointBarrier barrier(long j) {
        return new CheckpointBarrier(j, j, CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    private void assertAddedInputSize(RecordingChannelStateWriter recordingChannelStateWriter, int i, int i2) {
        Assert.assertEquals(i2, recordingChannelStateWriter.getAddedInput().get(new InputChannelInfo(0, i)).size());
    }

    private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int i) throws IOException {
        enqueue(checkpointedInputGate, i, (AbstractEvent) EndOfChannelStateEvent.INSTANCE);
    }

    private void enqueueEndOfPartition(CheckpointedInputGate checkpointedInputGate, int i) throws IOException {
        enqueue(checkpointedInputGate, i, (AbstractEvent) EndOfPartitionEvent.INSTANCE);
    }

    private void enqueue(CheckpointedInputGate checkpointedInputGate, int i, AbstractEvent abstractEvent) throws IOException {
        boolean z = false;
        if (abstractEvent instanceof CheckpointBarrier) {
            z = ((CheckpointBarrier) abstractEvent).getCheckpointOptions().isUnalignedCheckpoint();
        }
        enqueue(checkpointedInputGate, i, EventSerializer.toBuffer(abstractEvent, z));
    }

    private void enqueue(CheckpointedInputGate checkpointedInputGate, int i, Buffer buffer) throws IOException {
        checkpointedInputGate.getChannel(i).onBuffer(buffer, this.channelIndexToSequenceNumber.compute(Integer.valueOf(i), (num, num2) -> {
            return Integer.valueOf(num2 == null ? 0 : num2.intValue() + 1);
        }).intValue(), 0);
    }

    private CheckpointedInputGate setupInputGate(int i, NetworkBufferPool networkBufferPool, ConnectionManager connectionManager) throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(i, Integer.MAX_VALUE)).setSegmentProvider(networkBufferPool).setChannelFactory((inputChannelBuilder, singleInputGate) -> {
            return inputChannelBuilder.setConnectionManager(connectionManager).buildRemoteChannel(singleInputGate);
        }).setNumberOfChannels(i).build();
        build.setup();
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(build, new CheckpointBarrierTracker(i, new AbstractInvokable(new DummyEnvironment()) { // from class: org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGateTest.2
            public void invoke() {
            }
        }, SystemClock.getInstance(), true), new MailboxExecutorImpl(new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE), UpstreamRecoveryTracker.forInputGate(build));
        for (int i2 = 0; i2 < i; i2++) {
            checkpointedInputGate.getChannel(i2).requestSubpartition();
        }
        return checkpointedInputGate;
    }

    private CheckpointedInputGate setupInputGateWithAlternatingController(int i, NetworkBufferPool networkBufferPool, AbstractInvokable abstractInvokable, RecordingChannelStateWriter recordingChannelStateWriter) throws Exception {
        TestingConnectionManager testingConnectionManager = new TestingConnectionManager();
        SingleInputGate build = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(i, Integer.MAX_VALUE)).setSegmentProvider(networkBufferPool).setChannelFactory((inputChannelBuilder, singleInputGate) -> {
            return inputChannelBuilder.setConnectionManager(testingConnectionManager).buildRemoteChannel(singleInputGate);
        }).setNumberOfChannels(i).setChannelStateWriter(recordingChannelStateWriter).build();
        build.setup();
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(build, TestBarrierHandlerFactory.forTarget(abstractInvokable).create(build, recordingChannelStateWriter), new MailboxExecutorImpl(new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE), UpstreamRecoveryTracker.forInputGate(build));
        for (int i2 = 0; i2 < i; i2++) {
            checkpointedInputGate.getChannel(i2).requestSubpartition();
        }
        return checkpointedInputGate;
    }
}
