/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class SourceTaskTerminationTest
extends TestLogger {
    private static OneShotLatch ready;
    private static MultiShotLatch runLoopStart;
    private static MultiShotLatch runLoopEnd;
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)20L);

    @Before
    public void initialize() {
        ready = new OneShotLatch();
        runLoopStart = new MultiShotLatch();
        runLoopEnd = new MultiShotLatch();
    }

    @Test
    public void testStopWithSavepointWithMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(true);
    }

    @Test
    public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(false);
    }

    private void stopWithSavepointStreamTaskTestHelper(boolean withMaxWatermark) throws Exception {
        long syncSavepointId = 34L;
        StreamTaskTestHarness<Long> srcTaskTestHarness = this.getSourceStreamTaskTestHarness();
        Thread executionThread = srcTaskTestHarness.invoke();
        StreamTask<Long, ?> srcTask = srcTaskTestHarness.getTask();
        ready.await();
        this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
        this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
        srcTask.triggerCheckpointAsync(new CheckpointMetaData(31L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();
        this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
        this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
        srcTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), withMaxWatermark).get();
        if (withMaxWatermark) {
            this.verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
        }
        this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 34L);
        this.waitForSynchronousSavepointIdToBeSet(srcTask);
        Assert.assertTrue((boolean)srcTask.getSynchronousSavepointId().isPresent());
        srcTask.notifyCheckpointCompleteAsync(34L).get();
        Assert.assertFalse((boolean)srcTask.getSynchronousSavepointId().isPresent());
        executionThread.join();
    }

    private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
        StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<Long>((FunctionWithException<Environment, StreamTask<Long, ?>, Exception>)((FunctionWithException)SourceStreamTask::new), (TypeInformation<Long>)BasicTypeInfo.LONG_TYPE_INFO);
        LockStepSourceWithOneWmPerElement source = new LockStepSourceWithOneWmPerElement();
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getExecutionConfig().setLatencyTrackingInterval(-1L);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamSource sourceOperator = new StreamSource((SourceFunction)source);
        streamConfig.setStreamOperator((StreamOperator)sourceOperator);
        streamConfig.setOperatorID(new OperatorID());
        return testHarness;
    }

    private void waitForSynchronousSavepointIdToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        while (!streamTaskUnderTest.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
        }
    }

    private void emitAndVerifyWatermarkAndElement(StreamTaskTestHarness<Long> srcTaskTestHarness, long expectedElement) throws InterruptedException {
        runLoopStart.trigger();
        this.verifyWatermark(srcTaskTestHarness.getOutput(), new Watermark(expectedElement));
        this.verifyNextElement(srcTaskTestHarness.getOutput(), expectedElement);
        runLoopEnd.await();
    }

    private void verifyNextElement(BlockingQueue<Object> output, long expectedElement) throws InterruptedException {
        Object next = output.take();
        Assert.assertTrue((String)"next element is not an event", (boolean)(next instanceof StreamRecord));
        Assert.assertEquals((String)"wrong event", (long)expectedElement, (long)((Long)((StreamRecord)next).getValue()));
    }

    private void verifyWatermark(BlockingQueue<Object> output, Watermark expectedWatermark) throws InterruptedException {
        Object next = output.take();
        Assert.assertTrue((String)"next element is not a watermark", (boolean)(next instanceof Watermark));
        Assert.assertEquals((String)"wrong watermark", (Object)expectedWatermark, (Object)next);
    }

    private void verifyCheckpointBarrier(BlockingQueue<Object> output, long checkpointId) throws InterruptedException {
        Object next = output.take();
        Assert.assertTrue((String)"next element is not a checkpoint barrier", (boolean)(next instanceof CheckpointBarrier));
        Assert.assertEquals((String)"wrong checkpoint id", (long)checkpointId, (long)((CheckpointBarrier)next).getId());
    }

    private static class LockStepSourceWithOneWmPerElement
    implements SourceFunction<Long> {
        private volatile boolean isRunning;

        private LockStepSourceWithOneWmPerElement() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long element = 1L;
            this.isRunning = true;
            ready.trigger();
            while (this.isRunning) {
                runLoopStart.await();
                ctx.emitWatermark(new Watermark(element));
                ctx.collect((Object)element++);
                runLoopEnd.trigger();
            }
        }

        public void cancel() {
            this.isRunning = false;
            runLoopStart.trigger();
        }
    }
}

