package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.class */
public class BlockingCheckpointsTest {
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest$LockingOutputStream.class */
    private static final class LockingOutputStream extends AbstractStateBackend.CheckpointStateOutputStream implements Serializable {
        private static final long serialVersionUID = 1;
        private final SerializableObject lock;
        private volatile boolean closed;

        private LockingOutputStream() {
            this.lock = new SerializableObject();
        }

        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new UnsupportedOperationException();
        }

        public void write(int i) throws IOException {
            synchronized (this.lock) {
                while (!this.closed) {
                    try {
                        this.lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        public void close() throws IOException {
            synchronized (this.lock) {
                this.closed = true;
                this.lock.notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest$LockingStreamStateBackend.class */
    private static class LockingStreamStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1;
        private final LockingOutputStream out;

        private LockingStreamStateBackend() {
            this.out = new LockingOutputStream();
        }

        public void disposeAllStateForCurrentJob() {
        }

        public void close() throws IOException {
            this.out.close();
        }

        public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long j, long j2) throws Exception {
            return this.out;
        }

        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S s, long j, long j2) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest$TestOperator.class */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1;

        public TestOperator() {
            super(new FilterFunction<Object>() { // from class: org.apache.flink.streaming.runtime.tasks.BlockingCheckpointsTest.TestOperator.1
                public boolean filter(Object obj) {
                    return false;
                }
            });
        }

        public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
            AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream = getStateBackend().createCheckpointStateOutputStream(j, j2);
            BlockingCheckpointsTest.IN_CHECKPOINT_LATCH.trigger();
            createCheckpointStateOutputStream.write(1);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest$TestStreamTask.class */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
        public void init() {
        }

        protected void run() throws Exception {
            triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
        }

        protected void cleanup() {
        }

        protected void cancelTask() {
        }
    }

    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamConfig.setStreamOperator(new TestOperator());
        streamConfig.setStateBackend(new LockingStreamStateBackend());
        Task createTask = createTask(configuration);
        createTask.startTaskThread();
        IN_CHECKPOINT_LATCH.await();
        createTask.cancelExecution();
        createTask.getExecutingThread().join();
        Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
        Assert.assertNull(createTask.getFailureCause());
    }

    private static Task createTask(Configuration configuration) throws IOException {
        return new Task(new JobInformation(new JobID(), "test job name", new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList()), new TaskInformation(new JobVertexID(), "test task name", 1, TestStreamTask.class.getName(), configuration), new ExecutionAttemptID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, (SerializedValue) null, (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class), (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), (ActorGateway) Mockito.mock(ActorGateway.class), (ActorGateway) Mockito.mock(ActorGateway.class), new FiniteDuration(10L, TimeUnit.SECONDS), new FallbackLibraryCacheManager(), new FileCache(new Configuration()), new TaskManagerRuntimeInfo("localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), new UnregisteredTaskMetricsGroup());
    }
}
