package org.apache.flink.runtime.operators.lifecycle;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test that runs a test job on a mini cluster, waits for a completed checkpoint,
 * tells all source subtasks to finish and then validates the operator lifecycle and the data flow.
 *
 * <p>The test is parameterized and runs once for each graph builder returned by
 * {@link #parameters()}.
 */
@RunWith(Parameterized.class)
public class BoundedSourceITCase extends TestLogger {

    /** Class-level temporary directory that supplies the folders used by this test. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Mini cluster with one task manager offering four slots, configured via {@link #configuration()}. */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    /** Shared objects handed to the graph builder when the test job is built. */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /** Limits every test run to ten minutes. */
    @Rule
    public Timeout timeoutRule = new Timeout(10, TimeUnit.MINUTES);

    /** Graph builder under test; injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public TestJobBuilders.TestingGraphBuilder graphBuilder;

    /**
     * Creates the cluster configuration, set up through {@link FsStateChangelogStorageFactory}
     * with a fresh folder below {@link #TEMPORARY_FOLDER}.
     *
     * @return the configuration passed to the mini cluster
     * @throws RuntimeException wrapping the {@link IOException} if the folder cannot be created
     */
    private static Configuration configuration() {
        Configuration clusterConfig = new Configuration();
        try {
            // NOTE(review): the one-minute duration and the value 10 are forwarded as-is; their
            // exact meaning is defined by FsStateChangelogStorageFactory.configure - confirm there.
            FsStateChangelogStorageFactory.configure(
                    clusterConfig, TEMPORARY_FOLDER.newFolder(), Duration.ofMinutes(1L), 10);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return clusterConfig;
    }

    /**
     * Supplies the graph builders the test is executed with.
     *
     * @return the simple and the complex graph builder
     */
    @Parameterized.Parameters(name = "{0}")
    public static Object[] parameters() {
        return new Object[] {
            TestJobBuilders.SIMPLE_GRAPH_BUILDER, TestJobBuilders.COMPLEX_GRAPH_BUILDER
        };
    }

    /**
     * Builds the job, runs it until a checkpoint completes, broadcasts
     * {@code FINISH_SOURCES} to all subtasks, waits for termination and validates the outcome.
     *
     * @throws Exception if building, executing or validating the job fails
     */
    @Test
    public void test() throws Exception {
        TestJobWithDescription testJob =
                graphBuilder.build(
                        sharedObjects,
                        cfg -> {
                            // no additional configuration changes
                        },
                        env -> {
                            // checkpoints go to a fresh folder below the shared temporary directory
                            env.getCheckpointConfig()
                                    .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
                        });

        TestJobExecutor.execute(testJob, miniClusterResource)
                .waitForEvent(CheckpointCompletedEvent.class)
                .sendBroadcastCommand(
                        TestCommand.FINISH_SOURCES,
                        TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS)
                .waitForTermination()
                .assertFinishedSuccessfully();

        TestOperatorLifecycleValidator.checkOperatorsLifecycle(
                testJob, new DrainingValidator(), new FinishingValidator());
        // NOTE(review): the meaning of the boolean flag is defined by TestJobDataFlowValidator.
        TestJobDataFlowValidator.checkDataFlow(testJob, true);
    }
}
