/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable;
import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.junit.Assert;
import org.junit.Test;

public class AsyncCheckpointRunnableTest {
    @Test
    public void testReportIncompleteStats() {
        long checkpointId = 1000L;
        TestEnvironment env = new TestEnvironment();
        new AsyncCheckpointRunnable(new HashMap(), new CheckpointMetaData(checkpointId, 1L), new CheckpointMetricsBuilder(), 0L, "Task Name", r -> {}, (Environment)env, (msg, ex) -> {}, false, false, () -> true).close();
        Assert.assertEquals((long)checkpointId, (long)((TestTaskStateManager)env.getTaskStateManager()).getReportedCheckpointId());
    }

    @Test
    public void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
        this.testAsyncCheckpointException(true);
    }

    @Test
    public void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
        this.testAsyncCheckpointException(false);
    }

    private void testAsyncCheckpointException(boolean isTaskRunning) {
        HashMap<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<OperatorID, OperatorSnapshotFutures>();
        snapshotsInProgress.put(new OperatorID(), new OperatorSnapshotFutures(ExceptionallyDoneFuture.of(new RuntimeException("Async Checkpoint Exception")), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty())));
        TestEnvironment environment = new TestEnvironment();
        AsyncCheckpointRunnable runnable = this.createAsyncRunnable(snapshotsInProgress, environment, false, isTaskRunning);
        runnable.run();
        if (isTaskRunning) {
            Assert.assertSame((Object)environment.getCause().getCheckpointFailureReason(), (Object)CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
        } else {
            Assert.assertNull((Object)environment.getCause());
        }
    }

    @Test
    public void testDeclineAsyncCheckpoint() {
        CheckpointFailureReason originalReason = CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
        HashMap<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<OperatorID, OperatorSnapshotFutures>();
        snapshotsInProgress.put(new OperatorID(), new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), ExceptionallyDoneFuture.of((Throwable)new CheckpointException(originalReason)), (Future)DoneFuture.of((Object)SnapshotResult.empty())));
        TestEnvironment environment = new TestEnvironment();
        AsyncCheckpointRunnable runnable = this.createAsyncRunnable(snapshotsInProgress, environment, false, true);
        runnable.run();
        Assert.assertSame((Object)environment.getCause().getCheckpointFailureReason(), (Object)originalReason);
    }

    @Test
    public void testReportFinishedOnRestoreTaskSnapshots() {
        TestEnvironment environment = new TestEnvironment();
        AsyncCheckpointRunnable asyncCheckpointRunnable = this.createAsyncRunnable(new HashMap<OperatorID, OperatorSnapshotFutures>(), environment, true, true);
        asyncCheckpointRunnable.run();
        TestTaskStateManager testTaskStateManager = (TestTaskStateManager)environment.getTaskStateManager();
        Assert.assertEquals((long)asyncCheckpointRunnable.getCheckpointId(), (long)testTaskStateManager.getReportedCheckpointId());
        Assert.assertEquals((Object)TaskStateSnapshot.FINISHED_ON_RESTORE, (Object)testTaskStateManager.getLastJobManagerTaskStateSnapshot());
        Assert.assertEquals((Object)TaskStateSnapshot.FINISHED_ON_RESTORE, (Object)testTaskStateManager.getLastTaskManagerTaskStateSnapshot());
        Assert.assertTrue((boolean)asyncCheckpointRunnable.getFinishedFuture().isDone());
    }

    private AsyncCheckpointRunnable createAsyncRunnable(Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress, TestEnvironment environment, boolean isTaskDeployedAsFinished, boolean isTaskRunning) {
        return new AsyncCheckpointRunnable(snapshotsInProgress, new CheckpointMetaData(1L, 1L), new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L), 1L, "Task Name", r -> {}, (Environment)environment, (msg, ex) -> {}, isTaskDeployedAsFinished, false, () -> isTaskRunning);
    }

    private static class TestEnvironment
    extends StreamMockEnvironment {
        CheckpointException cause = null;

        TestEnvironment() {
            this(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager());
        }

        TestEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, TaskStateManager taskStateManager) {
            super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize, taskStateManager);
        }

        @Override
        public void declineCheckpoint(long checkpointId, CheckpointException checkpointException) {
            this.cause = checkpointException;
        }

        CheckpointException getCause() {
            return this.cause;
        }
    }
}

