package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.class */
public class InterruptSensitiveRestoreTest {
    private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest$InterruptLockingStateHandle.class */
    private static class InterruptLockingStateHandle implements StateHandle<Serializable> {
        private volatile transient boolean closed;

        private InterruptLockingStateHandle() {
        }

        /* renamed from: getState, reason: merged with bridge method [inline-methods] */
        public Serializable m29getState(ClassLoader classLoader) {
            InterruptSensitiveRestoreTest.IN_RESTORE_LATCH.trigger();
            try {
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException e) {
                while (!this.closed) {
                    synchronized (this) {
                        wait();
                    }
                }
            }
            return new SerializableObject();
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() throws Exception {
            return 0L;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest$TestSource.class */
    private static class TestSource implements SourceFunction<Object>, Checkpointed<Serializable> {
        private static final long serialVersionUID = 1;

        private TestSource() {
        }

        public void run(SourceFunction.SourceContext<Object> sourceContext) throws Exception {
            Assert.fail("should never be called");
        }

        public void cancel() {
        }

        public Serializable snapshotState(long j, long j2) throws Exception {
            Assert.fail("should never be called");
            return null;
        }

        public void restoreState(Serializable serializable) throws Exception {
            Assert.fail("should never be called");
        }
    }

    @Test
    public void testRestoreWithInterrupt() throws Exception {
        Configuration configuration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamConfig.setStreamOperator(new StreamSource(new TestSource()));
        InterruptLockingStateHandle interruptLockingStateHandle = new InterruptLockingStateHandle();
        StreamTaskState streamTaskState = new StreamTaskState();
        streamTaskState.setFunctionState(interruptLockingStateHandle);
        Task createTask = createTask(createTaskDeploymentDescriptor(configuration, new StreamTaskStateList(new StreamTaskState[]{streamTaskState})));
        createTask.startTaskThread();
        IN_RESTORE_LATCH.await();
        createTask.cancelExecution();
        createTask.getExecutingThread().join(30000L);
        if (createTask.getExecutionState() == ExecutionState.CANCELING) {
            Assert.fail("Task is stuck and not canceling");
        }
        Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
        Assert.assertNull(createTask.getFailureCause());
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(Configuration configuration, StateHandle<?> stateHandle) throws IOException {
        return new TaskDeploymentDescriptor(new JobID(), "test job name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue(new ExecutionConfig()), "test task name", 0, 1, 0, new Configuration(), configuration, SourceStreamTask.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0, new SerializedValue(stateHandle));
    }

    private static Task createTask(TaskDeploymentDescriptor taskDeploymentDescriptor) throws IOException {
        return new Task(taskDeploymentDescriptor, (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class), (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), (ActorGateway) Mockito.mock(ActorGateway.class), (ActorGateway) Mockito.mock(ActorGateway.class), new FiniteDuration(10L, TimeUnit.SECONDS), new FallbackLibraryCacheManager(), new FileCache(new Configuration()), new TaskManagerRuntimeInfo("localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), new UnregisteredTaskMetricsGroup());
    }
}
