package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.class */
public class SourceTaskTerminationTest extends TestLogger {
    private static OneShotLatch ready;
    private static MultiShotLatch runLoopStart;
    private static MultiShotLatch runLoopEnd;

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(20);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest$LockStepSourceWithOneWmPerElement.class */
    public static class LockStepSourceWithOneWmPerElement implements SourceFunction<Long> {
        private volatile boolean isRunning;

        private LockStepSourceWithOneWmPerElement() {
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            long j = 1;
            this.isRunning = true;
            SourceTaskTerminationTest.ready.trigger();
            while (this.isRunning) {
                SourceTaskTerminationTest.runLoopStart.await();
                sourceContext.emitWatermark(new Watermark(j));
                long j2 = j;
                j = j2 + 1;
                sourceContext.collect(Long.valueOf(j2));
                SourceTaskTerminationTest.runLoopEnd.trigger();
            }
        }

        public void cancel() {
            this.isRunning = false;
            SourceTaskTerminationTest.runLoopStart.trigger();
        }
    }

    @Before
    public void initialize() {
        ready = new OneShotLatch();
        runLoopStart = new MultiShotLatch();
        runLoopEnd = new MultiShotLatch();
    }

    @Test
    public void testStopWithSavepointWithMaxWatermark() throws Exception {
        stopWithSavepointStreamTaskTestHelper(true);
    }

    @Test
    public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
        stopWithSavepointStreamTaskTestHelper(false);
    }

    private void stopWithSavepointStreamTaskTestHelper(boolean z) throws Exception {
        StreamTaskTestHarness<Long> sourceStreamTaskTestHarness = getSourceStreamTaskTestHarness();
        Thread invoke = sourceStreamTaskTestHarness.invoke();
        StreamTask<Long, ?> mo102getTask = sourceStreamTaskTestHarness.mo102getTask();
        ready.await();
        emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 1L);
        emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 2L);
        mo102getTask.triggerCheckpointAsync(new CheckpointMetaData(31L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();
        verifyCheckpointBarrier(sourceStreamTaskTestHarness.getOutput(), 31L);
        emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 3L);
        mo102getTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), z).get();
        if (z) {
            verifyWatermark(sourceStreamTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
        }
        verifyCheckpointBarrier(sourceStreamTaskTestHarness.getOutput(), 34L);
        waitForSynchronousSavepointIdToBeSet(mo102getTask);
        Assert.assertTrue(mo102getTask.getSynchronousSavepointId().isPresent());
        mo102getTask.notifyCheckpointCompleteAsync(34L).get();
        Assert.assertFalse(mo102getTask.getSynchronousSavepointId().isPresent());
        invoke.join();
    }

    private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
        StreamTaskTestHarness<Long> streamTaskTestHarness = new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO);
        LockStepSourceWithOneWmPerElement lockStepSourceWithOneWmPerElement = new LockStepSourceWithOneWmPerElement();
        streamTaskTestHarness.setupOutputForSingletonOperatorChain();
        streamTaskTestHarness.getExecutionConfig().setLatencyTrackingInterval(-1L);
        StreamConfig streamConfig = streamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new StreamSource(lockStepSourceWithOneWmPerElement));
        streamConfig.setOperatorID(new OperatorID());
        return streamTaskTestHarness;
    }

    private void waitForSynchronousSavepointIdToBeSet(StreamTask streamTask) throws InterruptedException {
        while (!streamTask.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
        }
    }

    private void emitAndVerifyWatermarkAndElement(StreamTaskTestHarness<Long> streamTaskTestHarness, long j) throws InterruptedException {
        runLoopStart.trigger();
        verifyWatermark(streamTaskTestHarness.getOutput(), new Watermark(j));
        verifyNextElement(streamTaskTestHarness.getOutput(), j);
        runLoopEnd.await();
    }

    private void verifyNextElement(BlockingQueue<Object> blockingQueue, long j) throws InterruptedException {
        Object take = blockingQueue.take();
        Assert.assertTrue("next element is not an event", take instanceof StreamRecord);
        Assert.assertEquals("wrong event", j, ((Long) ((StreamRecord) take).getValue()).longValue());
    }

    private void verifyWatermark(BlockingQueue<Object> blockingQueue, Watermark watermark) throws InterruptedException {
        Object take = blockingQueue.take();
        Assert.assertTrue("next element is not a watermark", take instanceof Watermark);
        Assert.assertEquals("wrong watermark", watermark, take);
    }

    private void verifyCheckpointBarrier(BlockingQueue<Object> blockingQueue, long j) throws InterruptedException {
        Object take = blockingQueue.take();
        Assert.assertTrue("next element is not a checkpoint barrier", take instanceof CheckpointBarrier);
        Assert.assertEquals("wrong checkpoint id", j, ((CheckpointBarrier) take).getId());
    }
}
