package org.apache.flink.table.planner.plan.nodes.exec.testutils;

import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.table.test.program.TableTestProgramRunner;
import org.apache.flink.table.test.program.TestStep;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Base class for restore tests of a single {@link ExecNode}.
 *
 * <p>For every {@link TableTestProgram} returned by {@code supportedPrograms()} and every {@link
 * ExecNodeMetadata} version annotated on the exec node under test, {@link #testRestore} loads a
 * previously written compiled plan (and, unless {@link AfterRestoreSource#NO_RESTORE} is used, a
 * savepoint) from {@code src/test/resources/restore-tests/...} and checks the sink contents.
 *
 * <p>The plan and savepoint files are produced by {@link #generateTestSetupFiles}, which is
 * {@code @Disabled} and therefore only runs when a developer enables it on purpose.
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MiniClusterExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class RestoreTestBase implements TableTestProgramRunner {

    /**
     * Version segment used when writing a savepoint in {@link #generateTestSetupFiles}. While it
     * is {@code null}, {@link #getSavepointPath} omits the version directory.
     */
    private static final FlinkVersion FLINK_VERSION = null;

    private final Class<? extends ExecNode<?>> execNodeUnderTest;
    private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest;
    private final AfterRestoreSource afterRestoreSource;

    /** Scratch directory that receives the savepoint before it is moved into the resources. */
    @TempDir
    private Path tmpDir;

    /** Selects how sources and the job are handled in {@link #testRestore}. */
    protected enum AfterRestoreSource {
        /** Restore from the savepoint and wait for the job to finish before asserting. */
        FINITE,
        /**
         * Restore from the savepoint with sources configured as {@code terminating=false}; sink
         * observers decide when the expected data has arrived and the job is then cancelled.
         */
        INFINITE,
        /** Do not restore from a savepoint; sources are fed the "before restore" data instead. */
        NO_RESTORE
    }

    /**
     * Creates a test base with no child exec nodes and {@link AfterRestoreSource#FINITE}.
     *
     * @param cls the exec node class under test
     */
    public RestoreTestBase(Class<? extends ExecNode<?>> cls) {
        this(cls, new ArrayList<>(), AfterRestoreSource.FINITE);
    }

    /**
     * Creates a test base with {@link AfterRestoreSource#FINITE}.
     *
     * @param cls the exec node class under test
     * @param list child exec node classes, exposed through {@link #getChildExecNodes()}
     */
    public RestoreTestBase(Class<? extends ExecNode<?>> cls, List<Class<? extends ExecNode<?>>> list) {
        this(cls, list, AfterRestoreSource.FINITE);
    }

    /**
     * Creates a test base with no child exec nodes.
     *
     * @param cls the exec node class under test
     * @param afterRestoreSource how sources and the job are handled in {@link #testRestore}
     */
    public RestoreTestBase(Class<? extends ExecNode<?>> cls, AfterRestoreSource afterRestoreSource) {
        this(cls, new ArrayList<>(), afterRestoreSource);
    }

    /**
     * Creates a fully specified test base.
     *
     * @param cls the exec node class under test
     * @param list child exec node classes, exposed through {@link #getChildExecNodes()}
     * @param afterRestoreSource how sources and the job are handled in {@link #testRestore}
     */
    protected RestoreTestBase(
            Class<? extends ExecNode<?>> cls,
            List<Class<? extends ExecNode<?>>> list,
            AfterRestoreSource afterRestoreSource) {
        this.execNodeUnderTest = cls;
        this.childExecNodesUnderTest = list;
        this.afterRestoreSource = afterRestoreSource;
    }

    /** Returns the exec node class this test base was created for. */
    public Class<? extends ExecNode<?>> getExecNode() {
        return execNodeUnderTest;
    }

    /** Returns the child exec node classes passed to the constructor (the same list instance). */
    public List<Class<? extends ExecNode<?>>> getChildExecNodes() {
        return childExecNodesUnderTest;
    }

    /** Returns the setup step kinds that programs run by this base class may contain. */
    public EnumSet<TestStep.TestKind> supportedSetupSteps() {
        return EnumSet.of(
                TestStep.TestKind.CONFIG,
                TestStep.TestKind.FUNCTION,
                TestStep.TestKind.TEMPORAL_FUNCTION,
                TestStep.TestKind.SOURCE_WITH_RESTORE_DATA,
                TestStep.TestKind.SOURCE_WITH_DATA,
                TestStep.TestKind.SINK_WITH_RESTORE_DATA,
                TestStep.TestKind.SINK_WITH_DATA);
    }

    /** Returns the run step kinds that programs run by this base class may contain. */
    public EnumSet<TestStep.TestKind> supportedRunSteps() {
        return EnumSet.of(TestStep.TestKind.SQL, TestStep.TestKind.STATEMENT_SET);
    }

    /** Clears everything registered with {@link TestValuesTableFactory} after each test. */
    @AfterEach
    public void clearData() {
        TestValuesTableFactory.clearAllData();
    }

    private List<ExecNodeMetadata> getAllMetadata() {
        return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest);
    }

    private ExecNodeMetadata getLatestMetadata() {
        return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest);
    }

    /**
     * Argument factory for {@link #testRestore}: one (program, plan path, savepoint path) triple
     * per metadata version, supported program and savepoint path.
     *
     * <p>Referenced by name from {@code @MethodSource}; keep the name unique within the class.
     */
    private Stream<Arguments> createSpecs() {
        return getAllMetadata().stream()
                .flatMap(
                        metadata ->
                                supportedPrograms().stream()
                                        .flatMap(program -> specsFor(program, metadata)));
    }

    /** Builds the {@link #testRestore} arguments for one program and one metadata version. */
    private Stream<Arguments> specsFor(TableTestProgram program, ExecNodeMetadata metadata) {
        return getSavepointPaths(program, metadata)
                .map(savepointPath ->
                        Arguments.of(program, getPlanPath(program, metadata), savepointPath));
    }

    /**
     * Returns the savepoint paths to restore from for the given program and metadata version.
     * This implementation yields the single un-versioned path.
     *
     * @param tableTestProgram the program whose savepoints are looked up
     * @param execNodeMetadata the exec node metadata version the savepoint belongs to
     * @return a stream of savepoint directory paths
     */
    public Stream<String> getSavepointPaths(TableTestProgram tableTestProgram, ExecNodeMetadata execNodeMetadata) {
        return Stream.of(getSavepointPath(tableTestProgram, execNodeMetadata, null));
    }

    /**
     * Builds {@code <resource dir>[/<flinkVersion>]/savepoint/}.
     *
     * @param tableTestProgram the program whose savepoint is addressed
     * @param execNodeMetadata the exec node metadata version the savepoint belongs to
     * @param flinkVersion optional version segment; omitted from the path when {@code null}
     * @return the savepoint directory path, ending with a slash
     */
    public final String getSavepointPath(TableTestProgram tableTestProgram, ExecNodeMetadata execNodeMetadata, @Nullable FlinkVersion flinkVersion) {
        String directory = getTestResourceDirectory(tableTestProgram, execNodeMetadata);
        if (flinkVersion != null) {
            directory = directory + "/" + flinkVersion;
        }
        return directory + "/savepoint/";
    }

    /**
     * Registers an observer on the sink table that completes a new future, appended to {@code
     * futures}, once the sink content equals the expected rows.
     *
     * @param futures list that receives the future for this sink
     * @param sinkTestStep sink whose content is observed
     * @param ignoreAfterRestore when {@code true} only the "before restore" rows are expected;
     *     otherwise the "after restore" rows are expected as well
     */
    private void registerSinkObserver(
            List<CompletableFuture<?>> futures, SinkTestStep sinkTestStep, boolean ignoreAfterRestore) {
        final CompletableFuture<Object> sinkComplete = new CompletableFuture<>();
        futures.add(sinkComplete);
        final String tableName = sinkTestStep.name;
        TestValuesTableFactory.registerLocalRawResultsObserver(
                tableName,
                (ignoredInt, ignoredList) -> {
                    // The callback arguments are not used; the sink content is re-read in full.
                    final List<String> expected =
                            new ArrayList<>(sinkTestStep.getExpectedBeforeRestoreAsStrings());
                    if (!ignoreAfterRestore) {
                        expected.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
                    }
                    final List<String> actual = getExpectedResults(sinkTestStep, tableName);
                    if (CollectionUtils.isEqualCollection(actual, expected)) {
                        sinkComplete.complete(null);
                    }
                });
    }

    /**
     * Writes the compiled plan and a savepoint for the latest metadata version of the exec node
     * under test. Disabled by default; enable it manually to (re)generate the files.
     *
     * @param tableTestProgram the program to generate the files for
     * @throws Exception if compiling, executing, stopping the job or moving the savepoint fails
     */
    @MethodSource("supportedPrograms")
    @Order(0)
    @Disabled
    @ParameterizedTest
    public void generateTestSetupFiles(TableTestProgram tableTestProgram) throws Exception {
        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tableTestProgram.getSetupConfigOptionTestSteps().forEach(step -> step.apply(tEnv));
        // Set after the program's own config steps so that this value is the one in effect.
        tEnv.getConfig()
                .set(
                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
                        TableConfigOptions.CatalogPlanCompilation.SCHEMA);

        for (SourceTestStep source : tableTestProgram.getSetupSourceTestSteps()) {
            final String dataId = TestValuesTableFactory.registerData(source.dataBeforeRestore);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", TestValuesTableFactory.IDENTIFIER);
            options.put("data-id", dataId);
            // Presumably keeps the source running so the job can be stopped with a savepoint.
            options.put("terminating", "false");
            options.put("runtime-source", "NewSource");
            source.apply(tEnv, options);
        }

        final List<CompletableFuture<?>> sinkFutures = new ArrayList<>();
        for (SinkTestStep sink : tableTestProgram.getSetupSinkTestSteps()) {
            registerSinkObserver(sinkFutures, sink, true);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", TestValuesTableFactory.IDENTIFIER);
            options.put("sink-insert-only", "false");
            sink.apply(tEnv, options);
        }

        registerFunctions(tableTestProgram, tEnv);

        final CompiledPlan compiledPlan;
        if (tableTestProgram.runSteps.get(0).getKind() == TestStep.TestKind.STATEMENT_SET) {
            compiledPlan = tableTestProgram.getRunStatementSetTestStep().compiledPlan(tEnv);
        } else {
            compiledPlan = tEnv.compilePlanSql(tableTestProgram.getRunSqlTestStep().sql);
        }
        compiledPlan.writeToFile(getPlanPath(tableTestProgram, getLatestMetadata()));

        final TableResult tableResult = compiledPlan.execute();
        // Wait until every sink has seen the "before restore" data before taking the savepoint.
        awaitAll(sinkFutures);

        final JobClient jobClient = tableResult.getJobClient().get();
        final String savepoint =
                jobClient.stopWithSavepoint(false, tmpDir.toString(), SavepointFormatType.DEFAULT).get();
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));

        final Path savepointSource = Paths.get(new URI(savepoint));
        final Path savepointTarget =
                Paths.get(getSavepointPath(tableTestProgram, getLatestMetadata(), FLINK_VERSION));
        Files.createDirectories(savepointTarget);
        Files.move(savepointSource, savepointTarget, StandardCopyOption.ATOMIC_MOVE);
    }

    /**
     * Loads the compiled plan from {@code path}, executes it (restoring from the savepoint unless
     * {@link AfterRestoreSource#NO_RESTORE} is configured) and verifies the sink contents.
     *
     * @param tableTestProgram the program providing sources, sinks, functions and config
     * @param path location of the compiled plan JSON file
     * @param str savepoint directory to restore from
     * @throws Exception if loading or executing the plan fails
     */
    @MethodSource("createSpecs")
    @Order(1)
    @ParameterizedTest
    void testRestore(TableTestProgram tableTestProgram, Path path, String str) throws Exception {
        final boolean noRestore = afterRestoreSource == AfterRestoreSource.NO_RESTORE;
        final boolean infiniteSource = afterRestoreSource == AfterRestoreSource.INFINITE;

        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        final SavepointRestoreSettings restoreSettings =
                noRestore
                        ? SavepointRestoreSettings.none()
                        : SavepointRestoreSettings.forPath(str, false, RestoreMode.NO_CLAIM);
        SavepointRestoreSettings.toConfiguration(restoreSettings, settings.getConfiguration());
        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");

        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig()
                .set(
                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                        TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
        tableTestProgram.getSetupConfigOptionTestSteps().forEach(step -> step.apply(tEnv));

        for (SourceTestStep source : tableTestProgram.getSetupSourceTestSteps()) {
            // Without a savepoint there is no restored state, so the job is fed the
            // "before restore" data instead of the "after restore" data.
            final String dataId =
                    TestValuesTableFactory.registerData(
                            noRestore ? source.dataBeforeRestore : source.dataAfterRestore);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", TestValuesTableFactory.IDENTIFIER);
            options.put("data-id", dataId);
            options.put("runtime-source", "NewSource");
            if (infiniteSource) {
                options.put("terminating", "false");
            }
            source.apply(tEnv, options);
        }

        final List<CompletableFuture<?>> sinkFutures = new ArrayList<>();
        for (SinkTestStep sink : tableTestProgram.getSetupSinkTestSteps()) {
            if (infiniteSource) {
                registerSinkObserver(sinkFutures, sink, false);
            }
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", TestValuesTableFactory.IDENTIFIER);
            options.put("disable-lookup", "true");
            options.put("sink-insert-only", "false");
            sink.apply(tEnv, options);
        }

        registerFunctions(tableTestProgram, tEnv);

        final CompiledPlan compiledPlan = tEnv.loadPlan(PlanReference.fromFile(path));
        if (infiniteSource) {
            // The job is not awaited here: the sink observers signal completion, after which
            // the job is cancelled.
            final TableResult tableResult = compiledPlan.execute();
            awaitAll(sinkFutures);
            tableResult.getJobClient().get().cancel().get();
            return;
        }

        compiledPlan.execute().await();
        for (SinkTestStep sink : tableTestProgram.getSetupSinkTestSteps()) {
            final List<String> actual = getExpectedResults(sink, sink.name);
            final String[] expected =
                    Stream.concat(
                                    sink.getExpectedBeforeRestoreAsStrings().stream(),
                                    sink.getExpectedAfterRestoreAsStrings().stream())
                            .toArray(String[]::new);
            Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
        }
    }

    /** Applies the program's function steps, then its temporal function steps, to {@code tEnv}. */
    private static void registerFunctions(TableTestProgram program, TableEnvironment tEnv) {
        program.getSetupFunctionTestSteps().forEach(step -> step.apply(tEnv));
        program.getSetupTemporalFunctionTestSteps().forEach(step -> step.apply(tEnv));
    }

    /** Blocks until every future in {@code futures} has completed. */
    private static void awaitAll(List<CompletableFuture<?>> futures) throws Exception {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
    }

    /** Returns {@code <resource dir>/plan/<program id>.json}. */
    private Path getPlanPath(TableTestProgram program, ExecNodeMetadata metadata) {
        return Paths.get(getTestResourceDirectory(program, metadata) + "/plan/" + program.id + ".json");
    }

    /**
     * Returns {@code <user.dir>/src/test/resources/restore-tests/<name>_<version>/<program id>}.
     */
    private String getTestResourceDirectory(TableTestProgram program, ExecNodeMetadata metadata) {
        return String.format(
                "%s/src/test/resources/restore-tests/%s_%d/%s",
                System.getProperty("user.dir"), metadata.name(), metadata.version(), program.id);
    }

    /**
     * Reads the rows currently held by {@link TestValuesTableFactory} for the given sink table:
     * raw results when the step tests changelog data, otherwise the regular results.
     */
    private static List<String> getExpectedResults(SinkTestStep sinkTestStep, String tableName) {
        return sinkTestStep.getTestChangelogData()
                ? TestValuesTableFactory.getRawResultsAsStrings(tableName)
                : TestValuesTableFactory.getResultsAsStrings(tableName);
    }
}
