/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.BackendForTestStream;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

public class TaskCheckpointingBehaviourTest
extends TestLogger {
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /**
     * Runs the decline-on-error scenario with a state backend whose operator state
     * {@code snapshot(...)} call throws immediately.
     */
    @Test
    public void testDeclineOnCheckpointErrorInSyncPart() throws Exception {
        AbstractStateBackend failingBackend = new SyncFailureInducingStateBackend();
        runTestDeclineOnCheckpointError(failingBackend);
    }

    /**
     * Runs the decline-on-error scenario with a state backend whose operator state
     * {@code snapshot(...)} call returns a future that throws when it is executed.
     */
    @Test
    public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
        AbstractStateBackend failingBackend = new AsyncFailureInducingStateBackend();
        runTestDeclineOnCheckpointError(failingBackend);
    }

    /**
     * Runs the fail-on-error scenario with a state backend whose {@code snapshot(...)} call
     * throws immediately, and checks that the task's failure cause carries a message starting
     * with {@code "Could not perform checkpoint"}.
     */
    @Test
    public void testTaskFailingOnCheckpointErrorInSyncPart() throws Exception {
        Throwable failureCause = runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
        Assert.assertNotNull(failureCause);

        String expectedMessageStart = "Could not perform checkpoint";
        String actualMessage = failureCause.getMessage();

        // A missing or too-short message must surface as an assertion failure with a useful
        // description, not as a NullPointerException / StringIndexOutOfBoundsException.
        Assert.assertNotNull("Failure cause has no message: " + failureCause, actualMessage);
        Assert.assertTrue(
                "Expected message starting with '" + expectedMessageStart + "' but was: " + actualMessage,
                actualMessage.startsWith(expectedMessageStart));
    }

    /**
     * Runs the fail-on-error scenario with a state backend whose snapshot future throws when
     * executed, and checks that the task's failure cause is exactly an
     * {@link AsynchronousException}.
     */
    @Test
    public void testTaskFailingOnCheckpointErrorInAsyncPart() throws Exception {
        Throwable failureCause = runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());

        // Fail with a proper assertion (as the sync-part test does) instead of a
        // NullPointerException when the task reports no failure cause.
        Assert.assertNotNull(failureCause);
        Assert.assertEquals(AsynchronousException.class, failureCause.getClass());
    }

    /**
     * Starts a task whose operator signals {@code IN_CHECKPOINT_LATCH} from inside
     * {@code snapshotState} and then writes to its raw operator state stream. The backend is
     * configured to hand out {@link LockingOutputStream}s, whose {@code write} blocks until the
     * stream is closed and ignores interrupts. Once the latch fires the task is cancelled, and
     * the test expects it to end in {@code CANCELED} without a failure cause.
     */
    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        BackendForTestStream.StreamFactory lockingStreams =
                (BackendForTestStream.StreamFactory & Serializable) () -> new LockingOutputStream();
        StateBackend lockingStateBackend = new BackendForTestStream(lockingStreams);

        StreamOperator<?> operator = new TestOperator();
        CheckpointResponder responder = Mockito.mock(CheckpointResponder.class);
        Task task = createTask(operator, lockingStateBackend, responder, true);

        task.startTaskThread();

        // Wait until the operator is inside its snapshot before cancelling.
        IN_CHECKPOINT_LATCH.await();
        task.cancelExecution();

        Thread taskThread = task.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getFailureCause());
    }

    /**
     * Starts a task that is configured NOT to fail on checkpoint errors, waits until the
     * checkpoint responder has been asked to decline a checkpoint, and asserts that the task is
     * still {@code RUNNING} at that point.
     *
     * @param backend state backend that makes the checkpoint fail
     * @throws Exception if task creation fails or the waiting thread is interrupted
     */
    private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throws Exception {
        TestDeclinedCheckpointResponder checkpointResponder = new TestDeclinedCheckpointResponder();
        Task task = createTask(new FilterOperator(), backend, checkpointResponder, false);
        task.startTaskThread();
        try {
            checkpointResponder.declinedLatch.await();
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
        } finally {
            // Always stop the task, even when the assertion above fails; otherwise its thread
            // would keep polling in the background after the test has finished.
            task.cancelExecution();
            task.getExecutingThread().join();
        }
    }

    /**
     * Starts a task that is configured to fail on checkpoint errors, waits for its thread to
     * terminate and asserts that it ended in state {@code FAILED}.
     *
     * @param backend state backend that makes the checkpoint fail
     * @return the failure cause reported by the task
     * @throws Exception if task creation fails or the waiting thread is interrupted
     */
    private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws Exception {
        StreamOperator<?> operator = new FilterOperator();
        CheckpointResponder responder = Mockito.mock(CheckpointResponder.class);
        Task task = createTask(operator, backend, responder, true);

        task.startTaskThread();
        Thread taskThread = task.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        return task.getFailureCause();
    }

    /**
     * Builds a {@link Task} that runs {@link TestStreamTask} with the given operator and state
     * backend. Most runtime services are Mockito mocks or test implementations.
     *
     * @param op operator stored in the stream configuration
     * @param backend state backend stored in the stream configuration
     * @param checkpointResponder responder handed to the task
     * @param failOnCheckpointErrors value passed to
     *     {@code ExecutionConfig#setFailTaskOnCheckpointError}
     * @return the assembled, not yet started task
     * @throws IOException declared by the construction of the task's collaborators
     */
    private static Task createTask(StreamOperator<?> op, StateBackend backend, CheckpointResponder checkpointResponder, boolean failOnCheckpointErrors) throws IOException {
        // Operator-level configuration; StreamConfig writes into taskConfig.
        Configuration taskConfig = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfig);
        streamConfig.setStreamOperator(op);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setStateBackend(backend);

        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);

        JobInformation jobInformation = new JobInformation(
                new JobID(),
                "test job name",
                new SerializedValue<>(executionConfig),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        TaskInformation taskInformation = new TaskInformation(
                new JobVertexID(),
                "test task name",
                1,
                11,
                TestStreamTask.class.getName(),
                taskConfig);

        // Mocked network environment that still hands out a real event dispatcher.
        TaskKvStateRegistry mockKvRegistry = Mockito.mock(TaskKvStateRegistry.class);
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.createKvStateTaskRegistry(Matchers.any(JobID.class), Matchers.any(JobVertexID.class)))
                .thenReturn(mockKvRegistry);
        Mockito.when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

        BlobCacheService blobService = new BlobCacheService(
                Mockito.mock(PermanentBlobCache.class),
                Mockito.mock(TransientBlobCache.class));

        LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
                blobService.getPermanentBlobService(),
                FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
                new String[0]);

        FileCache fileCache = new FileCache(
                new String[] {EnvironmentInformation.getTemporaryFileDirectory()},
                blobService.getPermanentBlobService());

        return new Task(
                jobInformation,
                taskInformation,
                new ExecutionAttemptID(),
                new AllocationID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                network,
                Mockito.mock(BroadcastVariableManager.class),
                new TestTaskStateManager(),
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(InputSplitProvider.class),
                checkpointResponder,
                blobService,
                libraryCacheManager,
                fileCache,
                new TestingTaskManagerRuntimeInfo(),
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                new NoOpResultPartitionConsumableNotifier(),
                Mockito.mock(PartitionProducerStateChecker.class),
                Executors.directExecutor());
    }

    /**
     * Stream task used by these tests: on {@code run()} it triggers a single checkpoint with
     * id 11 and then sleeps in 1 ms steps for as long as {@code isRunning()} returns true.
     * Initialization, cleanup and cancellation hooks are intentionally empty.
     */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {

        public TestStreamTask(Environment env) {
            super(env);
        }

        public void init() {
            // nothing to set up
        }

        protected void run() throws Exception {
            CheckpointMetaData metaData = new CheckpointMetaData(11L, System.currentTimeMillis());
            CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
            triggerCheckpointOnBarrier(metaData, options, new CheckpointMetrics());

            // Keep the task alive until it is cancelled or fails.
            while (isRunning()) {
                Thread.sleep(1L);
            }
        }

        protected void cleanup() {
            // nothing to release
        }

        protected void cancelTask() {
            // nothing to cancel explicitly; run() observes isRunning()
        }
    }

    /**
     * Filter operator (rejecting every element) whose {@code snapshotState} obtains the raw
     * operator state stream, signals {@code IN_CHECKPOINT_LATCH} and then writes one byte to
     * that stream.
     */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public TestOperator() {
            super(new FilterFunction<Object>() {

                public boolean filter(Object value) {
                    return false;
                }
            });
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            OperatorStateCheckpointOutputStream rawStateStream = context.getRawOperatorStateOutput();

            // Tell the test thread that the checkpoint is in progress before the write happens.
            IN_CHECKPOINT_LATCH.trigger();

            rawStateStream.write(1);
        }
    }

    /** Plain filter operator whose filter function rejects every element. */
    private static final class FilterOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public FilterOperator() {
            super(new FilterFunction<Object>() {

                public boolean filter(Object value) {
                    return false;
                }
            });
        }
    }

    /**
     * Checkpoint output stream whose {@link #write(int)} blocks until {@link #close()} has been
     * called. Interrupts received while blocked are deliberately ignored, so the writer can only
     * be released by closing the stream.
     */
    private static final class LockingOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream {
        /** Monitor guarding the wait/notify hand-off between {@code write} and {@code close}. */
        private final Object lock = new Object();

        /** Set to true by {@code close()}; releases any blocked writer. */
        private volatile boolean closed;

        private LockingOutputStream() {
        }

        @Nullable
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new UnsupportedOperationException();
        }

        /**
         * Blocks until the stream is closed; the byte itself is discarded.
         */
        public void write(int b) throws IOException {
            synchronized (lock) {
                while (!closed) {
                    try {
                        lock.wait();
                    }
                    catch (InterruptedException ignored) {
                        // intentionally not interruptible: keep waiting until close()
                    }
                }
            }
        }

        /**
         * Marks the stream as closed and wakes up all threads blocked in {@code write}.
         */
        public void close() throws IOException {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }

        public long getPos() {
            return 0L;
        }

        public void flush() {
            // no-op
        }

        public void sync() {
            // no-op
        }
    }

    /**
     * Memory state backend whose operator state backend returns, from {@code snapshot(...)}, a
     * {@link FutureTask} that throws {@code "Async part snapshot exception."} when it is run.
     */
    private static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -7613628662587098470L;

        private AsyncFailureInducingStateBackend() {
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
            ClassLoader userClassLoader = env.getUserClassLoader();
            ExecutionConfig executionConfig = env.getExecutionConfig();

            return new DefaultOperatorStateBackend(userClassLoader, executionConfig, true) {

                @Nonnull
                public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    // The call itself succeeds; the failure only happens when the future runs.
                    return new FutureTask<SnapshotResult<OperatorStateHandle>>(() -> {
                        throw new Exception("Async part snapshot exception.");
                    });
                }
            };
        }

        /** Ignores the given configuration and returns this instance unchanged. */
        public AsyncFailureInducingStateBackend configure(Configuration config) {
            return this;
        }
    }

    /**
     * Memory state backend whose operator state backend throws
     * {@code "Sync part snapshot exception."} directly from {@code snapshot(...)}.
     */
    private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -1915780414440060539L;

        private SyncFailureInducingStateBackend() {
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
            ClassLoader userClassLoader = env.getUserClassLoader();
            ExecutionConfig executionConfig = env.getExecutionConfig();

            return new DefaultOperatorStateBackend(userClassLoader, executionConfig, true) {

                @Nonnull
                public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    // Fails before any future is created.
                    throw new Exception("Sync part snapshot exception.");
                }
            };
        }

        /** Ignores the given configuration and returns this instance unchanged. */
        public SyncFailureInducingStateBackend configure(Configuration config) {
            return this;
        }
    }

    /**
     * Checkpoint responder that expects checkpoints to be declined: a decline triggers
     * {@code declinedLatch}, whereas an acknowledgement throws a {@link RuntimeException}.
     */
    private static class TestDeclinedCheckpointResponder implements CheckpointResponder {
        /** Triggered on every call to {@code declineCheckpoint}. */
        final OneShotLatch declinedLatch = new OneShotLatch();

        private TestDeclinedCheckpointResponder() {
        }

        public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
            // Acknowledging is never expected in the decline scenario.
            throw new RuntimeException("Unexpected call.");
        }

        public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, Throwable cause) {
            declinedLatch.trigger();
        }

        /** @return the latch that is triggered when a checkpoint is declined */
        public OneShotLatch getDeclinedLatch() {
            return declinedLatch;
        }
    }
}

