/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.BackendForTestStream;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TaskCheckpointingBehaviourTest
extends TestLogger {
    /**
     * Latch that {@code TestOperator#snapshotState} triggers right before it writes to the raw
     * operator state stream; {@code testBlockingNonInterruptibleCheckpoint} awaits it before
     * cancelling the task.
     */
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /**
     * Runs a task on top of {@code SyncFailureInducingStateBackend}, whose operator state backend
     * throws directly from {@code snapshot(...)}, and expects the task to finish in
     * {@link ExecutionState#FAILED}.
     */
    @Test
    public void testDeclineOnCheckpointErrorInSyncPart() throws Exception {
        final TestDeclinedCheckpointResponder responder = new TestDeclinedCheckpointResponder();
        final FilterOperator operator = new FilterOperator();
        final StateBackend failingBackend = new SyncFailureInducingStateBackend();

        final Task task = createTask(operator, failingBackend, responder);
        runTaskExpectFailure(task);
    }

    /**
     * Runs a task on top of {@code AsyncFailureInducingStateBackend}, whose snapshot strategy
     * throws from the supplier returned by {@code asyncSnapshot(...)}, and expects the checkpoint
     * to be declined while the task is still {@link ExecutionState#RUNNING}.
     */
    @Test
    public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
        final TestDeclinedCheckpointResponder responder = new TestDeclinedCheckpointResponder();
        final FilterOperator operator = new FilterOperator();
        final StateBackend failingBackend = new AsyncFailureInducingStateBackend();

        final Task task = createTask(operator, failingBackend, responder);
        runTaskExpectCheckpointDeclined(task, responder);
    }

    /**
     * Cancels a task while its operator is blocked inside a checkpoint stream write that ignores
     * interrupts, and verifies the task still reaches {@link ExecutionState#CANCELED} without a
     * failure cause.
     */
    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        // The intersection cast keeps the factory serializable, exactly as in the original lambda.
        final BackendForTestStream.StreamFactory blockingStreams =
                (BackendForTestStream.StreamFactory & Serializable) LockingOutputStream::new;
        final StateBackend lockingStateBackend = new BackendForTestStream(blockingStreams);
        final TestOperator operator = new TestOperator();
        final CheckpointResponder responder = Mockito.mock(CheckpointResponder.class);

        final Task task = createTask(operator, lockingStateBackend, responder);
        task.startTaskThread();

        // Wait until the operator has entered snapshotState before cancelling.
        IN_CHECKPOINT_LATCH.await();
        task.cancelExecution();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getFailureCause());
    }

    /**
     * Starts the task, blocks until the responder has seen a declined checkpoint, asserts that the
     * task is still running at that point, and then cancels it and waits for its thread to end.
     *
     * @param task the task to run
     * @param checkpointResponder the responder whose declined latch is awaited
     * @throws Exception if waiting on the latch or joining the task thread fails
     */
    private void runTaskExpectCheckpointDeclined(Task task, TestDeclinedCheckpointResponder checkpointResponder) throws Exception {
        task.startTaskThread();

        final OneShotLatch declined = checkpointResponder.getDeclinedLatch();
        declined.await();
        Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());

        task.cancelExecution();
        final Thread taskThread = task.getExecutingThread();
        taskThread.join();
    }

    /**
     * Starts the task, waits for its executing thread to terminate, and asserts that the task
     * ended in {@link ExecutionState#FAILED}.
     *
     * @param task the task to run
     * @throws Exception if joining the task thread is interrupted
     */
    private void runTaskExpectFailure(Task task) throws Exception {
        task.startTaskThread();

        final Thread taskThread = task.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
    }

    /**
     * Builds a {@link Task} that runs {@link TestStreamTask} with the given operator, state
     * backend and checkpoint responder. All remaining collaborators are mocks or test/no-op
     * implementations.
     *
     * @param op the stream operator stored in the task's {@link StreamConfig}
     * @param backend the state backend stored in the task's {@link StreamConfig}
     * @param checkpointResponder the responder handed to the task
     * @return the assembled, not yet started task
     * @throws IOException declared because the job information and file cache set-up may throw it
     */
    private static Task createTask(StreamOperator<?> op, StateBackend backend, CheckpointResponder checkpointResponder) throws IOException {
        final Configuration taskConfig = new Configuration();
        final StreamConfig streamConfig = new StreamConfig(taskConfig);
        streamConfig.setStreamOperator(op);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setStateBackend(backend);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        final JobInformation jobInformation =
                new JobInformation(
                        new JobID(),
                        "test job name",
                        new SerializedValue<>(executionConfig),
                        new Configuration(),
                        Collections.emptyList(),
                        Collections.emptyList());
        final TaskInformation taskInformation =
                new TaskInformation(
                        new JobVertexID(),
                        "test task name",
                        1,
                        11,
                        TestStreamTask.class.getName(),
                        taskConfig);
        final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();

        // Arguments stay in the original order so they are evaluated in the same sequence.
        return new Task(
                jobInformation,
                taskInformation,
                new ExecutionAttemptID(),
                new AllocationID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                shuffleEnvironment,
                new KvStateService(new KvStateRegistry(), null, null),
                Mockito.mock(BroadcastVariableManager.class),
                new TaskEventDispatcher(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                new TestTaskStateManager(),
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(InputSplitProvider.class),
                checkpointResponder,
                new NoOpTaskOperatorEventGateway(),
                new TestGlobalAggregateManager(),
                TestingClassLoaderLease.newBuilder().build(),
                new FileCache(
                        new String[] {EnvironmentInformation.getTemporaryFileDirectory()},
                        VoidPermanentBlobService.INSTANCE),
                new TestingTaskManagerRuntimeInfo(),
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
                new NoOpResultPartitionConsumableNotifier(),
                Mockito.mock(PartitionProducerStateChecker.class),
                Executors.directExecutor());
    }

    /**
     * Stream task used by these tests: its input-processing action triggers a single checkpoint
     * (id 11) and then polls until the task is no longer running.
     */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {

        public TestStreamTask(Environment env) throws Exception {
            super(env);
        }

        /** Intentionally empty: this test task performs no initialization of its own. */
        public void init() {
        }

        /**
         * Triggers the checkpoint, sleeps in 1 ms steps while the task is running, and finally
         * suspends both the default action and the mailbox processor.
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            final CheckpointMetaData checkpoint = new CheckpointMetaData(11L, System.currentTimeMillis());
            final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
            triggerCheckpointOnBarrier(checkpoint, options, new CheckpointMetricsBuilder());

            while (isRunning()) {
                Thread.sleep(1L);
            }

            controller.suspendDefaultAction();
            mailboxProcessor.suspend();
        }

        /** Intentionally empty: nothing to clean up. */
        protected void cleanUpInternal() {
        }
    }

    /**
     * Filter operator that rejects every element and, when snapshotting, signals
     * {@code IN_CHECKPOINT_LATCH} before writing a single byte to the raw operator state stream.
     */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public TestOperator() {
            super(new FilterFunction<Object>() {

                public boolean filter(Object element) {
                    return false;
                }
            });
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            final OperatorStateCheckpointOutputStream rawOutput = context.getRawOperatorStateOutput();

            // Signal first: the write below may block, and the test must know we got this far.
            IN_CHECKPOINT_LATCH.trigger();
            rawOutput.write(1);
        }
    }

    /** Filter operator whose filter function rejects every element. */
    private static final class FilterOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public FilterOperator() {
            super(new FilterFunction<Object>() {

                public boolean filter(Object element) {
                    return false;
                }
            });
        }
    }

    /**
     * Checkpoint output stream whose {@link #write(int)} blocks until {@link #close()} has been
     * called, ignoring thread interrupts while it waits.
     */
    private static final class LockingOutputStream extends CheckpointStateOutputStream {
        /** Monitor guarding {@link #closed} and used for wait/notify. */
        private final Object lock = new Object();

        /** Set to {@code true} by {@link #close()}; releases blocked writers. */
        private volatile boolean closed;

        private LockingOutputStream() {
        }

        /** Not supported by this stream; always throws {@link UnsupportedOperationException}. */
        @Nullable
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new UnsupportedOperationException();
        }

        /**
         * Blocks until the stream is closed. The byte itself is discarded.
         *
         * @param b ignored
         */
        public void write(int b) throws IOException {
            synchronized (lock) {
                while (!closed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {
                        // Deliberately swallowed: keep waiting until close() sets the flag.
                    }
                }
            }
        }

        /** Marks the stream closed and wakes every thread blocked in {@link #write(int)}. */
        public void close() throws IOException {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }

        /** Always reports position {@code 0}. */
        public long getPos() {
            return 0L;
        }

        /** No-op. */
        public void flush() {
        }

        /** No-op. */
        public void sync() {
        }
    }

    /**
     * Memory state backend whose operator state backend runs {@link #FAILING_STRATEGY} as an
     * asynchronous snapshot strategy; that strategy's result supplier always throws.
     */
    private static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -7613628662587098470L;

        /**
         * Snapshot strategy that prepares no resources and whose asynchronous result supplier
         * throws an {@link Exception} when invoked.
         */
        public static final SnapshotStrategy<OperatorStateHandle, SnapshotResources> FAILING_STRATEGY =
                new SnapshotStrategy<OperatorStateHandle, SnapshotResources>() {

                    public SnapshotResources syncPrepareResources(long checkpointId) throws Exception {
                        return null;
                    }

                    public SnapshotStrategy.SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
                            SnapshotResources syncPartResource,
                            long checkpointId,
                            long timestamp,
                            @Nonnull CheckpointStreamFactory streamFactory,
                            @Nonnull CheckpointOptions checkpointOptions) {
                        return registry -> {
                            throw new Exception("Async part snapshot exception.");
                        };
                    }
                };

        private AsyncFailureInducingStateBackend() {
        }

        /**
         * Creates an operator state backend, with empty state maps and its own closeable
         * registry, that snapshots through {@link #FAILING_STRATEGY}.
         */
        public OperatorStateBackend createOperatorStateBackend(
                Environment env,
                String operatorIdentifier,
                @Nonnull Collection<OperatorStateHandle> stateHandles,
                CloseableRegistry cancelStreamRegistry) throws Exception {
            final DefaultOperatorStateBackendBuilder failingBuilder =
                    new DefaultOperatorStateBackendBuilder(
                            env.getUserCodeClassLoader().asClassLoader(),
                            env.getExecutionConfig(),
                            true,
                            stateHandles,
                            cancelStreamRegistry) {

                        public DefaultOperatorStateBackend build() {
                            final CloseableRegistry backendRegistry = new CloseableRegistry();
                            return new DefaultOperatorStateBackend(
                                    this.executionConfig,
                                    backendRegistry,
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    new SnapshotStrategyRunner<>(
                                            "Failing strategy",
                                            FAILING_STRATEGY,
                                            backendRegistry,
                                            SnapshotExecutionType.ASYNCHRONOUS));
                        }
                    };
            return failingBuilder.build();
        }

        /** Returns this instance unchanged; the supplied configuration is ignored. */
        public AsyncFailureInducingStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
            return this;
        }
    }

    /**
     * Memory state backend whose operator state backend throws directly from
     * {@code snapshot(...)}, before any snapshot future is returned.
     */
    private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -1915780414440060539L;

        private SyncFailureInducingStateBackend() {
        }

        /**
         * Creates an operator state backend, with empty state maps and a mocked snapshot strategy
         * runner, whose {@code snapshot(...)} always throws.
         */
        public OperatorStateBackend createOperatorStateBackend(
                Environment env,
                String operatorIdentifier,
                @Nonnull Collection<OperatorStateHandle> stateHandles,
                CloseableRegistry cancelStreamRegistry) throws Exception {
            final DefaultOperatorStateBackendBuilder failingBuilder =
                    new DefaultOperatorStateBackendBuilder(
                            env.getUserCodeClassLoader().asClassLoader(),
                            env.getExecutionConfig(),
                            true,
                            stateHandles,
                            cancelStreamRegistry) {

                        public DefaultOperatorStateBackend build() {
                            return new DefaultOperatorStateBackend(
                                    this.executionConfig,
                                    this.cancelStreamRegistry,
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    new HashMap<>(),
                                    Mockito.mock(SnapshotStrategyRunner.class)) {

                                @Nonnull
                                public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                                        long checkpointId,
                                        long timestamp,
                                        @Nonnull CheckpointStreamFactory streamFactory,
                                        @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                                    throw new Exception("Sync part snapshot exception.");
                                }
                            };
                        }
                    };
            return failingBuilder.build();
        }

        /** Returns this instance unchanged; the supplied configuration is ignored. */
        public SyncFailureInducingStateBackend configure(ReadableConfig configuration, ClassLoader classLoader) {
            return this;
        }
    }

    /**
     * Checkpoint responder for tests that expect a checkpoint to be declined: a decline triggers
     * {@link #declinedLatch}, an acknowledgement is treated as an error, and metric reports are
     * ignored.
     */
    private static class TestDeclinedCheckpointResponder implements CheckpointResponder {
        /** Triggered once {@code declineCheckpoint} has been called. */
        final OneShotLatch declinedLatch = new OneShotLatch();

        private TestDeclinedCheckpointResponder() {
        }

        /** Always throws: these tests never expect a checkpoint to be acknowledged. */
        public void acknowledgeCheckpoint(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                CheckpointMetrics checkpointMetrics,
                TaskStateSnapshot subtaskState) {
            throw new RuntimeException("Unexpected call.");
        }

        /** No-op. */
        public void reportCheckpointMetrics(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                CheckpointMetrics checkpointMetrics) {
        }

        /** Records the decline by triggering the latch; the arguments are not inspected. */
        public void declineCheckpoint(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                CheckpointException checkpointException) {
            declinedLatch.trigger();
        }

        /** @return the latch that is triggered when a checkpoint is declined */
        public OneShotLatch getDeclinedLatch() {
            return declinedLatch;
        }
    }
}

