package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.lang.Thread;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.StreamTaskUtil;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest.class */
public class StreamTaskTest extends TestLogger {
    // Static latch shared with the nested test tasks; CancelFailingTask#processInput triggers it
    // after its LockHolder thread has fired its own latch.
    private static OneShotLatch syncLatch;

    // JUnit rule created with Timeout.seconds(30).
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(30);

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$AcknowledgeDummyEnvironment.class */
    /**
     * {@link DummyEnvironment} whose {@code acknowledgeCheckpoint} overloads only record the
     * acknowledged checkpoint id in a future that tests can observe.
     */
    private static class AcknowledgeDummyEnvironment extends DummyEnvironment {
        /** Completed with the checkpoint id handed to either {@code acknowledgeCheckpoint} overload. */
        private final CompletableFuture<Long> acknowledgeCheckpointFuture = new CompletableFuture<>();

        private AcknowledgeDummyEnvironment() {
        }

        /** Returns the future that is completed by the first acknowledged checkpoint id. */
        public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
            return acknowledgeCheckpointFuture;
        }

        /** Completes the future with {@code checkpointId}; the metrics argument is not used. */
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics metrics) {
            acknowledgeCheckpointFuture.complete(checkpointId);
        }

        /** Completes the future with {@code checkpointId}; metrics and snapshot are not used. */
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics metrics, TaskStateSnapshot snapshot) {
            acknowledgeCheckpointFuture.complete(checkpointId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$AvailabilityTestInputProcessor.class */
    /**
     * Input processor that reports {@link InputStatus#MORE_AVAILABLE} until the configured number
     * of {@link #processInput()} calls has been reached and {@link InputStatus#END_OF_INPUT}
     * from then on.
     */
    public static class AvailabilityTestInputProcessor implements StreamInputProcessor {
        /** Call count at which (and after which) the processor reports end of input. */
        private final int totalProcessCalls;
        /** Number of {@link #processInput()} invocations seen so far. */
        private int currentNumProcessCalls;

        AvailabilityTestInputProcessor(int totalProcessCalls) {
            this.totalProcessCalls = totalProcessCalls;
        }

        /** Counts the call and derives the status from the updated counter. */
        public InputStatus processInput() {
            currentNumProcessCalls++;
            if (currentNumProcessCalls < totalProcessCalls) {
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        /** Nothing to prepare; returns an already completed future. */
        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter writer, long checkpointId) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        /** No resources are held, so closing is a no-op. */
        public void close() throws IOException {
        }

        /** Always returns the inherited {@code AVAILABLE} constant. */
        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$BlockingCloseStreamOperator.class */
    /**
     * Operator whose {@link #close()} announces that it was entered and then blocks until a
     * second latch is triggered. Both latches are static and must be created through
     * {@link #resetLatches()} before {@code close()} runs.
     */
    private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
        private static final long serialVersionUID = -9042150529568008847L;

        /** Triggered as soon as {@link #close()} has been entered. */
        private static volatile OneShotLatch inClose;
        /** Awaited inside {@link #close()} before delegating to the superclass. */
        private static volatile OneShotLatch finishClose;

        private BlockingCloseStreamOperator() {
        }

        /** Installs a fresh pair of latches. */
        public static void resetLatches() {
            inClose = new OneShotLatch();
            finishClose = new OneShotLatch();
        }

        public void close() throws Exception {
            // Both latches are validated before either one is touched.
            checkLatches();

            inClose.trigger();
            finishClose.await();

            super.close();
        }

        /** Fails if {@link #resetLatches()} has not been called yet. */
        private void checkLatches() {
            Preconditions.checkNotNull(inClose);
            Preconditions.checkNotNull(finishClose);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$BlockingRunnableFuture.class */
    /**
     * {@link RunnableFuture} whose {@link #run()} signals that it has started, counts down a
     * shared latch and then waits for that latch to reach zero before completing with a fixed
     * value.
     *
     * @param <V> type of the value the future completes with
     */
    private static final class BlockingRunnableFuture<V> implements RunnableFuture<V> {
        /** Backing future that carries the result and answers the status queries. */
        private final CompletableFuture<V> future = new CompletableFuture<>();
        /** Triggered at the very beginning of {@link #run()}. */
        private final OneShotLatch signalRunLatch = new OneShotLatch();
        /** Counted down once per {@link #run()} call and then awaited by it. */
        private final CountDownLatch continueRunLatch;
        /** Value the backing future is completed with. */
        private final V value;

        private BlockingRunnableFuture(int parties, V value) {
            this.continueRunLatch = new CountDownLatch(parties);
            this.value = value;
        }

        @Override
        public void run() {
            signalRunLatch.trigger();
            continueRunLatch.countDown();

            try {
                continueRunLatch.await();
            } catch (InterruptedException interrupted) {
                // Propagate the interruption; the future is then left uncompleted.
                ExceptionUtils.rethrow(interrupted);
            }

            future.complete(value);
        }

        /** Cancellation is not supported; always reports {@code false}. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        /** Blocks until {@link #run()} has been entered. */
        void awaitRun() throws InterruptedException {
            signalRunLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$CancelFailingTask.class */
    /**
     * Stream task whose {@link #cancelTask()} always throws. Its {@code processInput} starts a
     * helper thread that holds a lock, registers that thread as a cancelable, and then blocks
     * trying to acquire the same lock.
     */
    public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$CancelFailingTask$LockHolder.class */
        /**
         * Thread that keeps the monitor of {@code lock} for as long as it has not been canceled.
         * {@link #close()} sets the canceled flag and interrupts the thread.
         */
        private static final class LockHolder extends Thread implements Closeable {
            // Triggered from inside the synchronized block, i.e. while the lock is held.
            private final OneShotLatch trigger;
            // Monitor that is held while the thread runs.
            private final Object lock;
            // Loop exit flag; written by cancel() and close().
            private volatile boolean canceled;

            private LockHolder(Object obj, OneShotLatch oneShotLatch) {
                this.lock = obj;
                this.trigger = oneShotLatch;
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                synchronized (this.lock) {
                    while (!this.canceled) {
                        this.trigger.trigger();
                        try {
                            Thread.sleep(1000000000L);
                        } catch (InterruptedException e) {
                            // Ignored on purpose: the loop condition re-checks the canceled flag.
                        }
                    }
                }
            }

            /** Sets the canceled flag without interrupting the thread. */
            public void cancel() {
                this.canceled = true;
            }

            /** Sets the canceled flag and interrupts the thread so the sleep ends. */
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                this.canceled = true;
                interrupt();
            }
        }

        public CancelFailingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() {
        }

        /**
         * Starts a {@link LockHolder}, waits until it holds the lock, triggers
         * {@code StreamTaskTest.syncLatch} and then blocks on the held lock. The holder is
         * closed in all cases when this method exits.
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            OneShotLatch oneShotLatch = new OneShotLatch();
            Object obj = new Object();
            LockHolder lockHolder = new LockHolder(obj, oneShotLatch);
            lockHolder.start();
            try {
                // Registering the holder lets closing the task's cancelables close (and interrupt) it.
                getCancelables().registerCloseable(lockHolder);
                // Wait until the holder thread is inside its synchronized block.
                oneShotLatch.await();
                StreamTaskTest.syncLatch.trigger();
                // Empty on purpose: entering the block waits until the holder releases the lock.
                synchronized (obj) {
                }
                controller.allActionsCompleted();
            } finally {
                lockHolder.close();
            }
        }

        protected void cleanup() {
        }

        /** Always fails with an {@link Exception} carrying the message "test exception". */
        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$CancelThrowingTask.class */
    /** Stream task whose input processing immediately throws a {@link CancelTaskException}. */
    public static class CancelThrowingTask extends StreamTask<String, AbstractStreamOperator<String>> {

        public CancelThrowingTask(Environment env) throws Exception {
            super(env);
        }

        /** Nothing to initialize. */
        protected void init() {
        }

        /** Never processes anything; always signals cancellation through the exception. */
        protected void processInput(MailboxDefaultAction.Controller mailboxController) {
            throw new CancelTaskException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$ClosingOperator.class */
    /**
     * Pass-nothing operator that exposes, through static fields, whether it has been closed and
     * how many checkpoint-complete notifications it has received since it was last opened.
     *
     * @param <T> element type of the input and output
     */
    private static class ClosingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        /** Set to {@code true} by {@link #close()} and reset by {@link #open()}. */
        static AtomicBoolean closed = new AtomicBoolean(false);
        /** Incremented by {@link #notifyCheckpointComplete(long)} and reset by {@link #open()}. */
        static AtomicInteger notified = new AtomicInteger(0);

        private ClosingOperator() {
        }

        public void open() throws Exception {
            super.open();
            // Start every run from a clean slate.
            closed.set(false);
            notified.set(0);
        }

        public void close() throws Exception {
            super.close();
            closed.set(true);
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            notified.incrementAndGet();
        }

        /** Elements are dropped. */
        public void processElement(StreamRecord<T> element) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$EmptyInputProcessor.class */
    /**
     * Input processor without any data: it reports {@link InputStatus#NOTHING_AVAILABLE} until
     * it is marked finished and {@link InputStatus#END_OF_INPUT} afterwards.
     */
    public static class EmptyInputProcessor implements StreamInputProcessor {
        /** Whether the input has been finished. */
        private volatile boolean isFinished;

        /** Creates a processor that is finished from the start. */
        public EmptyInputProcessor() {
            this(true);
        }

        /**
         * @param startFinished whether the processor reports end of input right away
         */
        public EmptyInputProcessor(boolean startFinished) {
            this.isFinished = startFinished;
        }

        public InputStatus processInput() throws Exception {
            if (isFinished) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        /** Nothing to prepare; returns an already completed future. */
        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter writer, long checkpointId) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        /** No resources are held, so closing is a no-op. */
        public void close() throws IOException {
        }

        /** Always returns the inherited {@code AVAILABLE} constant. */
        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }

        /** Marks the input as finished; later {@link #processInput()} calls report end of input. */
        public void finishInput() {
            isFinished = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$FailOnNotifyCheckpointMapper.class */
    /**
     * Identity map function that throws an {@link ExpectedTestException} from both checkpoint
     * notification callbacks.
     *
     * @param <T> element type
     */
    public static class FailOnNotifyCheckpointMapper<T> implements MapFunction<T, T>, CheckpointListener {
        private static final long serialVersionUID = 1L;

        private FailOnNotifyCheckpointMapper() {
        }

        /** Returns the input unchanged. */
        public T map(T value) throws Exception {
            return value;
        }

        /** Always fails. */
        public void notifyCheckpointComplete(long checkpointId) {
            throw new ExpectedTestException();
        }

        /** Always fails. */
        public void notifyCheckpointAborted(long checkpointId) {
            throw new ExpectedTestException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$FailedSource.class */
    /**
     * Source that idles until cancelled and whose {@link #snapshotState} throws once
     * {@link #run} has been entered. The "running" latch is static and is replaced every time
     * an instance is constructed.
     */
    private static class FailedSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        /** Counted down when {@link #run} starts; recreated by the constructor. */
        private static CountDownLatch runningLatch;
        /** Loop flag of {@link #run}; set by {@link #open} and cleared by {@link #cancel()}. */
        private volatile boolean running;

        public FailedSource() {
            runningLatch = new CountDownLatch(1);
        }

        public void open(Configuration parameters) throws Exception {
            running = true;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            runningLatch.countDown();
            while (running) {
                try {
                    Thread.sleep(Integer.MAX_VALUE);
                } catch (InterruptedException ignored) {
                    // Interruptions only wake the loop up; the flag decides when to stop.
                }
            }
        }

        public void cancel() {
            running = false;
        }

        /** Throws a {@link RuntimeException} once the running latch has reached zero. */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            final boolean started = runningLatch.getCount() == 0;
            if (started) {
                throw new RuntimeException("source failed");
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        /** Blocks until {@link #run} has been entered. */
        public void awaitRunning() throws InterruptedException {
            runningLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$FailingDummyEnvironment.class */
    /**
     * {@link DummyEnvironment} that throws a preconfigured exception from both
     * {@link #declineCheckpoint} and {@link #failExternally}.
     */
    private static class FailingDummyEnvironment extends DummyEnvironment {
        /** Exception thrown by the failing callbacks. */
        final RuntimeException failingCause;

        private FailingDummyEnvironment(RuntimeException failingCause) {
            this.failingCause = failingCause;
        }

        /** Always throws {@link #failingCause}; the arguments are not used. */
        public void declineCheckpoint(long checkpointId, CheckpointException cause) {
            throw failingCause;
        }

        /** Always throws {@link #failingCause}; the argument is not used. */
        public void failExternally(Throwable cause) {
            throw failingCause;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$FailingTwiceOperator.class */
    /**
     * Operator that fails both when processing an element (with an
     * {@link ExpectedTestException}) and when being disposed (with a {@link DisposeException}).
     */
    private static class FailingTwiceOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        private FailingTwiceOperator() {
        }

        /** Always fails. */
        public void processElement(StreamRecord<String> element) throws Exception {
            throw new ExpectedTestException();
        }

        /** Always fails. */
        public void dispose() throws Exception {
            throw new DisposeException();
        }

        /** Exception thrown from {@link #dispose()}. */
        class DisposeException extends Exception {
            public DisposeException() {
                super("Dispose Exception. This exception should be suppressed");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$MockSourceFunction.class */
    /** Source function that emits nothing: both {@code run} and {@code cancel} return immediately. */
    private static class MockSourceFunction implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        /** Returns right away without emitting any record. */
        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        /** Nothing to cancel. */
        public void cancel() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$MockStreamTask.class */
    /**
     * Stream task that runs with an operator chain supplied by the test and an
     * {@link EmptyInputProcessor} that stays unfinished until {@link #finishInput()} is called.
     */
    public static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
        /** Chain installed in place of the task's own one during {@link #init()}. */
        private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
        /** Number of completed {@link #restore()} calls. */
        private int restoreInvocationCount = 0;

        MockStreamTask(
                Environment env,
                OperatorChain<String, AbstractStreamOperator<String>> chain,
                Thread.UncaughtExceptionHandler exceptionHandler) throws Exception {
            // The cast selects the constructor overload that takes a TimerService.
            super(env, (TimerService) null, exceptionHandler);
            this.overrideOperatorChain = chain;
        }

        public void restore() throws Exception {
            super.restore();
            restoreInvocationCount++;
        }

        /** Installs the injected chain, its main operator and a not-yet-finished input processor. */
        protected void init() {
            this.operatorChain = overrideOperatorChain;
            this.mainOperator = this.operatorChain.getMainOperator();
            this.inputProcessor = new EmptyInputProcessor(false);
        }

        /** Marks the input as finished; only valid after {@link #init()} has run. */
        void finishInput() {
            Preconditions.checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
            final EmptyInputProcessor emptyProcessor = (EmptyInputProcessor) inputProcessor;
            emptyProcessor.finishInput();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$NoOpStreamTask.class */
    /**
     * Stream task that does no work: it installs an {@link EmptyInputProcessor} created with the
     * no-argument constructor and has an empty cleanup.
     *
     * @param <T> output type of the task
     * @param <OP> type of the main operator
     */
    public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {

        public NoOpStreamTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() throws Exception {
            inputProcessor = new EmptyInputProcessor();
        }

        /** Nothing to clean up. */
        protected void cleanup() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$RunningTask.class */
    /**
     * Pairs a stream task with the future of its invocation.
     *
     * @param <T> concrete stream task type
     */
    public static class RunningTask<T extends StreamTask<?, ?>> {
        /** The task being run. */
        final T streamTask;
        /** Future of the task's invocation. */
        final CompletableFuture<Void> invocationFuture;

        RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) {
            this.streamTask = streamTask;
            this.invocationFuture = invocationFuture;
        }

        /**
         * Waits for the invocation to end and asserts the task's canceled flag.
         *
         * @param shouldBeCancelled expected value of {@code isCanceled()}; when {@code true}, a
         *     failure of the invocation is accepted provided its cause is a
         *     {@link CancelTaskException}
         * @throws Exception the invocation failure when {@code shouldBeCancelled} is {@code false}
         */
        void waitForTaskCompletion(boolean shouldBeCancelled) throws Exception {
            try {
                invocationFuture.get();
            } catch (Exception e) {
                if (shouldBeCancelled) {
                    Assert.assertThat(e.getCause(), Matchers.is(Matchers.instanceOf(CancelTaskException.class)));
                } else {
                    throw e;
                }
            }

            Assert.assertThat(streamTask.isCanceled(), Matchers.is(shouldBeCancelled));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$SlowlyDeserializingOperator.class */
    /**
     * Source operator that takes at least 500 ms to deserialize and whose {@code run} idles in
     * 500 ms steps until {@link #cancel()} is called.
     */
    private static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;

        /** Set by {@link #cancel()} to end the idle loop in {@code run}. */
        private volatile boolean canceled;

        public SlowlyDeserializingOperator() {
            super(new MockSourceFunction());
            canceled = false;
        }

        public void run(
                Object lockingObject,
                StreamStatusMaintainer streamStatusMaintainer,
                Output<StreamRecord<Long>> collector,
                OperatorChain<?, ?> operatorChain) throws Exception {
            while (!canceled) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException ignored) {
                    // Interruptions only wake the loop up; the flag decides when to stop.
                }
            }
        }

        public void cancel() {
            canceled = true;
        }

        /**
         * Restores the default fields and then sleeps until 500 ms have passed, resuming the
         * wait after any interruption.
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();

            long remaining = 500;
            final long deadline = System.currentTimeMillis() + remaining;
            do {
                try {
                    Thread.sleep(remaining);
                } catch (InterruptedException ignored) {
                    // Keep waiting for whatever time is left.
                }
                remaining = deadline - System.currentTimeMillis();
            } while (remaining > 0);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$StateBackendTestSource.class */
    /**
     * Stream task used for state-backend tests. Its {@code processInput} either fails (when the
     * static {@code fail} flag is set) or completes immediately, and its state initializer wraps
     * the objects handed to operators in Mockito spies.
     */
    public static class StateBackendTestSource extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        // When true, processInput throws a RuntimeException instead of completing.
        private static volatile boolean fail;

        public StateBackendTestSource(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() throws Exception {
        }

        /** Throws a {@link RuntimeException} if {@code fail} is set; otherwise signals completion. */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (fail) {
                throw new RuntimeException();
            }
            controller.allActionsCompleted();
        }

        protected void cleanup() throws Exception {
        }

        /**
         * Returns an initializer that delegates to the superclass's initializer and wraps the
         * resulting context: the timer service manager and the raw state inputs are returned as
         * Mockito spies, everything else is passed through unchanged.
         */
        public StreamTaskStateInitializer createStreamTaskStateInitializer() {
            StreamTaskStateInitializer createStreamTaskStateInitializer = super.createStreamTaskStateInitializer();
            return (operatorID, str, processingTimeService, keyContext, typeSerializer, closeableRegistry, metricGroup, d, z) -> {
                // Build the real context first; the anonymous class below only decorates it.
                final StreamOperatorStateContext streamOperatorStateContext = createStreamTaskStateInitializer.streamOperatorStateContext(operatorID, str, processingTimeService, keyContext, typeSerializer, closeableRegistry, metricGroup, d, z);
                return new StreamOperatorStateContext() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskTest.StateBackendTestSource.1
                    public boolean isRestored() {
                        return streamOperatorStateContext.isRestored();
                    }

                    public OperatorStateBackend operatorStateBackend() {
                        return streamOperatorStateContext.operatorStateBackend();
                    }

                    public CheckpointableKeyedStateBackend<?> keyedStateBackend() {
                        return streamOperatorStateContext.keyedStateBackend();
                    }

                    // Returns a new spy of the delegate's manager on every call, or null if there is none.
                    public InternalTimeServiceManager<?> internalTimerServiceManager() {
                        InternalTimeServiceManager internalTimerServiceManager = streamOperatorStateContext.internalTimerServiceManager();
                        if (internalTimerServiceManager != null) {
                            return (InternalTimeServiceManager) Mockito.spy(internalTimerServiceManager);
                        }
                        return null;
                    }

                    public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
                        return replaceWithSpy(streamOperatorStateContext.rawOperatorStateInputs());
                    }

                    public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
                        return replaceWithSpy(streamOperatorStateContext.rawKeyedStateInputs());
                    }

                    // Wraps t in a spy; if t was registered with the closeable registry, the
                    // registration is moved over to the spy.
                    public <T extends Closeable> T replaceWithSpy(T t) {
                        T t2 = (T) Mockito.spy(t);
                        if (closeableRegistry.unregisterCloseable(t)) {
                            try {
                                closeableRegistry.registerCloseable(t2);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        return t2;
                    }
                };
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$TestMemoryStateBackendFactory.class */
    public static final class TestMemoryStateBackendFactory implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1;

        /* renamed from: createFromConfig, reason: merged with bridge method [inline-methods] */
        public AbstractStateBackend m152createFromConfig(ReadableConfig readableConfig, ClassLoader classLoader) {
            return new TestSpyWrapperStateBackend(createInnerBackend(readableConfig));
        }

        protected MemoryStateBackend createInnerBackend(ReadableConfig readableConfig) {
            return new MemoryStateBackend();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$TestStreamSource.class */
    /**
     * Stream source that publishes the state backends and raw state inputs it sees during
     * {@link #initializeState} through static fields.
     *
     * @param <OUT> type of the emitted elements
     * @param <SRC> type of the wrapped source function
     */
    static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>> extends StreamSource<OUT, SRC> {
        /** Keyed state backend captured by the most recent {@link #initializeState} call. */
        static AbstractKeyedStateBackend<?> keyedStateBackend;
        /** Operator state backend captured by the most recent {@link #initializeState} call. */
        static OperatorStateBackend operatorStateBackend;
        /** Raw operator state inputs captured by the most recent {@link #initializeState} call. */
        static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        /** Raw keyed state inputs captured by the most recent {@link #initializeState} call. */
        static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        public TestStreamSource(SRC sourceFunction) {
            super(sourceFunction);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            // Capture everything first, then let the superclass do its own initialization.
            TestStreamSource.keyedStateBackend = getKeyedStateBackend();
            TestStreamSource.operatorStateBackend = getOperatorStateBackend();
            TestStreamSource.rawOperatorStateInputs = context.getRawOperatorStateInputs();
            TestStreamSource.rawKeyedStateInputs = context.getRawKeyedStateInputs();

            super.initializeState(context);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$TestUserCodeClassLoader.class */
    /** Class loader with no behavior of its own; its parent is the system class loader. */
    private static class TestUserCodeClassLoader extends ClassLoader {
        public TestUserCodeClassLoader() {
            // getSystemClassLoader() is the static method inherited from ClassLoader.
            super(getSystemClassLoader());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$TestingKeyedStateHandle.class */
    /**
     * Minimal {@link KeyedStateHandle} for tests: it carries no state and completes a future when
     * {@link #discardState()} is called, so tests can observe the discard.
     */
    private static class TestingKeyedStateHandle implements KeyedStateHandle {
        private static final long serialVersionUID = -2473861305282291582L;

        /** Completed (with {@code null}) once the handle has been discarded. */
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>();

        private TestingKeyedStateHandle() {
        }

        /** Returns the future that {@link #discardState()} completes. */
        public CompletableFuture<Void> getDiscardFuture() {
            return discardFuture;
        }

        /** Always reports the empty key-group range. */
        public KeyGroupRange getKeyGroupRange() {
            return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        }

        /**
         * Returns this handle regardless of the requested range.
         *
         * <p>NOTE(review): the {@code m153} prefix is a decompiler rename; the decompiler marker on
         * this method recorded the original name as {@code getIntersection}.
         */
        public TestingKeyedStateHandle m153getIntersection(KeyGroupRange keyGroupRange) {
            return this;
        }

        /** No-op: this handle registers nothing. */
        public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
        }

        /** Signals the discard by completing the discard future. */
        public void discardState() {
            discardFuture.complete(null);
        }

        /** The handle reports a size of zero. */
        public long getStateSize() {
            return 0L;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$TestingOperatorStateHandle.class */
    /**
     * Minimal {@link OperatorStateHandle} for tests: it exposes no readable state and completes a
     * future when {@link #discardState()} is called, so tests can observe the discard.
     */
    private static class TestingOperatorStateHandle implements OperatorStateHandle {
        private static final long serialVersionUID = 923794934187614088L;

        /** Completed (with {@code null}) once the handle has been discarded. */
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>();

        private TestingOperatorStateHandle() {
        }

        /** Returns the future that {@link #discardState()} completes. */
        public CompletableFuture<Void> getDiscardFuture() {
            return discardFuture;
        }

        /** Always returns an empty map. */
        public Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets() {
            return Collections.emptyMap();
        }

        /**
         * Not supported by this testing implementation.
         *
         * @throws IOException always
         */
        public FSDataInputStream openInputStream() throws IOException {
            throw new IOException("Cannot open input streams in testing implementation.");
        }

        /** Always returns an empty {@link Optional}. */
        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        /**
         * Not supported by this testing implementation.
         *
         * @throws UnsupportedOperationException always
         */
        public StreamStateHandle getDelegateStateHandle() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        /** Signals the discard by completing the discard future. */
        public void discardState() throws Exception {
            discardFuture.complete(null);
        }

        /** The handle reports a size of zero. */
        public long getStateSize() {
            return 0L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$ThreadInspectingTask.class */
    /**
     * {@link StreamTask} that remembers the thread and context class loader it was constructed
     * with and verifies, in each of its lifecycle hooks and in a processing-time timer callback,
     * that it is still running on that thread with that same context class loader.
     */
    public static class ThreadInspectingTask extends StreamTask<String, AbstractStreamOperator<String>> {
        /** Id of the thread that constructed this task. */
        private final long taskThreadId;
        /** Context class loader of the constructing thread, as seen at construction time. */
        private final ClassLoader taskClassLoader;
        /** Set by the timer registered in {@code init()}; once set, input processing completes. */
        private transient boolean hasTimerTriggered;

        ThreadInspectingTask(Environment environment) throws Exception {
            super(environment);
            Thread currentThread = Thread.currentThread();
            this.taskThreadId = currentThread.getId();
            this.taskClassLoader = currentThread.getContextClassLoader();
        }

        /** Returns the context class loader captured in the constructor (may be {@code null}). */
        @Nullable
        ClassLoader getTaskClassLoader() {
            return this.taskClassLoader;
        }

        /**
         * Checks the calling thread, then registers a processing-time timer (timestamp 0) whose
         * callback repeats the check and flags that the timer has fired.
         */
        protected void init() throws Exception {
            checkTaskThreadInfo();
            getMainOperator().getProcessingTimeService().registerTimer(0L, new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskTest.ThreadInspectingTask.1
                public void onProcessingTime(long j) throws Exception {
                    ThreadInspectingTask.this.checkTaskThreadInfo();
                    ThreadInspectingTask.this.hasTimerTriggered = true;
                }
            });
        }

        /** Checks the calling thread and signals completion once the timer callback has run. */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            checkTaskThreadInfo();
            if (this.hasTimerTriggered) {
                controller.allActionsCompleted();
            }
        }

        /** Checks the calling thread; there is nothing else to clean up. */
        protected void cleanup() throws Exception {
            checkTaskThreadInfo();
        }

        /**
         * Verifies that the current thread is the one that constructed this task and that its
         * context class loader is still the one captured at construction time.
         *
         * @throws IllegalStateException if either check fails
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void checkTaskThreadInfo() {
            Thread currentThread = Thread.currentThread();
            Preconditions.checkState(this.taskThreadId == currentThread.getId(), "Task's method was called in non task thread.");
            // The comparison is against the thread's *context* class loader, so the message says so.
            Preconditions.checkState(this.taskClassLoader == currentThread.getContextClassLoader(), "Task's context class loader has been changed during invocation.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$UnAvailableTestInputProcessor.class */
    /**
     * {@link StreamInputProcessor} whose availability is driven entirely by an internal
     * {@link AvailabilityProvider.AvailabilityHelper}: while that helper is not available the
     * processor reports that nothing is available, otherwise it reports end of input.
     */
    public static class UnAvailableTestInputProcessor implements StreamInputProcessor {
        private final AvailabilityProvider.AvailabilityHelper availabilityProvider =
                new AvailabilityProvider.AvailabilityHelper();

        private UnAvailableTestInputProcessor() {
        }

        /** Maps the helper's availability to an {@link InputStatus}. */
        public InputStatus processInput() {
            if (availabilityProvider.isAvailable()) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        /** Returns an already completed future; there is nothing to prepare. */
        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws CheckpointException {
            return FutureUtils.completedVoidFuture();
        }

        /** No-op. */
        public void close() throws IOException {
        }

        /** Exposes the availability future of the internal helper. */
        public CompletableFuture<?> getAvailableFuture() {
            return availabilityProvider.getAvailableFuture();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$UnusedOperatorFactory.class */
    /**
     * Operator factory placeholder: both operator-related methods throw, and setting a chaining
     * strategy has no effect.
     */
    private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private UnusedOperatorFactory() {
        }

        /** No-op: the chaining strategy is ignored. */
        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            throw new UnsupportedOperationException("This shouldn't be called");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$WaitingThread.class */
    /**
     * Helper thread that waits until the given {@link TimerGauge} reports that it is measuring,
     * sleeps for a configured time outside of any mail, and then submits a mail to the given
     * {@link MailboxExecutor}. That mail sleeps for a second configured time and finally runs the
     * resume task.
     *
     * <p>If this thread is interrupted while waiting, the {@link InterruptedException} is kept and
     * rethrown from inside the submitted mail instead of being lost.
     */
    private static class WaitingThread extends Thread {
        private final MailboxExecutor executor;
        private final RunnableWithException resumeTask;
        private final long sleepTimeInsideMail;
        private final long sleepTimeOutsideMail;
        private final TimerGauge sleepOutsideMailTimer;

        /** Interruption observed while waiting in {@link #run()}, if any. */
        @Nullable
        private Exception asyncException;

        /**
         * @param mailboxExecutor executor the resume mail is submitted to
         * @param runnableWithException action run at the end of the submitted mail
         * @param j milliseconds to sleep inside the submitted mail
         * @param j2 milliseconds to sleep in this thread before submitting the mail
         * @param timerGauge gauge polled until it reports that it is measuring
         */
        public WaitingThread(MailboxExecutor mailboxExecutor, RunnableWithException runnableWithException, long j, long j2, TimerGauge timerGauge) {
            this.executor = mailboxExecutor;
            this.resumeTask = runnableWithException;
            this.sleepTimeInsideMail = j;
            this.sleepTimeOutsideMail = j2;
            this.sleepOutsideMailTimer = timerGauge;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                // Only start the "outside mail" sleep once the gauge is measuring, so the
                // sleep is fully covered by the measurement.
                while (!this.sleepOutsideMailTimer.isMeasuring()) {
                    Thread.sleep(1L);
                }
                Thread.sleep(this.sleepTimeOutsideMail);
            } catch (InterruptedException e) {
                // Stop waiting and surface the interruption through the submitted mail below.
                this.asyncException = e;
            }
            this.executor.submit(() -> {
                if (this.asyncException != null) {
                    throw this.asyncException;
                }
                Thread.sleep(this.sleepTimeInsideMail);
                this.resumeTask.run();
            }, "This task will complete the future to resume process input action.");
        }
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_SUSPEND} followed by a
     * checkpoint-complete notification; the operator's input is expected not to be ended.
     */
    @Test
    public void testSavepointSuspendCompleted() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> task.notifyCheckpointCompleteAsync(checkpointId),
                CheckpointType.SAVEPOINT_SUSPEND,
                false);
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_TERMINATE} followed by a
     * checkpoint-complete notification; the operator's input is expected to be ended.
     */
    @Test
    public void testSavepointTerminateCompleted() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> task.notifyCheckpointCompleteAsync(checkpointId),
                CheckpointType.SAVEPOINT_TERMINATE,
                true);
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_SUSPEND} that is aborted via
     * {@code abortCheckpointOnBarrier}; the operator's input is expected to be ended.
     */
    @Test
    public void testSavepointSuspendedAborted() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> {
                    CheckpointException abortCause =
                            new CheckpointException(CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE);
                    task.abortCheckpointOnBarrier(checkpointId.longValue(), abortCause);
                },
                CheckpointType.SAVEPOINT_SUSPEND,
                true);
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_TERMINATE} that is aborted via
     * {@code abortCheckpointOnBarrier}; the operator's input is expected to be ended.
     */
    @Test
    public void testSavepointTerminateAborted() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> {
                    CheckpointException abortCause =
                            new CheckpointException(CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE);
                    task.abortCheckpointOnBarrier(checkpointId.longValue(), abortCause);
                },
                CheckpointType.SAVEPOINT_TERMINATE,
                true);
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_SUSPEND} followed by an
     * asynchronous checkpoint-abort notification; the operator's input is expected to be ended.
     */
    @Test
    public void testSavepointSuspendAbortedAsync() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> task.notifyCheckpointAbortAsync(checkpointId),
                CheckpointType.SAVEPOINT_SUSPEND,
                true);
    }

    /**
     * Sync-savepoint scenario with {@link CheckpointType#SAVEPOINT_TERMINATE} followed by an
     * asynchronous checkpoint-abort notification; the operator's input is expected to be ended.
     */
    @Test
    public void testSavepointTerminateAbortedAsync() throws Exception {
        testSyncSavepointWithEndInput(
                (task, checkpointId) -> task.notifyCheckpointAbortAsync(checkpointId),
                CheckpointType.SAVEPOINT_TERMINATE,
                true);
    }

    /**
     * Shared driver for the sync-savepoint tests.
     *
     * <p>It builds a one-input task harness around a {@code TestBoundedOneInputStreamOperator} and
     * enqueues three mails: one that triggers checkpoint 1 on barrier with the given checkpoint
     * type, one that releases a helper thread, and one that waits for that helper thread and then
     * applies the given callback. The helper thread ends the harness input once released. The
     * mailbox is then stepped until its loop stops, and the operator's "input ended" flag is
     * compared against the expectation.
     *
     * @param biConsumerWithException callback invoked with the stream task and the checkpoint id
     *     after the input has been ended (e.g. a complete or abort notification)
     * @param checkpointType checkpoint type used for the triggered checkpoint
     * @param z expected value of {@code TestBoundedOneInputStreamOperator.isInputEnded()}
     */
    private void testSyncSavepointWithEndInput(BiConsumerWithException<StreamTask<?, ?>, Long, IOException> biConsumerWithException, CheckpointType checkpointType, boolean z) throws Exception {
        StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperator<?>) new TestBoundedOneInputStreamOperator()).build();

        final long checkpointId = 1L;
        CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
        CountDownLatch inputEndedLatch = new CountDownLatch(1);

        MailboxExecutor mailboxExecutor = harness.streamTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE);
        mailboxExecutor.execute(() -> {
            try {
                harness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(checkpointId, checkpointId), new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder());
            } catch (IOException e) {
                Assert.fail(e.getMessage());
            }
        }, "triggerCheckpointOnBarrier");

        // Ends the input only after the "savepointTriggeredLatch" mail below has been executed.
        new Thread(() -> {
            try {
                savepointTriggeredLatch.await();
                harness.endInput();
                inputEndedLatch.countDown();
            } catch (InterruptedException e) {
                Assert.fail(e.getMessage());
            }
        }).start();

        mailboxExecutor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
        mailboxExecutor.execute(() -> {
            inputEndedLatch.await();
            biConsumerWithException.accept(harness.streamTask, checkpointId);
        }, "savepointResult");

        while (harness.streamTask.isMailboxLoopRunning()) {
            harness.streamTask.runMailboxStep();
        }

        Assert.assertEquals(z, TestBoundedOneInputStreamOperator.isInputEnded());
    }

    /**
     * Runs a task with a {@code FailingTwiceOperator} and, if waiting for task completion throws,
     * accepts the exception only when its cause is an {@code ExpectedTestException} carrying
     * exactly one suppressed exception of type {@code FailingTwiceOperator.DisposeException}.
     * Any other exception is rethrown.
     */
    @Test
    public void testCleanUpExceptionSuppressing() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig operatorConfig = harness.getStreamConfig();
        operatorConfig.setStreamOperator(new FailingTwiceOperator());
        operatorConfig.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskRunning();
        harness.processElement(new StreamRecord("Doesn't matter", 0L));

        try {
            harness.waitForTaskCompletion();
        } catch (Exception e) {
            final Throwable cause = e.getCause();
            final boolean isExpectedFailure =
                    cause instanceof ExpectedTestException
                            && cause.getSuppressed().length == 1
                            && cause.getSuppressed()[0] instanceof FailingTwiceOperator.DisposeException;
            if (!isExpectedFailure) {
                throw e;
            }
        }
    }

    /**
     * Hands an exception and a message to a {@code StreamTaskAsyncExceptionHandler} and checks
     * that the environment records an {@code AsynchronousException} with that message and with
     * the original exception as its cause.
     */
    @Test
    public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
        final String expectedMessage = "EXPECTED_ERROR MESSAGE";
        MockEnvironment environment = MockEnvironment.builder().build();
        RuntimeException expectedCause = new RuntimeException("RUNTIME EXCEPTION");

        StreamTask.StreamTaskAsyncExceptionHandler handler = new StreamTask.StreamTaskAsyncExceptionHandler(environment);
        environment.setExpectedExternalFailureCause(AsynchronousException.class);
        handler.handleAsyncException(expectedMessage, expectedCause);

        Throwable recordedFailure = (Throwable) environment.getActualExternalFailureCause()
                .orElseThrow(() -> new AssertionError("Expected exceptional completion"));

        Assert.assertThat(recordedFailure, Matchers.instanceOf(AsynchronousException.class));
        Assert.assertThat(recordedFailure.getMessage(), Matchers.is(expectedMessage));
        Assert.assertThat(recordedFailure.getCause(), Matchers.is(expectedCause));
    }

    /**
     * Starts a {@code SourceStreamTask} with a {@code SlowlyDeserializingOperator}, waits (up to
     * two seconds) until the task manager actions have been told that the task is RUNNING, then
     * cancels the task and checks that its thread has terminated and its state is CANCELED.
     *
     * <p>The shuffle environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testEarlyCanceling() throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        streamConfig.setStreamOperator(new SlowlyDeserializingOperator());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        TaskManagerActions taskManagerActions = Mockito.spy(new NoOpTaskManagerActions());
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = new TestTaskBuilder(shuffleEnvironment).setInvokable(SourceStreamTask.class).setTaskConfig(streamConfig.getConfiguration()).setTaskManagerActions(taskManagerActions).build();
            TaskExecutionState runningState = new TaskExecutionState(task.getExecutionId(), ExecutionState.RUNNING);

            task.startTaskThread();
            Mockito.verify(taskManagerActions, Mockito.timeout(2000L)).updateTaskExecutionState(org.mockito.Matchers.eq(runningState));

            // Cancel once the task has reported RUNNING and wait for its thread to end.
            task.cancelExecution();
            task.getExecutingThread().join();

            Assert.assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
    }

    /**
     * Runs a {@code StateBackendTestSource} task (with its {@code fail} flag cleared) using the
     * {@link TestMemoryStateBackendFactory} and verifies that the state backends and raw state
     * inputs captured by {@link TestStreamSource} were closed, that both backends were disposed,
     * and that the task ends in state FINISHED.
     *
     * <p>The shuffle environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testStateBackendLoadingAndClosing() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateKeySerializer(Mockito.mock(TypeSerializer.class));
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        streamConfig.setStreamOperator(new TestStreamSource(new MockSourceFunction()));
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = createTask(StateBackendTestSource.class, shuffleEnvironment, streamConfig, taskManagerConfig);

            StateBackendTestSource.fail = false;
            task.startTaskThread();
            task.getExecutingThread().join();

            Mockito.verify(TestStreamSource.operatorStateBackend).close();
            Mockito.verify(TestStreamSource.keyedStateBackend).close();
            Mockito.verify(TestStreamSource.rawOperatorStateInputs).close();
            Mockito.verify(TestStreamSource.rawKeyedStateInputs).close();
            Mockito.verify(TestStreamSource.operatorStateBackend).dispose();
            Mockito.verify(TestStreamSource.keyedStateBackend).dispose();

            Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        }
    }

    /**
     * Runs a {@code StateBackendTestSource} task with its {@code fail} flag set, using the
     * {@link TestMemoryStateBackendFactory}, and verifies that the state backends and raw state
     * inputs captured by {@link TestStreamSource} were still closed, that both backends were
     * disposed, and that the task ends in state FAILED.
     *
     * <p>The shuffle environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testStateBackendClosingOnFailure() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateKeySerializer(Mockito.mock(TypeSerializer.class));
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        streamConfig.setStreamOperator(new TestStreamSource(new MockSourceFunction()));
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = createTask(StateBackendTestSource.class, shuffleEnvironment, streamConfig, taskManagerConfig);

            StateBackendTestSource.fail = true;
            task.startTaskThread();
            task.getExecutingThread().join();

            Mockito.verify(TestStreamSource.operatorStateBackend).close();
            Mockito.verify(TestStreamSource.keyedStateBackend).close();
            Mockito.verify(TestStreamSource.rawOperatorStateInputs).close();
            Mockito.verify(TestStreamSource.rawKeyedStateInputs).close();
            Mockito.verify(TestStreamSource.operatorStateBackend).dispose();
            Mockito.verify(TestStreamSource.keyedStateBackend).dispose();

            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        }
    }

    /**
     * Starts a {@code CancelFailingTask}, waits on the shared {@code syncLatch}, cancels the task
     * and checks that it ends in state CANCELED.
     *
     * <p>The shuffle environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testCanceleablesCanceledOnCancelTaskError() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = createTask(CancelFailingTask.class, shuffleEnvironment, streamConfig, new Configuration());

            task.startTaskThread();
            // Do not cancel before the latch has been triggered.
            syncLatch.await();
            task.cancelExecution();
            task.getExecutingThread().join();

            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
    }

    /**
     * Runs a {@code CancelThrowingTask} to completion and checks that it ends in state CANCELED.
     *
     * <p>The shuffle environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testCancelTaskExceptionHandling() throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = createTask(CancelThrowingTask.class, shuffleEnvironment, streamConfig, new Configuration());

            task.startTaskThread();
            task.getExecutingThread().join();

            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
    }

    /**
     * Builds an operator chain of three operators where the first throws an
     * {@code ExpectedTestException} when snapshotting and the other two return mocked snapshot
     * futures, triggers a checkpoint, and verifies that both mocked snapshot futures get
     * cancelled. A task failure is tolerated only if it contains the expected exception.
     */
    @Test
    public void testDecliningCheckpointStreamOperator() throws Exception {
        DummyEnvironment environment = new DummyEnvironment();
        OperatorSnapshotFutures firstSnapshot = Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures secondSnapshot = Mockito.mock(OperatorSnapshotFutures.class);
        ExpectedTestException snapshotFailure = new ExpectedTestException();

        RunningTask runningTask = runTask(() -> createMockStreamTask(
                environment,
                operatorChain(
                        streamOperatorWithSnapshotException(snapshotFailure),
                        streamOperatorWithSnapshot(firstSnapshot),
                        streamOperatorWithSnapshot(secondSnapshot))));
        MockStreamTask streamTask = (MockStreamTask) runningTask.streamTask;
        StreamTaskUtil.waitTaskIsRunning(streamTask, runningTask.invocationFuture);

        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());

        try {
            runningTask.waitForTaskCompletion(false);
        } catch (Exception e) {
            boolean causedByExpectedFailure = ExceptionUtils.findThrowable(e, ExpectedTestException.class).isPresent();
            if (!causedByExpectedFailure) {
                throw e;
            }
        }

        Mockito.verify(firstSnapshot).cancel();
        Mockito.verify(secondSnapshot).cancel();
    }

    /**
     * Triggers a checkpoint on a task whose first snapshot future is already exceptionally done
     * and whose environment is a {@code FailingDummyEnvironment}, then checks that the testing
     * uncaught-exception handler receives exactly that exception before the task is finished.
     */
    @Test
    public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
        RuntimeException failure = new RuntimeException("Test exception");
        FailingDummyEnvironment environment = new FailingDummyEnvironment(failure);

        // Only the first future fails; the remaining five complete with empty results.
        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures(
                ExceptionallyDoneFuture.of(failure),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()));
        TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();

        RunningTask runningTask = runTask(() -> new MockStreamTask(
                environment,
                operatorChain(streamOperatorWithSnapshot(snapshotFutures)),
                uncaughtExceptionHandler));
        MockStreamTask streamTask = (MockStreamTask) runningTask.streamTask;
        StreamTaskUtil.waitTaskIsRunning(streamTask, runningTask.invocationFuture);

        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());

        Assert.assertThat(uncaughtExceptionHandler.waitForUncaughtException(), Matchers.is(failure));

        streamTask.finishInput();
        runningTask.waitForTaskCompletion(false);
    }

    /**
     * Triggers a checkpoint on a three-operator chain where the third operator's raw operator
     * state future throws an {@link ExecutionException} from {@code get()}. After the async
     * operations thread pool has shut down, the test checks that the environment recorded an
     * external failure cause and that all three snapshot futures were cancelled.
     *
     * <p>The mock environment is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testFailingAsyncCheckpointRunnable() throws Exception {
        OperatorSnapshotFutures firstSnapshot = Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures secondSnapshot = Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures failingSnapshot = Mockito.mock(OperatorSnapshotFutures.class);

        RunnableFuture failingFuture = Mockito.mock(RunnableFuture.class);
        Mockito.when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
        Mockito.when(failingSnapshot.getOperatorStateRawFuture()).thenReturn(failingFuture);

        try (MockEnvironment environment = new MockEnvironmentBuilder().build()) {
            RunningTask runningTask = runTask(() -> createMockStreamTask(
                    environment,
                    operatorChain(
                            streamOperatorWithSnapshot(firstSnapshot),
                            streamOperatorWithSnapshot(secondSnapshot),
                            streamOperatorWithSnapshot(failingSnapshot))));
            MockStreamTask streamTask = (MockStreamTask) runningTask.streamTask;
            StreamTaskUtil.waitTaskIsRunning(streamTask, runningTask.invocationFuture);

            environment.setExpectedExternalFailureCause(Throwable.class);
            streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation()).get();

            // Shut the pool down and wait for it, so the async checkpoint work is finished
            // before the assertions below run.
            ExecutorService asyncOperationsThreadPool = streamTask.getAsyncOperationsThreadPool();
            asyncOperationsThreadPool.shutdown();
            if (!asyncOperationsThreadPool.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
            }

            Assert.assertTrue(environment.getActualExternalFailureCause().isPresent());
            Mockito.verify(firstSnapshot).cancel();
            Mockito.verify(secondSnapshot).cancel();
            Mockito.verify(failingSnapshot).cancel();

            streamTask.finishInput();
            runningTask.waitForTaskCompletion(false);
        }
    }

    /**
     * Cancels the task while the asynchronous checkpoint is blocked inside
     * {@code acknowledgeCheckpoint} and verifies that the acknowledged state handles are not
     * discarded, neither before nor after the acknowledgement is allowed to return.
     */
    @Test
    public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        final OneShotLatch acknowledgeStarted = new OneShotLatch();
        final OneShotLatch acknowledgeMayFinish = new OneShotLatch();

        CheckpointResponder checkpointResponder = Mockito.mock(CheckpointResponder.class);
        Mockito.doAnswer(invocation -> {
            acknowledgeStarted.trigger();
            // Hold the acknowledging thread here so the test can cancel the task concurrently.
            while (true) {
                try {
                    acknowledgeMayFinish.await();
                    return null;
                } catch (InterruptedException ignored) {
                    // Interrupts are deliberately ignored: keep waiting until the test releases the latch.
                }
            }
        }).when(checkpointResponder).acknowledgeCheckpoint(
                org.mockito.Matchers.any(JobID.class),
                org.mockito.Matchers.any(ExecutionAttemptID.class),
                org.mockito.Matchers.anyLong(),
                org.mockito.Matchers.any(CheckpointMetrics.class),
                org.mockito.Matchers.any(TaskStateSnapshot.class));

        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(
                new JobID(1L, 2L),
                new ExecutionAttemptID(),
                (TaskLocalStateStore) Mockito.mock(TaskLocalStateStoreImpl.class),
                (JobManagerTaskRestore) null,
                checkpointResponder);

        KeyedStateHandle managedKeyedStateHandle = Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = Mockito.mock(OperatorStreamStateHandle.class);

        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
                DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
                DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)),
                DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
                DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()));

        // try-with-resources closes the environment exactly once and attaches a close failure
        // to the primary exception as a suppressed exception.
        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskName("mock-task")
                .setTaskStateManager(taskStateManager)
                .build()) {

            RunningTask task = runTask(() ->
                    createMockStreamTask(environment, operatorChain(streamOperatorWithSnapshot(operatorSnapshotResult))));
            MockStreamTask streamTask = (MockStreamTask) task.streamTask;
            StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);

            streamTask.triggerCheckpointAsync(
                    new CheckpointMetaData(42L, 1L),
                    CheckpointOptions.forCheckpointWithDefaultLocation());

            // Wait until the acknowledgement has been entered (and is now blocked).
            acknowledgeStarted.await();

            ArgumentCaptor<TaskStateSnapshot> snapshotCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
            Mockito.verify(checkpointResponder).acknowledgeCheckpoint(
                    org.mockito.Matchers.any(JobID.class),
                    org.mockito.Matchers.any(ExecutionAttemptID.class),
                    org.mockito.Matchers.eq(42L),
                    org.mockito.Matchers.any(CheckpointMetrics.class),
                    snapshotCaptor.capture());

            Map.Entry<?, ?> firstEntry = snapshotCaptor.getValue().getSubtaskStateMappings().iterator().next();
            OperatorSubtaskState subtaskState = (OperatorSubtaskState) firstEntry.getValue();

            Assert.assertEquals(StateObjectCollection.singleton(managedKeyedStateHandle), subtaskState.getManagedKeyedState());
            Assert.assertEquals(StateObjectCollection.singleton(rawKeyedStateHandle), subtaskState.getRawKeyedState());
            Assert.assertEquals(StateObjectCollection.singleton(managedOperatorStateHandle), subtaskState.getManagedOperatorState());
            Assert.assertEquals(StateObjectCollection.singleton(rawOperatorStateHandle), subtaskState.getRawOperatorState());

            // Nothing may be discarded while the acknowledgement is still in flight ...
            Mockito.verify(managedKeyedStateHandle, Mockito.never()).discardState();
            Mockito.verify(rawKeyedStateHandle, Mockito.never()).discardState();
            Mockito.verify(managedOperatorStateHandle, Mockito.never()).discardState();
            Mockito.verify(rawOperatorStateHandle, Mockito.never()).discardState();

            streamTask.cancel();
            acknowledgeMayFinish.trigger();

            // ... nor after the task was cancelled and the acknowledgement was released.
            Mockito.verify(managedKeyedStateHandle, Mockito.never()).discardState();
            Mockito.verify(rawKeyedStateHandle, Mockito.never()).discardState();
            Mockito.verify(managedOperatorStateHandle, Mockito.never()).discardState();
            Mockito.verify(rawOperatorStateHandle, Mockito.never()).discardState();

            task.waitForTaskCompletion(true);
        }
    }

    /**
     * Cancels the task while one of the snapshot futures is still blocked, then checks that all
     * four state handles complete their discard futures and that no checkpoint acknowledgement
     * shows up within the short wait below.
     */
    @Test
    public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
        TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
        TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
        TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();

        // The raw keyed state future is the one the test blocks on before cancelling.
        BlockingRunnableFuture rawKeyedStateFuture =
                new BlockingRunnableFuture(2, SnapshotResult.of(rawKeyedStateHandle));

        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures(
                DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
                rawKeyedStateFuture,
                DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
                DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)),
                DoneFuture.of(SnapshotResult.empty()),
                DoneFuture.of(SnapshotResult.empty()));
        OneInputStreamOperator operator = streamOperatorWithSnapshot(snapshotFutures);

        AcknowledgeDummyEnvironment environment = new AcknowledgeDummyEnvironment();
        RunningTask task = runTask(() -> createMockStreamTask(environment, operatorChain(operator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);

        MockStreamTask streamTask = (MockStreamTask) task.streamTask;
        streamTask.triggerCheckpointAsync(
                new CheckpointMetaData(42L, 1L),
                CheckpointOptions.forCheckpointWithDefaultLocation());

        // Cancel only once the blocking future has actually started running.
        rawKeyedStateFuture.awaitRun();
        streamTask.cancel();

        // Every handle must report that it was discarded.
        FutureUtils.waitForAll(Arrays.asList(
                managedKeyedStateHandle.getDiscardFuture(),
                rawKeyedStateHandle.getDiscardFuture(),
                managedOperatorStateHandle.getDiscardFuture(),
                rawOperatorStateHandle.getDiscardFuture())).get();

        try {
            environment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("The checkpoint should not get acknowledged.");
        } catch (TimeoutException expected) {
            // Expected outcome: the acknowledge future never completes.
        }

        task.waitForTaskCompletion(true);
    }

    /**
     * Triggers a checkpoint on an operator whose snapshot futures are all empty
     * ({@code new OperatorSnapshotFutures()}) and verifies that the checkpoint is acknowledged
     * with a {@code null} {@link TaskStateSnapshot}.
     */
    @Test
    public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        final OneShotLatch acknowledged = new OneShotLatch();
        // Holds argument #4 of acknowledgeCheckpoint, i.e. the (nullable) TaskStateSnapshot.
        final ArrayList<TaskStateSnapshot> reportedSnapshots = new ArrayList<>(1);

        CheckpointResponder checkpointResponder = Mockito.mock(CheckpointResponder.class);
        Mockito.doAnswer(invocation -> {
            // Store it with its real type so that a non-null snapshot fails the assertion below
            // instead of raising a ClassCastException inside the answer.
            TaskStateSnapshot snapshot = invocation.getArgument(4);
            reportedSnapshots.add(snapshot);
            acknowledged.trigger();
            return null;
        }).when(checkpointResponder).acknowledgeCheckpoint(
                org.mockito.Matchers.any(JobID.class),
                org.mockito.Matchers.any(ExecutionAttemptID.class),
                org.mockito.Matchers.anyLong(),
                org.mockito.Matchers.any(CheckpointMetrics.class),
                ArgumentMatchers.nullable(TaskStateSnapshot.class));

        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(
                new JobID(1L, 2L),
                new ExecutionAttemptID(),
                (TaskLocalStateStore) Mockito.mock(TaskLocalStateStoreImpl.class),
                (JobManagerTaskRestore) null,
                checkpointResponder);

        OneInputStreamOperator statelessOperator = streamOperatorWithSnapshot(new OperatorSnapshotFutures());

        // try-with-resources closes the environment exactly once, also on failure.
        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskStateManager(taskStateManager)
                .build()) {

            RunningTask task = runTask(() -> createMockStreamTask(environment, operatorChain(statelessOperator)));
            StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);

            MockStreamTask streamTask = (MockStreamTask) task.streamTask;
            streamTask.triggerCheckpointAsync(
                    new CheckpointMetaData(42L, 1L),
                    CheckpointOptions.forCheckpointWithDefaultLocation());

            acknowledged.await(30L, TimeUnit.SECONDS);

            // A stateless acknowledgement is reported as a null snapshot.
            Assert.assertNull(reportedSnapshots.get(0));

            streamTask.cancel();
            task.waitForTaskCompletion(true);
        }
    }

    /**
     * Verifies that the task still reports {@code isRunning()} while the operator is inside its
     * blocking close, and stops reporting it once the task has completed.
     */
    @Test
    public void testOperatorClosingBeforeStopRunning() throws Throwable {
        BlockingCloseStreamOperator.resetLatches();

        Configuration taskConfiguration = new Configuration();
        // StreamConfig is a view over taskConfiguration; the setters below populate it.
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
        streamConfig.setOperatorID(new OperatorID());

        // try-with-resources closes the environment exactly once, also on failure.
        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskName("Test Task")
                .setManagedMemorySize(32768L)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1)
                .setTaskConfiguration(taskConfiguration)
                .build()) {

            RunningTask task = runTask(() -> new NoOpStreamTask(environment));

            // Wait until the operator is blocked in close(); the task must still count as running.
            BlockingCloseStreamOperator.inClose.await();
            Assert.assertTrue(task.streamTask.isRunning());

            // Let close() return and wait for the task to finish.
            BlockingCloseStreamOperator.finishClose.trigger();
            task.waitForTaskCompletion(false);
            Assert.assertFalse(task.streamTask.isRunning());
        }
    }

    /**
     * Verifies that a checkpoint-complete notification reaches the operator while it is open
     * (the {@code notified} counter becomes 1) and that a second notification sent after the
     * operators were closed does not increase that counter.
     */
    @Test
    public void testNotifyCheckpointOnClosedOperator() throws Throwable {
        // Type arguments are required here: with a raw builder the constructor reference
        // OneInputStreamTask::new has no usable functional-interface target.
        StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(new ClosingOperator<>())
                        .build();
        harness.setAutoProcess(false);
        harness.processElement(new StreamRecord<>(1));
        harness.streamTask.runMailboxStep();

        harness.streamTask.notifyCheckpointCompleteAsync(1L);
        harness.streamTask.runMailboxStep();
        Assert.assertEquals(1L, ClosingOperator.notified.get());
        Assert.assertFalse(ClosingOperator.closed.get());

        // Close the operators directly through the operator chain, then notify again.
        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
        harness.streamTask.notifyCheckpointCompleteAsync(2L);
        harness.streamTask.runMailboxStep();
        Assert.assertEquals(1L, ClosingOperator.notified.get());
        Assert.assertTrue(ClosingOperator.closed.get());
    }

    /** Runs the shared failure scenario with a checkpoint-complete notification for checkpoint 1. */
    @Test
    public void testFailToConfirmCheckpointCompleted() throws Exception {
        testFailToConfirmCheckpointMessage(task -> task.notifyCheckpointCompleteAsync(1L));
    }

    /** Runs the shared failure scenario with a checkpoint-abort notification for checkpoint 1. */
    @Test
    public void testFailToConfirmCheckpointAborted() throws Exception {
        testFailToConfirmCheckpointMessage(task -> task.notifyCheckpointAbortAsync(1L));
    }

    /**
     * Builds a one-input task around a {@code FailOnNotifyCheckpointMapper}, lets {@code consumer}
     * send a checkpoint notification to it and expects the mailbox loop to fail with an
     * {@code ExpectedTestException}.
     *
     * @param consumer action that issues the checkpoint notification on the stream task
     * @throws Exception if building the harness or running the mailbox loop fails with anything
     *     other than the expected exception
     */
    private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
        // Type arguments are required here: with a raw builder the constructor reference
        // OneInputStreamTask::new has no usable functional-interface target.
        StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(new StreamMap<>(new FailOnNotifyCheckpointMapper()))
                        .build();
        try {
            consumer.accept(harness.streamTask);
            harness.streamTask.runMailboxLoop();
            Assert.fail();
        } catch (ExpectedTestException expected) {
            // Expected: the notification is supposed to make the mailbox loop fail.
        }
    }

    /**
     * Verifies that triggering a checkpoint after the operators were closed results in exactly
     * one decline report on the harness' checkpoint responder.
     */
    @Test
    public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
        // Type arguments are required here: with a raw builder the constructor reference
        // OneInputStreamTask::new has no usable functional-interface target.
        StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(new ClosingOperator<>())
                        .build();
        harness.setAutoProcess(false);
        harness.processElement(new StreamRecord<>(1));

        // Close the operators directly through the operator chain before triggering the checkpoint.
        harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
        Assert.assertTrue(ClosingOperator.closed.get());

        harness.streamTask.triggerCheckpointOnBarrier(
                new CheckpointMetaData(1L, 0L),
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                new CheckpointMetricsBuilder());
        Assert.assertEquals(1L, harness.getCheckpointResponder().getDeclineReports().size());
    }

    /**
     * Verifies that a mail enqueued from {@code processInput} is still executed even though the
     * same {@code processInput} call immediately reports {@code allActionsCompleted()}.
     */
    @Test
    public void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception {
        OneShotLatch mailExecuted = new OneShotLatch();

        // try-with-resources closes the environment exactly once, also on failure.
        try (MockEnvironment environment = new MockEnvironmentBuilder().build()) {
            RunningTask task = runTask(() ->
                    new StreamTask<Object, StreamOperator<Object>>(environment) {
                        @Override
                        protected void init() throws Exception {
                        }

                        @Override
                        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
                            // Enqueue the mail first, then signal that input processing is finished.
                            this.mailboxProcessor.getMailboxExecutor(0).execute(mailExecuted::trigger, "trigger");
                            controller.allActionsCompleted();
                        }
                    });

            // Blocks until the enqueued mail has run.
            mailExecuted.await();
            task.waitForTaskCompletion(false);
        }
    }

    /**
     * Sets a dedicated context class loader on the thread that creates the task and verifies
     * that {@code ThreadInspectingTask#getTaskClassLoader()} returns that very instance.
     */
    @Test
    public void testThreadInvariants() throws Throwable {
        Configuration taskConfiguration = new Configuration();
        // StreamConfig is a view over taskConfiguration; the setters below populate it.
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator(new StreamMap<>(value -> value));
        streamConfig.setOperatorID(new OperatorID());

        // try-with-resources closes the environment exactly once, also on failure.
        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskConfiguration(taskConfiguration)
                .build()) {

            TestUserCodeClassLoader userCodeClassLoader = new TestUserCodeClassLoader();
            RunningTask task = runTask(() -> {
                // Runs on the thread that creates the task.
                Thread.currentThread().setContextClassLoader(userCodeClassLoader);
                return new ThreadInspectingTask(environment);
            });
            task.invocationFuture.get();

            Assert.assertThat(
                    ((ThreadInspectingTask) task.streamTask).getTaskClassLoader(),
                    Matchers.is(Matchers.sameInstance(userCodeClassLoader)));
        }
    }

    /**
     * Invokes a task whose configuration carries 42 zero bytes under {@code "serializedUDF"},
     * expects the invocation to fail with a {@link StreamCorruptedException} somewhere in the
     * cause chain, and then checks that no thread whose name starts with {@code "OutputFlusher"}
     * is left running.
     */
    @Test
    public void testRecordWriterClosedOnStreamOperatorFactoryDeserializationError() throws Exception {
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());

        StreamConfigChainer chainer = new StreamConfigChainer(new OperatorID(42L, 42L), streamConfig, this);
        chainer.chain(
                new OperatorID(44L, 44L),
                (StreamOperatorFactory) new UnusedOperatorFactory(),
                (TypeSerializer) StringSerializer.INSTANCE,
                (TypeSerializer) StringSerializer.INSTANCE,
                false);
        chainer.finish();

        // Overwrite the entry with bytes that are not a valid serialized object
        // (presumably what makes the operator factory deserialization fail).
        taskConfiguration.setBytes("serializedUDF", new byte[42]);

        // try-with-resources closes the environment exactly once, before the thread check below.
        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskConfiguration(taskConfiguration)
                .build()) {

            environment.addOutput(new ArrayList<>());
            try {
                new NoOpStreamTask(environment).invoke();
                Assert.fail("Should have failed with an exception!");
            } catch (Exception e) {
                // Only a failure caused by StreamCorruptedException is the expected outcome.
                if (!ExceptionUtils.findThrowable(e, StreamCorruptedException.class).isPresent()) {
                    throw e;
                }
            }
        }

        Assert.assertTrue(
                "OutputFlusher thread is still running",
                Thread.getAllStackTraces().keySet().stream()
                        .noneMatch(thread -> thread.getName().startsWith("OutputFlusher")));
    }

    /**
     * Runs a task whose two output writers are both created as available and verifies that the
     * input processor was called the configured number of times.
     */
    @Test
    public void testProcessWithAvailableOutput() throws Exception {
        // try-with-resources closes the environment exactly once, also on failure.
        try (MockEnvironment environment = setupEnvironment(true, true)) {
            final int numberOfProcessCalls = 10;
            // Keep a reference to the processor so its call counter can be inspected afterwards.
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);

            new MockStreamTaskBuilder(environment)
                    .setStreamInputProcessor(inputProcessor)
                    .build()
                    .invoke();

            Assert.assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls);
        }
    }

    /**
     * Runs a task whose second output writer starts out unavailable. A helper thread later
     * completes that writer's availability future; the test then checks the back-pressured and
     * idle time meters and the number of input processor calls.
     */
    @Test
    public void testProcessWithUnAvailableOutput() throws Exception {
        // NOTE(review): the names follow the assumed WaitingThread constructor order
        // (inside-mail sleep first, outside-mail sleep second) - confirm against WaitingThread.
        final long sleepTimeInsideMail = 44L;
        final long sleepTimeOutsideMail = 42L;

        WaitingThread waitingThread = null;
        // try-with-resources closes the environment on success and on failure alike.
        try (MockEnvironment environment = setupEnvironment(true, false)) {
            final int numberOfProcessCalls = 10;
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);
            org.apache.flink.streaming.util.MockStreamTask task =
                    new MockStreamTaskBuilder(environment).setStreamInputProcessor(inputProcessor).build();
            MailboxExecutor executor = ((StreamTask) task).mailboxProcessor.getMainMailboxExecutor();
            TaskIOMetricGroup ioMetricGroup = task.getEnvironment().getMetricGroup().getIOMetricGroup();

            waitingThread = new WaitingThread(
                    executor,
                    () -> {
                        // At this point the input must have been processed exactly once and the
                        // default action must be suspended; only then is the writer made available.
                        Assert.assertEquals(1L, inputProcessor.currentNumProcessCalls);
                        Assert.assertTrue(task.mailboxProcessor.isDefaultActionUnavailable());
                        environment.getWriter(1).getAvailableFuture().complete(null);
                    },
                    sleepTimeInsideMail,
                    sleepTimeOutsideMail,
                    ioMetricGroup.getBackPressuredTimePerSecond());
            // The helper thread is started from a mail, i.e. only once the task runs its mailbox.
            executor.submit(
                    waitingThread::start,
                    "This task will submit another task to execute after processing input once.");

            long startTs = System.currentTimeMillis();
            task.invoke();
            long totalDuration = System.currentTimeMillis() - startTs;

            Assert.assertThat(
                    ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
                    Matchers.greaterThanOrEqualTo(sleepTimeOutsideMail));
            Assert.assertThat(
                    ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
                    Matchers.lessThanOrEqualTo(totalDuration - sleepTimeInsideMail));
            Assert.assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(), Matchers.is(0L));
            Assert.assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls);
        } finally {
            // Always wait for the helper thread, whether or not the test body succeeded.
            if (waitingThread != null) {
                waitingThread.join();
            }
        }
    }

    /**
     * Runs a task whose input processor starts out unavailable. A helper thread later completes
     * the input's availability future; the test then checks the idle and back-pressured time
     * meters.
     */
    @Test
    public void testProcessWithUnAvailableInput() throws Exception {
        // NOTE(review): the names follow the assumed WaitingThread constructor order
        // (inside-mail sleep first, outside-mail sleep second) - confirm against WaitingThread.
        final long sleepTimeInsideMail = 44L;
        final long sleepTimeOutsideMail = 42L;

        WaitingThread waitingThread = null;
        // try-with-resources closes the environment on success and on failure alike.
        try (MockEnvironment environment = setupEnvironment(true, true)) {
            UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
            org.apache.flink.streaming.util.MockStreamTask task =
                    new MockStreamTaskBuilder(environment).setStreamInputProcessor(inputProcessor).build();
            TaskIOMetricGroup ioMetricGroup = task.getEnvironment().getMetricGroup().getIOMetricGroup();
            MailboxExecutor executor = ((StreamTask) task).mailboxProcessor.getMainMailboxExecutor();

            waitingThread = new WaitingThread(
                    executor,
                    // Makes the input available again so the task can continue.
                    () -> inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null),
                    sleepTimeInsideMail,
                    sleepTimeOutsideMail,
                    ioMetricGroup.getIdleTimeMsPerSecond());
            // The helper thread is started from a mail, i.e. only once the task runs its mailbox.
            executor.submit(waitingThread::start, "Start WaitingThread after Task starts processing input.");

            long startTs = System.currentTimeMillis();
            task.invoke();
            long totalDuration = System.currentTimeMillis() - startTs;

            Assert.assertThat(
                    ioMetricGroup.getIdleTimeMsPerSecond().getCount(),
                    Matchers.greaterThanOrEqualTo(sleepTimeOutsideMail));
            Assert.assertThat(
                    ioMetricGroup.getIdleTimeMsPerSecond().getCount(),
                    Matchers.lessThanOrEqualTo(totalDuration - sleepTimeInsideMail));
            Assert.assertThat(ioMetricGroup.getBackPressuredTimePerSecond().getCount(), Matchers.is(0L));
        } finally {
            // Always wait for the helper thread, whether or not the test body succeeded.
            if (waitingThread != null) {
                waitingThread.join();
            }
        }
    }

    /**
     * Calls {@code restore()} explicitly inside the task factory and asserts that the task's
     * {@code restoreInvocationCount} is 1 after the task was running and got cancelled.
     */
    @Test
    public void testRestorePerformedOnlyOnce() throws Exception {
        OneInputStreamOperator operator = streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        DummyEnvironment environment = new DummyEnvironment();

        RunningTask task = runTask(() -> {
            MockStreamTask restoredTask = createMockStreamTask(environment, operatorChain(operator));
            // Explicit restore before the task is handed over to be run.
            restoredTask.restore();
            return restoredTask;
        });
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);

        MockStreamTask streamTask = (MockStreamTask) task.streamTask;
        streamTask.cancel();

        Assert.assertThat(streamTask.restoreInvocationCount, Matchers.is(1));
    }

    /**
     * Creates the task without calling {@code restore()} in the factory and asserts that the
     * task's {@code restoreInvocationCount} is nevertheless 1 after it was running and got
     * cancelled.
     */
    @Test
    public void testRestorePerformedFromInvoke() throws Exception {
        OneInputStreamOperator operator = streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        DummyEnvironment environment = new DummyEnvironment();

        RunningTask task = runTask(() -> createMockStreamTask(environment, operatorChain(operator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);

        MockStreamTask streamTask = (MockStreamTask) task.streamTask;
        streamTask.cancel();

        Assert.assertThat(streamTask.restoreInvocationCount, Matchers.is(1));
    }

    /**
     * Starts a {@code SourceStreamTask} around a {@code FailedSource}, triggers a checkpoint
     * barrier once the source is running and verifies that the task thread terminates with the
     * task in state {@code FAILED} (rather than hanging in {@code join()}).
     */
    @Test
    public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateKeySerializer((TypeSerializer) Mockito.mock(TypeSerializer.class));
        streamConfig.setOperatorID(new OperatorID(4712L, 43L));

        FailedSource failedSource = new FailedSource();
        streamConfig.setStreamOperator(new TestStreamSource(failedSource));
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // try-with-resources closes the shuffle environment exactly once, also on failure.
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = createTask(SourceStreamTask.class, shuffleEnvironment, streamConfig, taskManagerConfig);
            task.startTaskThread();
            failedSource.awaitRunning();

            task.triggerCheckpointBarrier(42L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());

            // Returns only when the task thread has terminated.
            task.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        }
    }

    /**
     * Builds a {@code MockEnvironment} with one {@code AvailabilityTestResultPartitionWriter} per
     * supplied flag, added as outputs in the order given.
     *
     * @param zArr one flag per output writer, passed to the writer's constructor
     * @return the environment with all writers registered as outputs
     */
    private MockEnvironment setupEnvironment(boolean... zArr) {
        final Configuration taskConfiguration = new Configuration();
        final int outputCount = zArr.length;

        // The created object is intentionally discarded.
        // NOTE(review): presumably the constructor populates taskConfiguration - confirm.
        new MockStreamConfig(taskConfiguration, outputCount);

        // Raw type retained: the element type expected by addOutputs is not visible from here.
        final ArrayList writers = new ArrayList(outputCount);
        for (int i = 0; i < outputCount; i++) {
            writers.add(new AvailabilityTestResultPartitionWriter(zArr[i]));
        }

        final MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskConfiguration(taskConfiguration)
                .build();
        environment.addOutputs(writers);
        return environment;
    }

    /**
     * Creates a Mockito mock of {@code OneInputStreamOperator} whose {@code getOperatorID()}
     * returns one {@code OperatorID} created here, and whose {@code snapshotState} returns the
     * given futures for any arguments.
     *
     * @param operatorSnapshotFutures the value the stubbed {@code snapshotState} returns
     * @return the stubbed mock operator
     */
    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotFutures) throws Exception {
        OneInputStreamOperator<T, T> mockedOperator = Mockito.mock(OneInputStreamOperator.class);

        Mockito.when(mockedOperator.getOperatorID()).thenReturn(new OperatorID());

        // Matchers stay inline, in parameter order, as Mockito requires.
        Mockito.when(
                        mockedOperator.snapshotState(
                                org.mockito.Matchers.anyLong(),
                                org.mockito.Matchers.anyLong(),
                                org.mockito.Matchers.any(CheckpointOptions.class),
                                org.mockito.Matchers.any(CheckpointStreamFactory.class)))
                .thenReturn(operatorSnapshotFutures);

        return mockedOperator;
    }

    /**
     * Creates a Mockito mock of {@code OneInputStreamOperator} whose {@code getOperatorID()}
     * returns one {@code OperatorID} created here, and whose {@code snapshotState} throws the
     * given exception for any arguments.
     *
     * @param exc the exception the stubbed {@code snapshotState} throws
     * @return the stubbed mock operator
     */
    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshotException(Exception exc) throws Exception {
        OneInputStreamOperator<T, T> mockedOperator = Mockito.mock(OneInputStreamOperator.class);

        Mockito.when(mockedOperator.getOperatorID()).thenReturn(new OperatorID());

        // Matchers stay inline, in parameter order, as Mockito requires.
        Mockito.when(
                        mockedOperator.snapshotState(
                                org.mockito.Matchers.anyLong(),
                                org.mockito.Matchers.anyLong(),
                                org.mockito.Matchers.any(CheckpointOptions.class),
                                org.mockito.Matchers.any(CheckpointStreamFactory.class)))
                .thenThrow(exc);

        return mockedOperator;
    }

    /**
     * Convenience wrapper that delegates to {@code OperatorChainTest.setupOperatorChain} with the
     * given operators.
     *
     * @param oneInputStreamOperatorArr the operators handed to the delegate
     * @return the operator chain produced by the delegate
     */
    private static <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T>... oneInputStreamOperatorArr) throws Exception {
        final OperatorChain<T, AbstractStreamOperator<T>> chain =
                OperatorChainTest.setupOperatorChain(oneInputStreamOperatorArr);
        return chain;
    }

    /**
     * Creates a task via the given factory on a dedicated background thread, invokes it there, and
     * returns once the task instance exists.
     *
     * <p>The asynchronous work is submitted <em>before</em> this method waits for the created task;
     * waiting first would block forever because nothing else completes the creation future.
     *
     * @param supplierWithException factory producing the task; it runs on the background thread
     * @return the created task together with a future that completes when {@code invoke()}
     *     returns, or completes exceptionally when {@code invoke()} throws
     * @throws Exception if the factory fails (surfaced through the creation future's {@code get()})
     *     or the wait for task creation is interrupted
     */
    private static <T extends StreamTask<?, ?>> RunningTask<T> runTask(SupplierWithException<T, Exception> supplierWithException) throws Exception {
        CompletableFuture<T> taskCreationFuture = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> invocationFuture;
        try {
            invocationFuture = CompletableFuture.runAsync(() -> {
                T task;
                try {
                    task = supplierWithException.get();
                } catch (Exception e) {
                    // Creation failed: report it to the waiting caller; there is nothing to invoke.
                    taskCreationFuture.completeExceptionally(e);
                    return;
                }
                taskCreationFuture.complete(task);

                // Failures of invoke() must propagate so that invocationFuture completes
                // exceptionally instead of being swallowed by the creation error handling.
                try {
                    task.invoke();
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executor);
        } finally {
            // The already-submitted work still runs to completion; shutdown only lets the worker
            // thread terminate afterwards instead of lingering.
            executor.shutdown();
        }

        // Wait until the task has been created (or creation has failed).
        return new RunningTask<>(taskCreationFuture.get(), invocationFuture);
    }

    /**
     * Builds a {@code Task} through {@code TestTaskBuilder}.
     *
     * @param cls the invokable class the task should run
     * @param shuffleEnvironment the shuffle environment passed to the builder's constructor
     * @param streamConfig source of the task configuration (its {@code getConfiguration()} result)
     * @param configuration the task manager configuration
     * @return the task produced by the builder
     */
    public static Task createTask(Class<? extends AbstractInvokable> cls, ShuffleEnvironment shuffleEnvironment, StreamConfig streamConfig, Configuration configuration) throws Exception {
        Task task = new TestTaskBuilder(shuffleEnvironment)
                .setTaskManagerConfig(configuration)
                .setInvokable(cls)
                .setTaskConfig(streamConfig.getConfiguration())
                .build();
        return task;
    }

    /**
     * Creates a {@code MockStreamTask} for the given environment and operator chain, using
     * {@code FatalExitExceptionHandler.INSTANCE} as the third constructor argument.
     *
     * <p>JADX INFO: Access modifiers changed from: private.
     *
     * @param environment the environment handed to the task
     * @param operatorChain the operator chain handed to the task
     * @return the newly constructed mock task
     */
    public static MockStreamTask createMockStreamTask(Environment environment, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) throws Exception {
        final MockStreamTask mockTask =
                new MockStreamTask(environment, operatorChain, FatalExitExceptionHandler.INSTANCE);
        return mockTask;
    }

    /**
     * Lambda-deserialization hook as reconstructed by the decompiler.
     *
     * <p>It looks up the serialized lambda by implementation method name; when the name and all
     * recorded attributes match the single known lambda, it returns an identity function,
     * otherwise it throws {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this body is decompiler output and is not valid Java as written: a
     * {@code boolean} cannot be initialised with {@code -1}, {@code switch} does not accept a
     * {@code boolean} selector, and a lambda cannot be returned as {@code Object} without a
     * target type. Methods of this name are normally synthesized by the compiler, so this one
     * presumably should be removed from hand-maintained source rather than repaired - confirm.
     *
     * @param serializedLambda the serialized form describing the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler artifact: 'z' stands in for an int case index (-1 = no match, false/0 = first case).
        boolean z = -1;
        // First stage: hash-then-equals dispatch on the implementation method name.
        switch (implMethodName.hashCode()) {
            case -1076719040:
                if (implMethodName.equals("lambda$testThreadInvariants$e0defa2f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the remaining attributes of the matched lambda.
        switch (z) {
            case false:
                // Method kind 6 is MethodHandleInfo.REF_invokeStatic; the other checks pin the functional
                // interface (MapFunction.map), the implementing class and the erased signatures.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/StreamTaskTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    // The recreated lambda is the identity mapping.
                    return obj -> {
                        return obj;
                    };
                }
                break;
        }
        // No known lambda matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
