package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.class */
public class SynchronousCheckpointTest {
    private OneShotLatch execLatch;
    private AtomicReference<Throwable> error;
    private StreamTask streamTaskUnderTest;
    private Thread mainThreadExecutingTaskUnderTest;
    private Thread checkpointingThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest$StreamTaskUnderTest.class */
    public static class StreamTaskUnderTest extends StreamTaskTest.NoOpStreamTask {
        private final OneShotLatch runningLatch;
        private final OneShotLatch execLatch;

        StreamTaskUnderTest(Environment environment, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) {
            super(environment);
            this.runningLatch = (OneShotLatch) Preconditions.checkNotNull(oneShotLatch);
            this.execLatch = (OneShotLatch) Preconditions.checkNotNull(oneShotLatch2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask
        public void performDefaultAction(StreamTask.ActionContext actionContext) throws Exception {
            this.runningLatch.trigger();
            this.execLatch.await();
            super.performDefaultAction(actionContext);
        }
    }

    @Before
    public void setupTestEnvironment() throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        this.execLatch = new OneShotLatch();
        this.error = new AtomicReference<>();
        this.mainThreadExecutingTaskUnderTest = launchOnSeparateThread(() -> {
            this.streamTaskUnderTest = createTask(oneShotLatch, this.execLatch);
            try {
                this.streamTaskUnderTest.invoke();
            } catch (Exception e) {
                this.error.set(e);
            }
        });
        oneShotLatch.await();
    }

    @Test(timeout = 1000)
    public void synchronousCheckpointBlocksUntilNotificationForCorrectCheckpointComes() throws Exception {
        SynchronousSavepointLatch launchSynchronousSavepointAndGetTheLatch = launchSynchronousSavepointAndGetTheLatch();
        Assert.assertFalse(launchSynchronousSavepointAndGetTheLatch.isCompleted());
        this.streamTaskUnderTest.notifyCheckpointComplete(41L);
        Assert.assertFalse(launchSynchronousSavepointAndGetTheLatch.isCompleted());
        this.streamTaskUnderTest.notifyCheckpointComplete(42L);
        Assert.assertTrue(launchSynchronousSavepointAndGetTheLatch.isCompleted());
        waitUntilCheckpointingThreadIsFinished();
        allowTaskToExitTheRunLoop();
        waitUntilMainExecutionThreadIsFinished();
        Assert.assertFalse(this.streamTaskUnderTest.isCanceled());
    }

    @Test(timeout = 1000)
    public void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        SynchronousSavepointLatch launchSynchronousSavepointAndGetTheLatch = launchSynchronousSavepointAndGetTheLatch();
        Assert.assertFalse(launchSynchronousSavepointAndGetTheLatch.isCompleted());
        allowTaskToExitTheRunLoop();
        Assert.assertFalse(launchSynchronousSavepointAndGetTheLatch.isCompleted());
        this.streamTaskUnderTest.cancel();
        Assert.assertTrue(launchSynchronousSavepointAndGetTheLatch.isCanceled());
        waitUntilCheckpointingThreadIsFinished();
        waitUntilMainExecutionThreadIsFinished();
        Assert.assertTrue(this.streamTaskUnderTest.isCanceled());
    }

    private SynchronousSavepointLatch launchSynchronousSavepointAndGetTheLatch() throws InterruptedException {
        this.checkpointingThread = launchOnSeparateThread(() -> {
            try {
                this.streamTaskUnderTest.triggerCheckpoint(new CheckpointMetaData(42L, System.currentTimeMillis()), new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), false);
            } catch (Exception e) {
                this.error.set(e);
            }
        });
        return waitForSyncSavepointLatchToBeSet(this.streamTaskUnderTest);
    }

    private void waitUntilMainExecutionThreadIsFinished() throws InterruptedException {
        this.mainThreadExecutingTaskUnderTest.join();
    }

    private void waitUntilCheckpointingThreadIsFinished() throws InterruptedException {
        this.checkpointingThread.join();
    }

    private void allowTaskToExitTheRunLoop() {
        this.execLatch.trigger();
    }

    private SynchronousSavepointLatch waitForSyncSavepointLatchToBeSet(StreamTask streamTask) throws InterruptedException {
        SynchronousSavepointLatch synchronousSavepointLatch = streamTask.getSynchronousSavepointLatch();
        while (!synchronousSavepointLatch.isSet()) {
            Thread.sleep(10L);
            if (this.error.get() != null && !(this.error.get() instanceof CancelTaskException)) {
                Assert.fail();
            }
        }
        return synchronousSavepointLatch;
    }

    private Thread launchOnSeparateThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.start();
        return thread;
    }

    private StreamTask createTask(OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) {
        return new StreamTaskUnderTest(new DummyEnvironment("test", 1, 0), oneShotLatch, oneShotLatch2);
    }
}
