package org.apache.flink.table.store.connector.sink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/SinkSavepointITCase.class */
public class SinkSavepointITCase extends AbstractTestBase {
    private String path;
    private String failingName;

    @Before
    public void before() throws Exception {
        this.path = TEMPORARY_FOLDER.newFolder().toPath().toString();
        this.failingName = UUID.randomUUID().toString();
        FailingAtomicRenameFileSystem.reset(this.failingName, 100, 500);
    }

    @Test(timeout = 180000)
    public void testRecoverFromSavepoint() throws Exception {
        String failingPath = FailingAtomicRenameFileSystem.getFailingPath(this.failingName, this.path);
        String str = null;
        ClusterClient clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        ThreadLocalRandom current = ThreadLocalRandom.current();
        while (true) {
            JobID runRecoverFromSavepointJob = runRecoverFromSavepointJob(failingPath, str);
            while (true) {
                Thread.sleep(current.nextInt(5000));
                if (clusterClient.getJobStatus(runRecoverFromSavepointJob).get() == JobStatus.FINISHED) {
                    checkRecoverFromSavepointResult(failingPath);
                    return;
                }
                try {
                    str = (String) clusterClient.stopWithSavepoint(runRecoverFromSavepointJob, false, this.path + "/savepoint", SavepointFormatType.DEFAULT).get();
                    break;
                } catch (Exception e) {
                    Optional findThrowable = ExceptionUtils.findThrowable(e, StopWithSavepointStoppingException.class);
                    if (findThrowable.isPresent()) {
                        str = ((StopWithSavepointStoppingException) findThrowable.get()).getSavepointPath();
                        break;
                    }
                }
            }
            while (!((JobStatus) clusterClient.getJobStatus(runRecoverFromSavepointJob).get()).isGloballyTerminalState()) {
                Thread.sleep(1000L);
            }
        }
    }

    private JobID runRecoverFromSavepointJob(String str, String str2) throws Exception {
        Configuration configuration = new Configuration();
        if (str2 != null) {
            SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.forPath(str2, false), configuration);
        }
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(configuration), EnvironmentSettings.newInstance().inStreamingMode().build());
        create.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500L));
        create.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND, "filesystem");
        create.getConfig().getConfiguration().set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + this.path + "/checkpoint");
        create.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
        create.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
        create.executeSql(String.join("\n", "CREATE TABLE S (", "  a INT", ") WITH (", "  'connector' = 'datagen',", "  'rows-per-second' = '10000',", "  'fields.a.kind' = 'sequence',", "  'fields.a.start' = '0',", "  'fields.a.end' = '99999'", ")"));
        String join = String.join("\n", "CREATE CATALOG my_catalog WITH (", "  'type' = 'table-store',", "  'warehouse' = '" + str + "'", ")");
        FailingAtomicRenameFileSystem.retryArtificialException(() -> {
            return create.executeSql(join);
        });
        create.executeSql("USE CATALOG my_catalog");
        String join2 = String.join("\n", "CREATE TABLE IF NOT EXISTS T (", "  a INT", ") WITH (", "  'bucket' = '2',", "  'file.format' = 'avro'", ")");
        FailingAtomicRenameFileSystem.retryArtificialException(() -> {
            return create.executeSql(join2);
        });
        String str3 = "INSERT INTO T SELECT * FROM default_catalog.default_database.S";
        JobID jobID = ((JobClient) ((TableResult) FailingAtomicRenameFileSystem.retryArtificialException(() -> {
            return create.executeSql(str3);
        })).getJobClient().get()).getJobID();
        ClusterClient clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        while (clusterClient.getJobStatus(jobID).get() == JobStatus.INITIALIZING) {
            Thread.sleep(1000L);
        }
        return jobID;
    }

    private void checkRecoverFromSavepointResult(String str) throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        FailingAtomicRenameFileSystem.reset(this.failingName, 0, 1);
        create.executeSql(String.join("\n", "CREATE CATALOG my_catalog WITH (", "  'type' = 'table-store',", "  'warehouse' = '" + str + "'", ")"));
        create.executeSql("USE CATALOG my_catalog");
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = create.executeSql("SELECT * FROM T").collect();
        Throwable th = null;
        while (collect.hasNext()) {
            try {
                try {
                    Row row = (Row) collect.next();
                    Assert.assertEquals(1L, row.getArity());
                    arrayList.add((Integer) row.getField(0));
                } catch (Throwable th2) {
                    if (collect != null) {
                        if (th != null) {
                            try {
                                collect.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            collect.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (collect != null) {
            if (0 != 0) {
                try {
                    collect.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                collect.close();
            }
        }
        Collections.sort(arrayList);
        Assert.assertEquals(IntStream.range(0, 100000).boxed().collect(Collectors.toList()), arrayList);
    }
}
