package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.class */
public class StreamTaskITCase extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskITCase$NoOpStreamTask.class */
    public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() throws Exception {
            this.inputProcessor = new StreamTaskTest.EmptyInputProcessor();
        }

        protected void cleanUpInternal() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskITCase$UnusedOperatorFactory.class */
    public static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private UnusedOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            throw new UnsupportedOperationException("This shouldn't be called");
        }

        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }

    @Test
    public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
        testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
    }

    @Test
    public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
        testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
    }

    @Test
    public void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
        testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
    }

    private void testRecordWriterClosedOnTransitStateError(final ExecutionState executionState) throws Exception {
        NoOpTaskManagerActions noOpTaskManagerActions = new NoOpTaskManagerActions() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.1
            @Override // org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions
            public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                if (taskExecutionState.getExecutionState() == executionState) {
                    throw new ExpectedTestException();
                }
            }
        };
        testRecordWriterClosedOnError(nettyShuffleEnvironment -> {
            return taskBuilderWithConfiguredRecordWriter(nettyShuffleEnvironment).setTaskManagerActions(noOpTaskManagerActions).build(EXECUTOR_RESOURCE.getExecutor());
        });
    }

    @Test
    public void testFailInEndOfConstructor() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD.key(), "a");
        testRecordWriterClosedOnError(nettyShuffleEnvironment -> {
            return taskBuilderWithConfiguredRecordWriter(nettyShuffleEnvironment).setTaskManagerConfig(configuration).build(EXECUTOR_RESOURCE.getExecutor());
        });
    }

    private void testRecordWriterClosedOnError(FunctionWithException<NettyShuffleEnvironment, Task, Exception> functionWithException) throws Exception {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().build();
        Throwable th = null;
        try {
            try {
                Task task = (Task) functionWithException.apply(build);
                task.startTaskThread();
                task.getExecutingThread().join();
                Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
                Iterator<Thread> it = Thread.getAllStackTraces().keySet().iterator();
                while (it.hasNext()) {
                    MatcherAssert.assertThat(it.next().getName(), CoreMatchers.is(CoreMatchers.not(CoreMatchers.containsString("OutputFlusher"))));
                }
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(NettyShuffleEnvironment nettyShuffleEnvironment) {
        Configuration configuration = new Configuration();
        outputEdgeConfiguration(configuration);
        return new TestTaskBuilder(nettyShuffleEnvironment).setInvokable(NoOpStreamTask.class).setTaskConfig(configuration).setResultPartitions(Collections.singletonList(new ResultPartitionDeploymentDescriptor(PartitionDescriptorBuilder.newBuilder().build(), NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), 1)));
    }

    private void outputEdgeConfiguration(Configuration configuration) {
        StreamConfig streamConfig = new StreamConfig(configuration);
        streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
        StreamConfigChainer streamConfigChainer = new StreamConfigChainer(new OperatorID(42L, 42L), streamConfig, this, 1);
        streamConfigChainer.setBufferTimeout(1);
        streamConfigChainer.chain(new OperatorID(44L, 44L), (StreamOperatorFactory) new UnusedOperatorFactory(), (TypeSerializer) StringSerializer.INSTANCE, (TypeSerializer) StringSerializer.INSTANCE, false);
        streamConfigChainer.finish();
    }
}
