/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SourceExternalCheckpointTriggerTest {
    private static OneShotLatch ready = new OneShotLatch();
    private static MultiShotLatch sync = new MultiShotLatch();

    @Before
    public void resetLatches() {
        ready = new OneShotLatch();
        sync = new MultiShotLatch();
    }

    @Test
    public void testCheckpointsTriggeredBySource() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getExecutionConfig().setLatencyTrackingInterval(-1L);
        long numElements = 10L;
        long checkpointEvery = 3L;
        ExternalCheckpointsSource source = new ExternalCheckpointsSource(10L, 3L);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamSource sourceOperator = new StreamSource((SourceFunction)source);
        streamConfig.setStreamOperator((StreamOperator)sourceOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        StreamTask sourceTask = testHarness.getTask();
        ready.await();
        Assert.assertTrue((boolean)((Boolean)sourceTask.triggerCheckpointAsync(new CheckpointMetaData(32L, 829L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get()));
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 1L);
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 2L);
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 3L);
        this.verifyCheckpointBarrier(testHarness.getOutput(), 1L);
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 4L);
        Assert.assertTrue((boolean)((Boolean)sourceTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get()));
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 5L);
        sync.trigger();
        this.verifyNextElement(testHarness.getOutput(), 6L);
        this.verifyCheckpointBarrier(testHarness.getOutput(), 2L);
        long checkpoint = 3L;
        for (long l = 7L; l <= 10L; ++l) {
            sync.trigger();
            this.verifyNextElement(testHarness.getOutput(), l);
            if (l % 3L != 0L) continue;
            this.verifyCheckpointBarrier(testHarness.getOutput(), checkpoint++);
        }
    }

    private void verifyNextElement(BlockingQueue<Object> output, long expectedElement) throws InterruptedException {
        Object next = output.take();
        Assert.assertTrue((String)"next element is not an event", (boolean)(next instanceof StreamRecord));
        Assert.assertEquals((String)"wrong event", (long)expectedElement, (long)((Long)((StreamRecord)next).getValue()));
    }

    private void verifyCheckpointBarrier(BlockingQueue<Object> output, long checkpointId) throws InterruptedException {
        Object next = output.take();
        Assert.assertTrue((String)"next element is not a checkpoint barrier", (boolean)(next instanceof CheckpointBarrier));
        Assert.assertEquals((String)"wrong checkpoint id", (long)checkpointId, (long)((CheckpointBarrier)next).getId());
    }

    private static class ExternalCheckpointsSource
    implements ParallelSourceFunction<Long>,
    ExternallyInducedSource<Long, Object> {
        private final long numEvents;
        private final long checkpointFrequency;
        private ExternallyInducedSource.CheckpointTrigger trigger;

        ExternalCheckpointsSource(long numEvents, long checkpointFrequency) {
            this.numEvents = numEvents;
            this.checkpointFrequency = checkpointFrequency;
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            ready.trigger();
            long checkpoint = 1L;
            for (long num = 1L; num <= this.numEvents; ++num) {
                sync.await();
                ctx.collect((Object)num);
                if (num % this.checkpointFrequency != 0L) continue;
                this.trigger.triggerCheckpoint(checkpoint++);
            }
        }

        public void cancel() {
        }

        public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
            this.trigger = checkpointTrigger;
        }

        public MasterTriggerRestoreHook<Object> createMasterTriggerRestoreHook() {
            throw new UnsupportedOperationException("not implemented");
        }
    }
}

