package org.apache.flink.table.store.connector;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/RescaleBucketITCase.class */
public class RescaleBucketITCase extends FileStoreTableITCase {
    private final String alterTableSql = "ALTER TABLE %s SET ('bucket' = '%d')";
    private final String rescaleOverwriteSql = "INSERT OVERWRITE %s SELECT * FROM %s";

    @Override // org.apache.flink.table.store.connector.FileStoreTableITCase
    protected List<String> ddl() {
        return Arrays.asList("CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`T0` (f0 INT) WITH ('bucket' = '2')", String.format("CREATE CATALOG `fs_catalog` WITH ('type' = 'table-store', 'warehouse' = '%s')", this.path), "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 INT) WITH ('bucket' = '2')");
    }

    @Test
    public void testRescaleManagedTable() {
        innerTest("default_catalog", "T0", true);
    }

    @Test
    public void testRescaleCatalogTable() {
        innerTest("fs_catalog", "T1", false);
    }

    @Test
    public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
        this.sEnv.executeSql("CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`S0` (f0 INT) WITH ('connector' = 'datagen')");
        executeBoth(Arrays.asList("USE CATALOG fs_catalog", "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH ('bucket' = '2')", "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
        SchemaManager schemaManager = new SchemaManager(getTableDirectory("T3", false));
        assertLatestSchema(schemaManager, 0L, 2);
        this.sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, this.path);
        ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        stopJobSafely(clusterClient, startJobAndCommitSnapshot("EXECUTE STATEMENT SET BEGIN\n INSERT INTO `T3` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n INSERT INTO `T4` SELECT * FROM `default_catalog`.`default_database`.`S0`;\nEND", null));
        Snapshot findLatestSnapshot = findLatestSnapshot("T3", false);
        Assertions.assertThat(findLatestSnapshot).isNotNull();
        assertSnapshotSchema(schemaManager, findLatestSnapshot.schemaId(), 0L, 2);
        List<Row> batchSql = batchSql("SELECT * FROM T3", new Object[0]);
        batchSql("ALTER TABLE %s SET ('bucket' = '%d')", "T3", 4);
        assertLatestSchema(schemaManager, 1L, 4);
        batchSql("INSERT OVERWRITE %s SELECT * FROM %s", "T3", "T3");
        Snapshot findLatestSnapshot2 = findLatestSnapshot("T3", false);
        Assertions.assertThat(findLatestSnapshot2).isNotNull();
        Assertions.assertThat(findLatestSnapshot2.id()).isEqualTo(findLatestSnapshot.id() + 1);
        Assertions.assertThat(findLatestSnapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, findLatestSnapshot2.schemaId(), 1L, 4);
        Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0])).containsExactlyInAnyOrderElementsOf(batchSql);
        stopJobSafely(clusterClient, startJobAndCommitSnapshot("EXECUTE STATEMENT SET BEGIN\n INSERT INTO `T3` SELECT * FROM `default_catalog`.`default_database`.`S0`;\n INSERT INTO `T4` SELECT * FROM `default_catalog`.`default_database`.`S0`;\nEND", Long.valueOf(findLatestSnapshot2.id())));
        Snapshot findLatestSnapshot3 = findLatestSnapshot("T3", false);
        Assertions.assertThat(findLatestSnapshot3).isNotNull();
        SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory("T3", false));
        long id = findLatestSnapshot3.id();
        while (true) {
            long j = id;
            if (j <= findLatestSnapshot2.id()) {
                Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0])).containsExactlyInAnyOrderElementsOf(batchSql("SELECT * FROM T4", new Object[0]));
                return;
            } else {
                assertSnapshotSchema(schemaManager, snapshotManager.snapshot(j).schemaId(), 1L, 4);
                id = j - 1;
            }
        }
    }

    private void waitForTheNextSnapshot(@Nullable Long l) throws InterruptedException {
        Snapshot findLatestSnapshot = findLatestSnapshot("T3", false);
        while (true) {
            Snapshot snapshot = findLatestSnapshot;
            if (snapshot != null && !new Long(snapshot.id()).equals(l)) {
                return;
            }
            Thread.sleep(2000L);
            findLatestSnapshot = findLatestSnapshot("T3", false);
        }
    }

    private JobID startJobAndCommitSnapshot(String str, @Nullable Long l) throws Exception {
        JobID jobID = ((JobClient) this.sEnv.executeSql(str).getJobClient().get()).getJobID();
        waitForTheNextSnapshot(l);
        return jobID;
    }

    private void stopJobSafely(ClusterClient<?> clusterClient, JobID jobID) throws ExecutionException, InterruptedException {
        clusterClient.stopWithSavepoint(jobID, true, this.path, SavepointFormatType.DEFAULT);
        while (clusterClient.getJobStatus(jobID).get() == JobStatus.RUNNING) {
            Thread.sleep(2000L);
        }
    }

    private void assertLatestSchema(SchemaManager schemaManager, long j, int i) {
        Assertions.assertThat(schemaManager.latest()).isPresent();
        TableSchema tableSchema = (TableSchema) schemaManager.latest().get();
        Assertions.assertThat(tableSchema.id()).isEqualTo(j);
        Assertions.assertThat(tableSchema.options()).containsEntry(CoreOptions.BUCKET.key(), String.valueOf(i));
    }

    private void assertSnapshotSchema(SchemaManager schemaManager, long j, long j2, int i) {
        Assertions.assertThat(j).isEqualTo(j2);
        Assertions.assertThat(schemaManager.schema(j).options()).containsEntry(CoreOptions.BUCKET.key(), String.valueOf(i));
    }

    private void innerTest(String str, String str2, boolean z) {
        batchSql("USE CATALOG %s", str);
        batchSql("INSERT INTO %s VALUES (1), (2), (3), (4), (5)", str2);
        Snapshot findLatestSnapshot = findLatestSnapshot(str2, z);
        Assertions.assertThat(findLatestSnapshot).isNotNull();
        SchemaManager schemaManager = new SchemaManager(getTableDirectory(str2, z));
        assertSnapshotSchema(schemaManager, findLatestSnapshot.schemaId(), 0L, 2);
        batchSql("ALTER TABLE %s SET ('bucket' = '%d')", str2, 4);
        if (z) {
            assertLatestSchema(schemaManager, 0L, 2);
        } else {
            assertLatestSchema(schemaManager, 1L, 4);
        }
        List asList = Arrays.asList(Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3}), Row.of(new Object[]{4}), Row.of(new Object[]{5}));
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(asList);
        Assertions.assertThatThrownBy(() -> {
            batchSql("INSERT INTO %s VALUES (6)", str2);
        }).getRootCause().isInstanceOf(TableException.class).hasMessage("Try to write table with a new bucket num 4, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
        batchSql("INSERT OVERWRITE %s SELECT * FROM %s", str2, str2);
        Snapshot findLatestSnapshot2 = findLatestSnapshot(str2, z);
        Assertions.assertThat(findLatestSnapshot2).isNotNull();
        Assertions.assertThat(findLatestSnapshot2.id()).isEqualTo(2L);
        Assertions.assertThat(findLatestSnapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, findLatestSnapshot2.schemaId(), z ? 0L : 1L, z ? 2 : 4);
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(asList);
        batchSql("INSERT INTO %s VALUES(6)", str2);
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(Arrays.asList(Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3}), Row.of(new Object[]{4}), Row.of(new Object[]{5}), Row.of(new Object[]{6})));
    }

    private void executeBoth(List<String> list) {
        list.forEach(str -> {
            this.sEnv.executeSql(str);
            this.bEnv.executeSql(str);
        });
    }
}
