/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.SynchronousSavepointLatch;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how a {@link StreamTask} handles a synchronous savepoint ({@link CheckpointType#SYNC_SAVEPOINT}).
 *
 * <p>Each test runs the task on one background thread and triggers the savepoint (checkpoint id 42)
 * on a second one, then observes the task's {@link SynchronousSavepointLatch} from the test thread.
 * Exceptions thrown on either background thread are stored in {@link #error}.
 */
public class SynchronousCheckpointTest {
    /** Released by the test to let the task's {@code processInput} proceed. */
    private OneShotLatch execLatch;

    /** Last exception thrown by the task thread or the checkpointing thread, if any. */
    private AtomicReference<Throwable> error;

    // Assigned on the task thread; the test thread only reads it after awaiting the running latch.
    private StreamTask streamTaskUnderTest;
    private Thread mainThreadExecutingTaskUnderTest;
    private Thread checkpointingThread;

    /**
     * Starts the task under test on its own thread and blocks until the task has entered
     * {@code processInput}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Before
    public void setupTestEnvironment() throws InterruptedException {
        final OneShotLatch runningLatch = new OneShotLatch();
        execLatch = new OneShotLatch();
        error = new AtomicReference<>();

        mainThreadExecutingTaskUnderTest = launchOnSeparateThread(() -> {
            streamTaskUnderTest = createTask(runningLatch, execLatch);
            try {
                streamTaskUnderTest.invoke();
            } catch (Exception e) {
                error.set(e);
            }
        });

        // NOTE(review): this wait is unbounded; if the task fails before reaching processInput the
        // setup would never return. Confirm whether a bounded wait is wanted here.
        runningLatch.await();
    }

    /**
     * The savepoint latch must stay incomplete for a notification with a different checkpoint id (41)
     * and complete once the notification for the triggered checkpoint (42) arrives.
     */
    @Test(timeout = 1000L)
    public void synchronousCheckpointBlocksUntilNotificationForCorrectCheckpointComes() throws Exception {
        final SynchronousSavepointLatch syncSavepointLatch = launchSynchronousSavepointAndGetTheLatch();
        Assert.assertFalse(syncSavepointLatch.isCompleted());

        streamTaskUnderTest.notifyCheckpointComplete(41L);
        Assert.assertFalse(syncSavepointLatch.isCompleted());

        streamTaskUnderTest.notifyCheckpointComplete(42L);
        Assert.assertTrue(syncSavepointLatch.isCompleted());

        waitUntilCheckpointingThreadIsFinished();
        allowTaskToExitTheRunLoop();
        waitUntilMainExecutionThreadIsFinished();

        // NOTE(review): 'error' is not inspected here, so an exception recorded after the latch
        // was set would go unnoticed. Confirm whether that is intended.
        Assert.assertFalse(streamTaskUnderTest.isCanceled());
    }

    /**
     * Cancelling the task while a synchronous savepoint is pending must cancel the savepoint latch
     * and leave the task in the canceled state.
     */
    @Test(timeout = 1000L)
    public void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        final SynchronousSavepointLatch syncSavepointLatch = launchSynchronousSavepointAndGetTheLatch();
        Assert.assertFalse(syncSavepointLatch.isCompleted());

        allowTaskToExitTheRunLoop();
        Assert.assertFalse(syncSavepointLatch.isCompleted());

        streamTaskUnderTest.cancel();
        Assert.assertTrue(syncSavepointLatch.isCanceled());

        waitUntilCheckpointingThreadIsFinished();
        waitUntilMainExecutionThreadIsFinished();
        Assert.assertTrue(streamTaskUnderTest.isCanceled());
    }

    /**
     * Triggers a synchronous savepoint with checkpoint id 42 on a separate thread and waits until the
     * task's savepoint latch reports that it has been set.
     *
     * @return the savepoint latch of the task under test
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private SynchronousSavepointLatch launchSynchronousSavepointAndGetTheLatch() throws InterruptedException {
        checkpointingThread = launchOnSeparateThread(() -> {
            try {
                streamTaskUnderTest.triggerCheckpoint(
                        new CheckpointMetaData(42L, System.currentTimeMillis()),
                        new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
                        false);
            } catch (Exception e) {
                error.set(e);
            }
        });
        return waitForSyncSavepointLatchToBeSet(streamTaskUnderTest);
    }

    /** Blocks until the thread running the task has terminated. */
    private void waitUntilMainExecutionThreadIsFinished() throws InterruptedException {
        mainThreadExecutingTaskUnderTest.join();
    }

    /** Blocks until the thread that triggered the savepoint has terminated. */
    private void waitUntilCheckpointingThreadIsFinished() throws InterruptedException {
        checkpointingThread.join();
    }

    /** Releases the latch the task is waiting on inside {@code processInput}. */
    private void allowTaskToExitTheRunLoop() {
        execLatch.trigger();
    }

    /**
     * Polls the task's savepoint latch every 10 ms until it is set.
     *
     * <p>Fails the test if a background thread has recorded an exception other than a
     * {@link CancelTaskException}; the recorded exception is attached as the cause.
     *
     * @param streamTaskUnderTest the task whose latch is observed
     * @return the latch, once {@code isSet()} returns {@code true}
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    private SynchronousSavepointLatch waitForSyncSavepointLatchToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        final SynchronousSavepointLatch syncSavepointFuture = streamTaskUnderTest.getSynchronousSavepointLatch();
        while (!syncSavepointFuture.isSet()) {
            Thread.sleep(10L);

            // Read once so the null check and the type check look at the same value.
            final Throwable failure = error.get();
            if (failure != null && !(failure instanceof CancelTaskException)) {
                throw new AssertionError(
                        "A background thread failed while waiting for the synchronous savepoint latch to be set",
                        failure);
            }
        }
        return syncSavepointFuture;
    }

    /**
     * Runs the given action on a newly started thread.
     *
     * @param runnable the action to run
     * @return the started thread
     */
    private Thread launchOnSeparateThread(Runnable runnable) {
        final Thread thread = new Thread(runnable);
        // Daemon, so a thread still parked on a latch after a failed test cannot keep the JVM alive.
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Creates the task under test on top of a {@link DummyEnvironment}. */
    private StreamTask createTask(OneShotLatch runningLatch, OneShotLatch execLatch) {
        final DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        return new StreamTaskUnderTest(environment, runningLatch, execLatch);
    }

    /**
     * No-op task whose {@code processInput} first signals that it is running and then blocks until
     * the test releases it.
     */
    private static class StreamTaskUnderTest
    extends StreamTaskTest.NoOpStreamTask {
        private final OneShotLatch runningLatch;
        private final OneShotLatch execLatch;

        StreamTaskUnderTest(Environment env, OneShotLatch runningLatch, OneShotLatch execLatch) {
            super(env);
            this.runningLatch = Preconditions.checkNotNull(runningLatch);
            this.execLatch = Preconditions.checkNotNull(execLatch);
        }

        @Override
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            // Tell the test the task is running, then wait for permission to continue.
            runningLatch.trigger();
            execLatch.await();
            super.processInput(context);
        }
    }
}

