package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests that a task can be canceled while its checkpoint is blocked inside a state stream
 * write that does not react to thread interrupts.
 */
public class BlockingCheckpointsTest {
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /**
     * Checkpoint state output stream whose {@link #write(int)} blocks until {@link #close()}
     * has been called.
     *
     * <p>Interrupting a thread that is blocked in {@code write} does not release it; closing
     * the stream is the only way out.
     */
    private static final class LockingOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream {
        /** Monitor on which blocked writers wait and which {@link #close()} notifies. */
        private final Object lock = new Object();

        /** Set to {@code true} by {@link #close()}; never reset afterwards. */
        private volatile boolean closed;

        private LockingOutputStream() {
        }

        /**
         * Produces no state handle and does not touch the closed flag.
         *
         * @return always {@code null}
         */
        public StreamStateHandle closeAndGetHandle() throws IOException {
            return null;
        }

        /**
         * Discards the given byte and blocks the caller until the stream has been closed.
         *
         * @param b ignored
         */
        public void write(int b) throws IOException {
            synchronized (this.lock) {
                for (;;) {
                    if (this.closed) {
                        return;
                    }
                    try {
                        this.lock.wait();
                    } catch (InterruptedException ignored) {
                        // Deliberately swallowed: this stream must stay blocked under
                        // interruption and only wake up for good once close() was called.
                    }
                }
            }
        }

        /** Marks the stream as closed and wakes up every thread blocked in {@link #write(int)}. */
        public void close() throws IOException {
            synchronized (this.lock) {
                // Flag first, then notify, both under the monitor the writers wait on.
                this.closed = true;
                this.lock.notifyAll();
            }
        }

        /**
         * @return always {@code 0}, since nothing is ever stored
         */
        public long getPos() {
            return 0;
        }

        /** No-op: there is no buffered data. */
        public void flush() {
            // nothing to flush
        }

        /** No-op: there is no underlying storage. */
        public void sync() {
            // nothing to sync
        }
    }

    /** Stream factory that hands out a fresh {@link LockingOutputStream} for every request. */
    private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
        private LockingOutputStreamFactory() {
        }

        /**
         * Creates a new blocking stream; neither argument influences the result.
         *
         * @param checkpointId ignored
         * @param timestamp ignored
         * @return a new {@link LockingOutputStream}
         */
        public CheckpointStreamFactory.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointId, long timestamp) {
            final LockingOutputStream blockingStream = new LockingOutputStream();
            return blockingStream;
        }

        /** No-op: the factory holds no resources. */
        public void close() {
            // nothing to release
        }
    }

    /**
     * State backend whose stream factory produces blocking streams (see
     * {@link LockingOutputStreamFactory}). Keyed state is not supported.
     */
    private static class LockingStreamStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;

        private LockingStreamStateBackend() {
        }

        /**
         * Returns a new {@link LockingOutputStreamFactory}; both arguments are ignored.
         *
         * @param jobId ignored
         * @param operatorIdentifier ignored
         */
        public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
            final CheckpointStreamFactory blockingFactory = new LockingOutputStreamFactory();
            return blockingFactory;
        }

        /**
         * Not supported by this backend.
         *
         * @throws UnsupportedOperationException always
         */
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobId, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Filter operator that rejects every element and, when snapshotting, signals the test
     * latch and then writes one byte to the raw operator state stream.
     */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public TestOperator() {
            super(new FilterFunction<Object>() {
                /** Rejects every element. */
                public boolean filter(Object value) {
                    return false;
                }
            });
        }

        /**
         * Obtains the raw operator state stream, triggers the checkpoint latch, then writes
         * a single byte.
         *
         * <p>NOTE(review): with the locking backend configured by the test, the write is
         * presumably served by {@code LockingOutputStream} and therefore blocks - confirm.
         */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            final OperatorStateCheckpointOutputStream rawStateStream = context.getRawOperatorStateOutput();

            // Announce that the snapshot has been entered before attempting the write.
            BlockingCheckpointsTest.IN_CHECKPOINT_LATCH.trigger();

            rawStateStream.write(1);
        }
    }

    /**
     * Stream task whose run phase consists solely of triggering one checkpoint; all other
     * lifecycle hooks are empty.
     */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
        /** No-op. */
        public void init() {
            // intentionally empty
        }

        /** Triggers a checkpoint with id 11 and the current wall-clock time. */
        protected void run() throws Exception {
            final long checkpointId = 11L;
            final long timestamp = System.currentTimeMillis();
            final CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, timestamp);
            triggerCheckpointOnBarrier(metaData);
        }

        /** No-op. */
        protected void cleanup() {
            // intentionally empty
        }

        /** No-op. */
        protected void cancelTask() {
            // intentionally empty
        }
    }

    /**
     * Starts a task, waits until its operator has entered the snapshot, cancels the task and
     * checks that it finishes in state {@code CANCELED} without a failure cause.
     */
    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        final Configuration taskConfiguration = new Configuration();

        // Operator and state backend are registered through a StreamConfig built over the
        // same Configuration instance that is handed to the task below.
        final StreamConfig operatorSettings = new StreamConfig(taskConfiguration);
        operatorSettings.setStreamOperator(new TestOperator());
        operatorSettings.setStateBackend(new LockingStreamStateBackend());

        final Task task = createTask(taskConfiguration);
        task.startTaskThread();

        // Do not cancel before the operator has signalled that it is inside the snapshot.
        IN_CHECKPOINT_LATCH.await();
        task.cancelExecution();

        final Thread taskThread = task.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getFailureCause());
    }

    /**
     * Builds a {@link Task} that executes {@link TestStreamTask} with the given task
     * configuration.
     *
     * <p>Most collaborators are Mockito mocks. The only stubbed call is
     * {@code NetworkEnvironment#createKvStateTaskRegistry}, which returns a mocked
     * {@link TaskKvStateRegistry}.
     *
     * @param configuration task configuration stored in the {@link TaskInformation}
     * @return a task that has not been started yet
     * @throws IOException declared for the constructors invoked here (the visible code does
     *     not show which of them throws)
     */
    private static Task createTask(Configuration configuration) throws IOException {
        JobInformation jobInformation = new JobInformation(
                new JobID(),
                "test job name",
                // Diamond instead of the raw type avoids an unchecked conversion.
                new SerializedValue<>(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        TaskInformation taskInformation = new TaskInformation(
                new JobVertexID(),
                "test task name",
                1,
                11,
                TestStreamTask.class.getName(),
                configuration);

        TaskKvStateRegistry taskKvStateRegistry = Mockito.mock(TaskKvStateRegistry.class);
        NetworkEnvironment networkEnvironment = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.createKvStateTaskRegistry(Matchers.any(JobID.class), Matchers.any(JobVertexID.class)))
                .thenReturn(taskKvStateRegistry);

        // Arguments stay in their original order so evaluation order is unchanged.
        return new Task(
                jobInformation,
                taskInformation,
                new ExecutionAttemptID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                (TaskStateHandles) null,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                networkEnvironment,
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(TaskManagerConnection.class),
                Mockito.mock(InputSplitProvider.class),
                Mockito.mock(CheckpointResponder.class),
                new FallbackLibraryCacheManager(),
                new FileCache(new Configuration()),
                new TaskManagerRuntimeInfo("localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
                new UnregisteredTaskMetricsGroup(),
                Mockito.mock(ResultPartitionConsumableNotifier.class),
                Mockito.mock(PartitionProducerStateChecker.class),
                Executors.directExecutor());
    }
}
