/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how a {@link StreamTask} tracks a pending synchronous savepoint.
 *
 * <p>Each test runs the task on a dedicated executor thread, triggers a checkpoint of type
 * {@code SYNC_SAVEPOINT}, and then checks that the synchronous savepoint id stays set until either
 * the completion notification for that exact checkpoint id arrives or the task is cancelled.
 */
public class SynchronousCheckpointTest {
    /** Id of the synchronous savepoint triggered by the tests. */
    private static final long SAVEPOINT_ID = 42L;

    /** Channel through which the task thread reports lifecycle events to the test thread. */
    private final LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    /**
     * Written by the task thread before {@code invoke()} is called. The test thread only reads it
     * after taking {@link Event#TASK_INITIALIZED} from {@link #eventQueue}.
     */
    private StreamTaskUnderTest streamTaskUnderTest;

    /** Completes when {@code invoke()} returns or throws on the task thread. */
    private CompletableFuture<Void> taskInvocation;

    /** Executor owning the task thread; shut down after every test so the thread does not leak. */
    private ExecutorService taskExecutor;

    /**
     * Starts the task on its own thread and blocks until it reports that it has been initialized.
     *
     * @throws InterruptedException if interrupted while waiting for the initialization event
     */
    @Before
    public void setupTestEnvironment() throws InterruptedException {
        this.taskExecutor = Executors.newSingleThreadExecutor();
        this.taskInvocation = CompletableFuture.runAsync(() -> {
            try {
                this.streamTaskUnderTest = SynchronousCheckpointTest.createTask(this.eventQueue);
                this.streamTaskUnderTest.invoke();
            } catch (RuntimeException e) {
                // Rethrow unchanged so the original exception becomes the cause seen by get().
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, this.taskExecutor);
        // NOTE(review): take() has no timeout, so a task that fails before init() would block
        // here indefinitely — confirm whether the surrounding build enforces a global timeout.
        Assert.assertThat(this.eventQueue.take(), Matchers.is(Event.TASK_INITIALIZED));
    }

    /** Shuts down the executor created in {@link #setupTestEnvironment()}, if any. */
    @After
    public void shutdownTaskExecutor() {
        if (this.taskExecutor != null) {
            this.taskExecutor.shutdownNow();
        }
    }

    /**
     * Verifies that the synchronous savepoint id is cleared only by the completion notification
     * for the matching checkpoint id, and that the task then finishes without being cancelled.
     */
    @Test(timeout=20000L)
    public void synchronousCheckpointBlocksUntilNotificationForCorrectCheckpointComes() throws Exception {
        this.launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());

        // A notification for a different checkpoint id must not clear the pending savepoint.
        this.streamTaskUnderTest.notifyCheckpointCompleteAsync(SAVEPOINT_ID - 1L).get();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());

        this.streamTaskUnderTest.notifyCheckpointCompleteAsync(SAVEPOINT_ID).get();
        Assert.assertFalse(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());

        this.streamTaskUnderTest.stopTask();
        this.waitUntilMainExecutionThreadIsFinished();
        Assert.assertFalse(this.streamTaskUnderTest.isCanceled());
    }

    /**
     * Verifies that cancelling the task while a synchronous savepoint is pending lets the task
     * thread terminate and leaves the task in the cancelled state.
     */
    @Test(timeout=10000L)
    public void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        this.launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());

        this.streamTaskUnderTest.cancel();
        this.waitUntilMainExecutionThreadIsFinished();
        Assert.assertTrue(this.streamTaskUnderTest.isCanceled());
    }

    /**
     * Triggers a {@code SYNC_SAVEPOINT} checkpoint with id {@link #SAVEPOINT_ID} and waits until
     * the task reports a synchronous savepoint id.
     */
    private void launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet() throws InterruptedException {
        CheckpointMetaData metaData = new CheckpointMetaData(SAVEPOINT_ID, System.currentTimeMillis());
        CheckpointOptions options =
                new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault());
        this.streamTaskUnderTest.triggerCheckpointAsync(metaData, options, false);
        this.waitForSyncSavepointIdToBeSet(this.streamTaskUnderTest);
    }

    /**
     * Blocks until the task thread has finished. A failure of the task thread is tolerated only
     * when its cause is a {@link CancelTaskException}; any other cause fails the assertion.
     */
    private void waitUntilMainExecutionThreadIsFinished() {
        try {
            this.taskInvocation.get();
        } catch (Exception e) {
            Assert.assertThat(e.getCause(), Matchers.is(Matchers.instanceOf(CancelTaskException.class)));
        }
    }

    /**
     * Polls every 10 ms until the given task reports a synchronous savepoint id, failing the test
     * if the task thread terminates before that happens.
     *
     * @param streamTaskUnderTest the task to poll
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private void waitForSyncSavepointIdToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        while (!streamTaskUnderTest.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
            if (this.taskInvocation.isDone()) {
                Assert.fail("Task has been terminated too early");
            }
        }
    }

    /**
     * Builds the task under test on top of a {@link DummyEnvironment}.
     *
     * @param eventQueue queue the task uses to publish lifecycle events
     * @return a new, not yet invoked task
     */
    private static StreamTaskUnderTest createTask(Queue<Event> eventQueue) throws Exception {
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        return new StreamTaskUnderTest(environment, eventQueue);
    }

    /**
     * Task that announces its initialization on an event queue and keeps running until it is
     * either stopped via {@link #stopTask()} or cancelled.
     */
    private static class StreamTaskUnderTest
    extends StreamTaskTest.NoOpStreamTask {
        private final Queue<Event> eventQueue;

        /** Set from the test thread and read from the task thread, hence volatile. */
        private volatile boolean stopped;

        StreamTaskUnderTest(Environment env, Queue<Event> eventQueue) throws Exception {
            super(env);
            this.eventQueue = Preconditions.checkNotNull(eventQueue);
        }

        /** Signals to the test thread that the task has reached its initialization step. */
        @Override
        protected void init() {
            this.eventQueue.add(Event.TASK_INITIALIZED);
        }

        /** Does no work; tells the controller that all actions are done once stopped or cancelled. */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (this.stopped || this.isCanceled()) {
                controller.allActionsCompleted();
            }
        }

        /** Requests a regular (non-cancelling) termination of the task loop. */
        void stopTask() {
            this.stopped = true;
        }
    }

    /** Lifecycle events the task thread publishes to the test thread. */
    private enum Event {
        TASK_INITIALIZED
    }
}

