/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/**
 * Checks that a task can be cancelled while its operator is stuck inside a checkpoint
 * stream write that does not return on thread interrupts.
 *
 * <p>The test wires a {@link TestOperator} and a {@link LockingStreamStateBackend} into a
 * {@link TestStreamTask}, waits until the operator has reached the write, cancels the task
 * and then expects the task to end in {@link ExecutionState#CANCELED} without a failure cause.
 */
public class BlockingCheckpointsTest {

    /** Triggered by {@link TestOperator} immediately before it calls the stream's write. */
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /**
     * Starts the task, waits for the operator to enter the checkpoint write, cancels the
     * task, and verifies the final execution state.
     *
     * <p>NOTE(review): neither the latch wait nor the thread join has a timeout, so a
     * regression would presumably show up as a hanging test rather than a failed assertion.
     *
     * @throws Exception if task creation fails or the test thread is interrupted while waiting
     */
    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        final Configuration taskConfiguration = new Configuration();

        // The StreamConfig is constructed around taskConfiguration; all settings go through it.
        final StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamConfig.setStreamOperator(new TestOperator());
        streamConfig.setStateBackend(new LockingStreamStateBackend());

        final Task task = createTask(taskConfiguration);
        task.startTaskThread();

        // Do not cancel before the operator has reached the checkpoint write.
        IN_CHECKPOINT_LATCH.await();

        task.cancelExecution();
        final Thread executingThread = task.getExecutingThread();
        executingThread.join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getFailureCause());
    }

    /**
     * Builds a {@link Task} that runs {@link TestStreamTask} with the given configuration.
     * Most runtime services handed to the task are Mockito mocks.
     *
     * @param taskConfig configuration passed to the {@link TaskInformation} of the task
     * @return the newly created, not yet started task
     * @throws IOException declared because some of the constructors used here may throw it
     */
    private static Task createTask(Configuration taskConfig) throws IOException {
        final JobInformation jobInfo = new JobInformation(
                new JobID(),
                "test job name",
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        final TaskInformation taskInfo = new TaskInformation(
                new JobVertexID(),
                "test task name",
                1,
                TestStreamTask.class.getName(),
                taskConfig);

        // Arguments are kept in the original order, one per line.
        return new Task(
                jobInfo,
                taskInfo,
                new ExecutionAttemptID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                null,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(ActorGateway.class),
                Mockito.mock(ActorGateway.class),
                new FiniteDuration(10L, TimeUnit.SECONDS),
                new FallbackLibraryCacheManager(),
                new FileCache(new Configuration()),
                new TaskManagerRuntimeInfo(
                        "localhost",
                        new Configuration(),
                        EnvironmentInformation.getTemporaryFileDirectory()),
                new UnregisteredTaskMetricsGroup());
    }

    /**
     * Stream task whose {@code run()} does nothing but trigger a single checkpoint; the
     * remaining lifecycle hooks are empty.
     */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {

        /** No initialization is performed. */
        public void init() {
            // intentionally empty
        }

        /**
         * Triggers checkpoint 11 with the current wall-clock time as its timestamp.
         *
         * @throws Exception whatever the checkpoint trigger throws
         */
        protected void run() throws Exception {
            final long now = System.currentTimeMillis();
            triggerCheckpointOnBarrier(11L, now);
        }

        /** No cleanup is performed. */
        protected void cleanup() {
            // intentionally empty
        }

        /** Cancellation takes no task-specific action. */
        protected void cancelTask() {
            // intentionally empty
        }
    }

    /**
     * Filter operator that rejects every element and whose state snapshot writes to the
     * checkpoint stream obtained from the state backend.
     */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        /** Creates the operator with a filter function that always returns {@code false}. */
        public TestOperator() {
            super(new FilterFunction<Object>() {

                public boolean filter(Object value) {
                    return false;
                }
            });
        }

        /**
         * Obtains a checkpoint output stream from the state backend, signals
         * {@link #IN_CHECKPOINT_LATCH}, and then writes a single byte to that stream.
         *
         * <p>With the {@link LockingStreamStateBackend} set up by the test, the write is
         * expected to block until the stream has been closed.
         *
         * @param checkpointId id forwarded to the state backend
         * @param timestamp timestamp forwarded to the state backend
         * @return always {@code null}
         * @throws Exception if creating the stream or writing to it fails
         */
        public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
            final AbstractStateBackend.CheckpointStateOutputStream checkpointStream =
                    getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);

            // Signal first: the test thread may only cancel once this point has been reached.
            IN_CHECKPOINT_LATCH.trigger();
            checkpointStream.write(1);
            return null;
        }
    }

    /**
     * Checkpoint output stream whose {@link #write(int)} blocks until {@link #close()} has
     * been called; interrupts do not end the wait.
     */
    private static final class LockingOutputStream
            extends AbstractStateBackend.CheckpointStateOutputStream
            implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Monitor guarding the wait in {@link #write(int)} and the notify in {@link #close()}. */
        private final SerializableObject lock = new SerializableObject();

        /** Set to {@code true} by {@link #close()}; releases any thread waiting in write. */
        private volatile boolean closed;

        private LockingOutputStream() {
        }

        /**
         * Not supported by this stream.
         *
         * @throws UnsupportedOperationException always
         */
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new UnsupportedOperationException();
        }

        /**
         * Blocks the calling thread until this stream has been closed. The argument is not
         * written anywhere.
         *
         * @param b ignored
         */
        public void write(int b) throws IOException {
            synchronized (lock) {
                while (!closed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {
                        // An interrupt does not end the wait; only close() does. The test
                        // name suggests this is intentional.
                    }
                }
            }
        }

        /** Marks the stream as closed and wakes up every thread waiting in {@link #write(int)}. */
        public void close() throws IOException {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }
    }

    /**
     * State backend that always hands out the same {@link LockingOutputStream} and closes it
     * when the backend itself is closed. All state-creation methods are unsupported.
     */
    private static class LockingStreamStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;

        /** The single stream returned for every checkpoint. */
        private final LockingOutputStream out = new LockingOutputStream();

        private LockingStreamStateBackend() {
        }

        /**
         * Returns the shared blocking stream; both arguments are ignored.
         *
         * @param checkpointID ignored
         * @param timestamp ignored
         * @return the one {@link LockingOutputStream} owned by this backend
         */
        public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
            return out;
        }

        /** Closes the shared stream, which releases a thread blocked in its write. */
        public void close() throws IOException {
            out.close();
        }

        /** Nothing to dispose. */
        public void disposeAllStateForCurrentJob() {
            // intentionally empty
        }

        // ---- unsupported operations: each of the following always throws ----

        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
}

