package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.util.FlinkRuntimeException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.class */
public class StreamTaskFinalCheckpointsTest {
    private static final long CONCURRENT_EVENT_WAIT_PERIOD_MS = 500;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest$EmptyOperator.class */
    public static class EmptyOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private EmptyOperator() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest$FinishingOperator.class */
    private static class FinishingOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        static boolean finished = false;

        private FinishingOperator() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }

        public void finish() throws Exception {
            finished = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest$ImmediatelyFinishingSource.class */
    public static class ImmediatelyFinishingSource implements SourceFunction<String> {
        private ImmediatelyFinishingSource() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest$StatefulOperator.class */
    private static class StatefulOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private ListState<Integer> state;

        private StatefulOperator() {
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.state = stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("test", Integer.class));
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }
    }

    @Test
    public void testCheckpointDoneOnFinishedOperator() throws Exception {
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain(new FinishingOperator()).build();
        build.setAutoProcess(false);
        build.processElement(new StreamRecord(1));
        build.streamTask.operatorChain.finishOperators(build.streamTask.getActionExecutor(), StopMode.DRAIN);
        Assert.assertTrue(FinishingOperator.finished);
        build.getTaskStateManager().getWaitForReportLatch().reset();
        build.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(2L, 0L), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
        build.getTaskStateManager().getWaitForReportLatch().await();
        Assert.assertEquals(2L, build.getTaskStateManager().getReportedCheckpointId());
    }

    @Test
    public void testNotWaitingForAllRecordsProcessedIfCheckpointNotEnabled() throws Exception {
        ResultPartitionWriter[] resultPartitionWriterArr = new ResultPartitionWriter[2];
        for (int i = 0; i < resultPartitionWriterArr.length; i++) {
            try {
                resultPartitionWriterArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionWriterArr[i].setup();
            } finally {
                for (ResultPartitionWriter resultPartitionWriter : resultPartitionWriterArr) {
                    if (resultPartitionWriter != null) {
                        resultPartitionWriter.close();
                    }
                }
            }
        }
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(false);
        }).addInput(BasicTypeInfo.STRING_TYPE_INFO).addAdditionalOutput(resultPartitionWriterArr).setupOperatorChain((StreamOperator<?>) new EmptyOperator()).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        Throwable th = null;
        try {
            try {
                build.endInput();
                for (ResultPartitionWriter resultPartitionWriter2 : resultPartitionWriterArr) {
                    Assert.assertEquals(0L, ((PipelinedResultPartition) resultPartitionWriter2).getNumberOfQueuedBuffers());
                }
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        StreamTaskMailboxTestHarness<String> createTestHarness = createTestHarness(resultPartitionArr, new CompletingCheckpointResponder(), false);
        Throwable th = null;
        try {
            try {
                processMailTillCheckpointSucceeds(createTestHarness, triggerCheckpoint(createTestHarness, 2L));
                Assert.assertEquals(2L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
                processMailTillCheckpointSucceeds(createTestHarness, triggerCheckpoint(createTestHarness, 4L));
                Assert.assertEquals(4L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 1);
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 2);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 1);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 2);
                CompletableFuture<Boolean> triggerCheckpoint = triggerCheckpoint(createTestHarness, 6);
                triggerCheckpoint.thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                createTestHarness.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(6L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals(6L, createTestHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    Assert.assertEquals(4L, resultPartition2.getNumberOfQueuedBuffers());
                }
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    private StreamTaskMailboxTestHarness<String> createTestHarness(CompletingCheckpointResponder completingCheckpointResponder) throws Exception {
        return createTestHarness(null, completingCheckpointResponder, false);
    }

    private StreamTaskMailboxTestHarness<String> createTestHarness(@Nullable ResultPartition[] resultPartitionArr, CompletingCheckpointResponder completingCheckpointResponder, boolean z) throws Exception {
        StreamTaskMailboxTestHarnessBuilder streamTaskMailboxTestHarnessBuilder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        if (resultPartitionArr != null) {
            streamTaskMailboxTestHarnessBuilder = streamTaskMailboxTestHarnessBuilder.addAdditionalOutput(resultPartitionArr);
        }
        StreamTaskMailboxTestHarness<String> build = ((StreamTaskMailboxTestHarnessBuilder) streamTaskMailboxTestHarnessBuilder.addInput(BasicTypeInfo.STRING_TYPE_INFO, 3).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
            streamConfig.setUnalignedCheckpointsEnabled(z);
        }).setCheckpointResponder(completingCheckpointResponder).setupOperatorChain((StreamOperator<?>) new EmptyOperator()).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        StreamTask<String, ?> streamTask = build.streamTask;
        streamTask.getClass();
        Consumer<Long> consumer = (v1) -> {
            r1.notifyCheckpointCompleteAsync(v1);
        };
        StreamTask<String, ?> streamTask2 = build.streamTask;
        streamTask2.getClass();
        completingCheckpointResponder.setHandlers(consumer, (v1, v2) -> {
            r2.notifyCheckpointAbortAsync(v1, v2);
        });
        return build;
    }

    @Test
    public void testWaitingForFinalCheckpointNotTheFirsNotifiedComplete() throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        CompletingCheckpointResponder completingCheckpointResponder = new CompletingCheckpointResponder();
        StreamTaskMailboxTestHarness<String> createTestHarness = createTestHarness(resultPartitionArr, completingCheckpointResponder, false);
        Throwable th = null;
        try {
            try {
                completingCheckpointResponder.completeCheckpoints(Collections.singletonList(3L));
                createTestHarness.waitForTaskCompletion();
                triggerCheckpoint(createTestHarness, 1L).thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                createTestHarness.processAll();
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                triggerCheckpoint(createTestHarness, 2L);
                createTestHarness.processAll();
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                triggerCheckpoint(createTestHarness, 3L);
                createTestHarness.processAll();
                createTestHarness.finishProcessing();
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(3L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals(3L, createTestHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    Assert.assertEquals(4L, resultPartition2.getNumberOfQueuedBuffers());
                }
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testTriggerStopWithSavepointWhenWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        final int i2 = 6;
        final int i3 = 7;
        StreamTaskMailboxTestHarness<String> createTestHarness = createTestHarness(resultPartitionArr, new CompletingCheckpointResponder() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.1
            @Override // org.apache.flink.streaming.util.CompletingCheckpointResponder
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
                if (i3 == j) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, i2, checkpointMetrics, taskStateSnapshot);
                    try {
                        Thread.sleep(StreamTaskFinalCheckpointsTest.CONCURRENT_EVENT_WAIT_PERIOD_MS);
                        super.acknowledgeCheckpoint(jobID, executionAttemptID, i3, checkpointMetrics, taskStateSnapshot);
                    } catch (InterruptedException e) {
                        throw new FlinkRuntimeException(e);
                    }
                }
            }
        }, false);
        Throwable th = null;
        try {
            try {
                createTestHarness.waitForTaskCompletion();
                CompletableFuture<Boolean> triggerCheckpoint = triggerCheckpoint(createTestHarness, 6);
                triggerCheckpoint.thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                CompletableFuture<Boolean> triggerStopWithSavepointDrain = triggerStopWithSavepointDrain(createTestHarness, 7);
                createTestHarness.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                Assert.assertTrue(triggerStopWithSavepointDrain.isDone());
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(7, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals(7, createTestHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    Assert.assertEquals(3L, resultPartition2.getNumberOfQueuedBuffers());
                }
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(true);
    }

    @Test
    public void testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(false);
    }

    private void doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(boolean z) throws Exception {
        final int i = 6;
        final int i2 = 7;
        CompletingCheckpointResponder completingCheckpointResponder = new CompletingCheckpointResponder() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.2
            @Override // org.apache.flink.streaming.util.CompletingCheckpointResponder
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
                if (i2 == j) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, i, checkpointMetrics, taskStateSnapshot);
                    try {
                        Thread.sleep(StreamTaskFinalCheckpointsTest.CONCURRENT_EVENT_WAIT_PERIOD_MS);
                        super.acknowledgeCheckpoint(jobID, executionAttemptID, i2, checkpointMetrics, taskStateSnapshot);
                    } catch (InterruptedException e) {
                        throw new FlinkRuntimeException(e);
                    }
                }
            }
        };
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).setCheckpointResponder(completingCheckpointResponder).setupOutputForSingletonOperatorChain((StreamOperator<?>) new StreamSource(new ImmediatelyFinishingSource())).build();
        Throwable th = null;
        try {
            try {
                StreamTask<OUT, ?> streamTask = build.streamTask;
                streamTask.getClass();
                Consumer<Long> consumer = (v1) -> {
                    r1.notifyCheckpointCompleteAsync(v1);
                };
                StreamTask<OUT, ?> streamTask2 = build.streamTask;
                streamTask2.getClass();
                completingCheckpointResponder.setHandlers(consumer, (v1, v2) -> {
                    r2.notifyCheckpointAbortAsync(v1, v2);
                });
                build.streamTask.runMailboxLoop();
                CompletableFuture<Boolean> triggerCheckpoint = triggerCheckpoint(build, 6);
                CompletableFuture<Boolean> triggerStopWithSavepointDrain = z ? triggerStopWithSavepointDrain(build, 7) : triggerStopWithSavepointNoDrain(build, 7);
                build.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                Assert.assertTrue(triggerStopWithSavepointDrain.isDone());
                build.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(7, build.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals(7, build.getTaskStateManager().getNotifiedCompletedCheckpointId());
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.alignedNoTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @Test
    public void testTriggeringUnalignedCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @Test
    public void testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), 10L));
    }

    private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        StreamTaskMailboxTestHarness<String> createTestHarness = createTestHarness(resultPartitionArr, new CompletingCheckpointResponder(), checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable());
        Throwable th = null;
        try {
            try {
                int numberOfInputChannels = createTestHarness.inputGates[0].getInputGate().getNumberOfInputChannels();
                int[] iArr = new int[numberOfInputChannels];
                for (int i2 = 0; i2 < numberOfInputChannels; i2++) {
                    TestInputChannel testInputChannel = (TestInputChannel) createTestHarness.inputGates[0].getInputGate().getChannel(i2);
                    testInputChannel.setActionOnResumed(() -> {
                        int channelIndex = testInputChannel.getChannelIndex();
                        iArr[channelIndex] = iArr[channelIndex] + 1;
                    });
                }
                processMailTillCheckpointSucceeds(createTestHarness, triggerCheckpoint(createTestHarness, 2L, checkpointOptions));
                Assert.assertEquals(2L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, iArr);
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
                processMailTillCheckpointSucceeds(createTestHarness, triggerCheckpoint(createTestHarness, 4L, checkpointOptions));
                Assert.assertEquals(4L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, iArr);
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 1);
                createTestHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 2);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 1);
                createTestHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 2);
                CompletableFuture<Boolean> triggerCheckpoint = triggerCheckpoint(createTestHarness, 6L, checkpointOptions);
                triggerCheckpoint.thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                createTestHarness.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                createTestHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(6L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, iArr);
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    Assert.assertEquals(4L, resultPartition2.getNumberOfQueuedBuffers());
                }
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testReportOperatorsFinishedInCheckpoint() throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        CompletingCheckpointResponder completingCheckpointResponder = new CompletingCheckpointResponder();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 1).addAdditionalOutput(resultPartitionArr).setCheckpointResponder(completingCheckpointResponder).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).setupOperatorChain((StreamOperator<?>) new StatefulOperator()).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        Throwable th = null;
        try {
            try {
                StreamTask<OUT, ?> streamTask = build.streamTask;
                streamTask.getClass();
                Consumer<Long> consumer = (v1) -> {
                    r1.notifyCheckpointCompleteAsync(v1);
                };
                StreamTask<OUT, ?> streamTask2 = build.streamTask;
                streamTask2.getClass();
                completingCheckpointResponder.setHandlers(consumer, (v1, v2) -> {
                    r2.notifyCheckpointAbortAsync(v1, v2);
                });
                processMailTillCheckpointSucceeds(build, triggerCheckpoint(build, 2L));
                Assert.assertEquals(2L, build.getTaskStateManager().getReportedCheckpointId());
                Assert.assertFalse(build.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(2L).isTaskFinished());
                build.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                CompletableFuture<Boolean> triggerCheckpoint = triggerCheckpoint(build, 4L);
                triggerCheckpoint.thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                build.processAll();
                build.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                build.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertTrue(build.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(4L).isTaskFinished());
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, long j) {
        return triggerCheckpoint(streamTaskMailboxTestHarness, j, CheckpointOptions.forCheckpointWithDefaultLocation());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, long j, CheckpointOptions checkpointOptions) {
        streamTaskMailboxTestHarness.getTaskStateManager().getWaitForReportLatch().reset();
        return streamTaskMailboxTestHarness.getStreamTask().triggerCheckpointAsync(new CheckpointMetaData(j, j * 1000), checkpointOptions);
    }

    static CompletableFuture<Boolean> triggerStopWithSavepointDrain(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, long j) {
        return triggerStopWithSavepoint(streamTaskMailboxTestHarness, j, SavepointType.terminate(SavepointFormatType.CANONICAL));
    }

    static CompletableFuture<Boolean> triggerStopWithSavepointNoDrain(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, long j) {
        return triggerStopWithSavepoint(streamTaskMailboxTestHarness, j, SavepointType.suspend(SavepointFormatType.CANONICAL));
    }

    static CompletableFuture<Boolean> triggerStopWithSavepoint(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, long j, SavepointType savepointType) {
        streamTaskMailboxTestHarness.getTaskStateManager().getWaitForReportLatch().reset();
        return streamTaskMailboxTestHarness.getStreamTask().triggerCheckpointAsync(new CheckpointMetaData(j, j * 1000), CheckpointOptions.alignedNoTimeout(savepointType, CheckpointStorageLocationReference.getDefault()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void processMailTillCheckpointSucceeds(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, Future<Boolean> future) throws Exception {
        while (!future.isDone()) {
            streamTaskMailboxTestHarness.processSingleStep();
        }
        streamTaskMailboxTestHarness.getTaskStateManager().getWaitForReportLatch().await();
    }

    @Test
    public void testWaitingForPendingCheckpointsOnFinished() throws Exception {
        final long j = 2;
        StreamTaskMailboxTestHarness<String> createTestHarness = createTestHarness(new CompletingCheckpointResponder() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.3
            @Override // org.apache.flink.streaming.util.CompletingCheckpointResponder
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j2, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
                if (j != j2) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, j2, checkpointMetrics, taskStateSnapshot);
                    return;
                }
                try {
                    Thread.sleep(StreamTaskFinalCheckpointsTest.CONCURRENT_EVENT_WAIT_PERIOD_MS);
                } catch (InterruptedException e) {
                    throw new FlinkRuntimeException(e);
                }
            }
        });
        Throwable th = null;
        try {
            try {
                createTestHarness.waitForTaskCompletion();
                createTestHarness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(1L, 101L), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
                createTestHarness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(2L, 101L), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
                createTestHarness.processAll();
                createTestHarness.finishProcessing();
                Assert.assertEquals(2L, createTestHarness.getTaskStateManager().getReportedCheckpointId());
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 3).setCollectNetworkEvents().setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain((StreamOperator<?>) new TestFinishedOnRestoreStreamOperator()).chain((OneInputStreamOperator) new TestFinishedOnRestoreStreamOperator(), (TypeSerializer) StringSerializer.INSTANCE).finish()).build();
        Throwable th = null;
        try {
            build.processAll();
            build.getTaskStateManager().getWaitForReportLatch().reset();
            CheckpointMetaData checkpointMetaData = new CheckpointMetaData(2L, 2L);
            CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
            build.streamTask.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
            build.getTaskStateManager().getWaitForReportLatch().await();
            Assert.assertEquals(2L, build.getTaskStateManager().getReportedCheckpointId());
            build.streamTask.notifyCheckpointCompleteAsync(2L);
            build.streamTask.notifyCheckpointAbortAsync(3L, 2L);
            build.processAll();
            build.processElement(Watermark.MAX_WATERMARK, 0, 0);
            build.processElement(Watermark.MAX_WATERMARK, 0, 1);
            build.processElement(Watermark.MAX_WATERMARK, 0, 2);
            build.waitForTaskCompletion();
            build.finishProcessing();
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(new Object[]{new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions), Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)}));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
        OperatorID operatorID = new OperatorID();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(streamConfig -> {
            streamConfig.setUnalignedCheckpointsEnabled(true);
        }).addInput(BasicTypeInfo.STRING_TYPE_INFO, 3).setCollectNetworkEvents().setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain((StreamOperator<?>) new TestFinishedOnRestoreStreamOperator()).chain(operatorID, (OneInputStreamOperator) new TestFinishedOnRestoreStreamOperator(operatorID), (TypeSerializer) StringSerializer.INSTANCE).finish()).build();
        Throwable th = null;
        try {
            try {
                build.processAll();
                TestCheckpointResponder checkpointResponder = build.getCheckpointResponder();
                checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
                checkpointResponder.setDeclinedLatch(new OneShotLatch());
                CheckpointBarrier checkpointBarrier = new CheckpointBarrier(2L, 2L, CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
                build.processEvent(checkpointBarrier, 0, 0);
                Thread.sleep(CONCURRENT_EVENT_WAIT_PERIOD_MS);
                build.processEvent(checkpointBarrier, 0, 1);
                build.processEvent(checkpointBarrier, 0, 2);
                CommonTestUtils.waitUntilCondition(() -> {
                    return Boolean.valueOf(checkpointResponder.getAcknowledgeLatch().isTriggered() || checkpointResponder.getDeclinedLatch().isTriggered());
                });
                Assert.assertEquals(Collections.singletonList(2L), checkpointResponder.getAcknowledgeReports().stream().map((v0) -> {
                    return v0.getCheckpointId();
                }).collect(Collectors.toList()));
                Assert.assertEquals(Collections.emptyList(), checkpointResponder.getDeclineReports().stream().map((v0) -> {
                    return v0.getCheckpointId();
                }).collect(Collectors.toList()));
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }
}
