package org.apache.flink.runtime.operators.lifecycle;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.class */
public class PartiallyFinishedSourcesITCase extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private MiniClusterWithClientResource miniClusterResource;

    @Parameterized.Parameter(0)
    public TestJobBuilders.TestingGraphBuilder graphBuilder;

    @Parameterized.Parameter(1)
    public TestCommandDispatcher.TestCommandScope subtaskScope;

    @Parameterized.Parameter(2)
    public boolean failover;

    @Parameterized.Parameter(3)
    public String failoverStrategy;

    @Before
    public void init() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, this.failoverStrategy);
        configuration.setString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "filesystem");
        configuration.setString(FsStateChangelogOptions.BASE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        configuration.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 10);
        configuration.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofMinutes(1L));
        this.miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());
        this.miniClusterResource.before();
    }

    @After
    public void tearDown() {
        if (this.miniClusterResource != null) {
            this.miniClusterResource.after();
        }
    }

    @Test
    public void test() throws Exception {
        TestJobWithDescription buildJob = buildJob();
        Iterator<String> it = buildJob.sources.iterator();
        String next = it.next();
        TestJobExecutor waitForEvent = TestJobExecutor.execute(buildJob, this.miniClusterResource).waitForEvent(CheckpointCompletedEvent.class).sendOperatorCommand(next, TestCommand.FINISH_SOURCES, this.subtaskScope).waitForSubtasksToFinish(findJobVertexID(buildJob, next), this.subtaskScope).waitForEvent(CheckpointCompletedEvent.class).waitForEvent(CheckpointCompletedEvent.class);
        if (this.failover) {
            waitForEvent.triggerFailover(this.subtaskScope == TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS ? it.next() : next);
        }
        waitForEvent.sendBroadcastCommand(TestCommand.FINISH_SOURCES, TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS).waitForTermination().assertFinishedSuccessfully();
        TestOperatorLifecycleValidator.checkOperatorsLifecycle(buildJob, new DrainingValidator(), new FinishingValidator());
        TestJobDataFlowValidator.checkDataFlow(buildJob, true);
    }

    private TestJobWithDescription buildJob() throws Exception {
        return this.graphBuilder.build(this.sharedObjects, configuration -> {
        }, streamExecutionEnvironment -> {
            streamExecutionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
            streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(30000L);
            streamExecutionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
            streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
        });
    }

    private JobVertexID findJobVertexID(TestJobWithDescription testJobWithDescription, String str) {
        return ((JobVertex) StreamSupport.stream(testJobWithDescription.jobGraph.getVertices().spliterator(), false).filter(jobVertex -> {
            return jobVertex.getOperatorIDs().stream().anyMatch(operatorIDPair -> {
                return matches(operatorIDPair, str);
            });
        }).findAny().orElseThrow(() -> {
            return new RuntimeException("Vertex not found: " + str);
        })).getID();
    }

    private boolean matches(OperatorIDPair operatorIDPair, String str) {
        return ((OperatorID) operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID())).toString().equals(str);
    }

    @Parameterized.Parameters(name = "{0} {1}, failover: {2}, strategy: {3}")
    public static List<Object[]> parameters() {
        List<String> asList = Arrays.asList("full", "region");
        List asList2 = Arrays.asList(Arrays.asList(TestJobBuilders.SIMPLE_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, true), Arrays.asList(TestJobBuilders.COMPLEX_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, true), Arrays.asList(TestJobBuilders.COMPLEX_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS, true), Arrays.asList(TestJobBuilders.SIMPLE_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, false), Arrays.asList(TestJobBuilders.COMPLEX_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, false), Arrays.asList(TestJobBuilders.COMPLEX_GRAPH_BUILDER, TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS, false));
        ArrayList arrayList = new ArrayList();
        for (String str : asList) {
            Iterator it = asList2.iterator();
            while (it.hasNext()) {
                ArrayList arrayList2 = new ArrayList((List) it.next());
                arrayList2.add(str);
                arrayList.add(arrayList2.toArray());
            }
        }
        return arrayList;
    }
}
