package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/RescaleBucketITCase.class */
public class RescaleBucketITCase extends CatalogITCaseBase {
    private final String alterTableSql = "ALTER TABLE %s SET ('bucket' = '%d')";
    private final String rescaleOverwriteSql = "INSERT OVERWRITE %s SELECT * FROM %s";

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected List<String> ddl() {
        return Arrays.asList(String.format("CREATE CATALOG `fs_catalog` WITH ('type' = 'paimon', 'warehouse' = '%s')", this.path), "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 INT) WITH ('bucket' = '2')");
    }

    @Test
    public void testRescaleCatalogTable() {
        innerTest("fs_catalog", "T1");
    }

    @Test
    public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
        executeBoth(Arrays.asList("USE CATALOG fs_catalog", "CREATE TEMPORARY TABLE IF NOT EXISTS `S0` (f0 INT) WITH ('connector' = 'datagen')", "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH ('bucket' = '2')", "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), getTableDirectory("T3"));
        assertLatestSchema(schemaManager, 0L, 2);
        String stopJobSafely = stopJobSafely(startJobAndCommitSnapshot("EXECUTE STATEMENT SET BEGIN\n INSERT INTO `T3` SELECT * FROM `S0`;\n INSERT INTO `T4` SELECT * FROM `S0`;\nEND", null));
        Snapshot findLatestSnapshot = findLatestSnapshot("T3");
        Assertions.assertThat(findLatestSnapshot).isNotNull();
        assertSnapshotSchema(schemaManager, findLatestSnapshot.schemaId(), 0L, 2);
        List<Row> batchSql = batchSql("SELECT * FROM T3", new Object[0]);
        batchSql("ALTER TABLE %s SET ('bucket' = '%d')", "T3", 4);
        assertLatestSchema(schemaManager, 1L, 4);
        batchSql("INSERT OVERWRITE %s SELECT * FROM %s", "T3", "T3");
        Snapshot findLatestSnapshot2 = findLatestSnapshot("T3");
        Assertions.assertThat(findLatestSnapshot2).isNotNull();
        Assertions.assertThat(findLatestSnapshot2.id()).isEqualTo(findLatestSnapshot.id() + 1);
        Assertions.assertThat(findLatestSnapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, findLatestSnapshot2.schemaId(), 1L, 4);
        Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0])).containsExactlyInAnyOrderElementsOf(batchSql);
        this.sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, stopJobSafely);
        stopJobSafely(startJobAndCommitSnapshot("EXECUTE STATEMENT SET BEGIN\n INSERT INTO `T3` SELECT * FROM `S0`;\n INSERT INTO `T4` SELECT * FROM `S0`;\nEND", Long.valueOf(findLatestSnapshot2.id())));
        Snapshot findLatestSnapshot3 = findLatestSnapshot("T3");
        Assertions.assertThat(findLatestSnapshot3).isNotNull();
        SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), getTableDirectory("T3"));
        long id = findLatestSnapshot3.id();
        while (true) {
            long j = id;
            if (j <= findLatestSnapshot2.id()) {
                Assertions.assertThat(batchSql("SELECT * FROM T3", new Object[0])).containsExactlyInAnyOrderElementsOf(batchSql("SELECT * FROM T4", new Object[0]));
                return;
            } else {
                assertSnapshotSchema(schemaManager, snapshotManager.snapshot(j).schemaId(), 1L, 4);
                id = j - 1;
            }
        }
    }

    private void waitForTheNextSnapshot(@Nullable Long l) throws InterruptedException {
        Snapshot findLatestSnapshot = findLatestSnapshot("T3");
        while (true) {
            Snapshot snapshot = findLatestSnapshot;
            if (snapshot != null && !new Long(snapshot.id()).equals(l)) {
                return;
            }
            Thread.sleep(2000L);
            findLatestSnapshot = findLatestSnapshot("T3");
        }
    }

    private JobClient startJobAndCommitSnapshot(String str, @Nullable Long l) throws Exception {
        JobClient jobClient = (JobClient) this.sEnv.executeSql(str).getJobClient().get();
        waitForTheNextSnapshot(l);
        return jobClient;
    }

    private String stopJobSafely(JobClient jobClient) throws ExecutionException, InterruptedException {
        CompletableFuture stopWithSavepoint = jobClient.stopWithSavepoint(true, this.path, SavepointFormatType.DEFAULT);
        while (!((JobStatus) jobClient.getJobStatus().get()).isGloballyTerminalState()) {
            Thread.sleep(2000L);
        }
        return (String) stopWithSavepoint.get();
    }

    private void assertLatestSchema(SchemaManager schemaManager, long j, int i) {
        Assertions.assertThat(schemaManager.latest()).isPresent();
        TableSchema tableSchema = (TableSchema) schemaManager.latest().get();
        Assertions.assertThat(tableSchema.id()).isEqualTo(j);
        Assertions.assertThat(tableSchema.options()).containsEntry(CoreOptions.BUCKET.key(), String.valueOf(i));
    }

    private void assertSnapshotSchema(SchemaManager schemaManager, long j, long j2, int i) {
        Assertions.assertThat(j).isEqualTo(j2);
        Assertions.assertThat(schemaManager.schema(j).options()).containsEntry(CoreOptions.BUCKET.key(), String.valueOf(i));
    }

    private void innerTest(String str, String str2) {
        batchSql("USE CATALOG %s", str);
        batchSql("INSERT INTO %s VALUES (1), (2), (3), (4), (5)", str2);
        Snapshot findLatestSnapshot = findLatestSnapshot(str2);
        Assertions.assertThat(findLatestSnapshot).isNotNull();
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), getTableDirectory(str2));
        assertSnapshotSchema(schemaManager, findLatestSnapshot.schemaId(), 0L, 2);
        batchSql("ALTER TABLE %s SET ('bucket' = '%d')", str2, 4);
        assertLatestSchema(schemaManager, 1L, 4);
        List asList = Arrays.asList(Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3}), Row.of(new Object[]{4}), Row.of(new Object[]{5}));
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(asList);
        Assertions.assertThatThrownBy(() -> {
            batchSql("INSERT INTO %s VALUES (6)", str2);
        }).getRootCause().isInstanceOf(RuntimeException.class).hasMessage("Try to write table with a new bucket num 4, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
        batchSql("INSERT OVERWRITE %s SELECT * FROM %s", str2, str2);
        Snapshot findLatestSnapshot2 = findLatestSnapshot(str2);
        Assertions.assertThat(findLatestSnapshot2).isNotNull();
        Assertions.assertThat(findLatestSnapshot2.id()).isEqualTo(2L);
        Assertions.assertThat(findLatestSnapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        assertSnapshotSchema(schemaManager, findLatestSnapshot2.schemaId(), 1L, 4);
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(asList);
        batchSql("INSERT INTO %s VALUES(6)", str2);
        Assertions.assertThat(batchSql("SELECT * FROM %s", str2)).containsExactlyInAnyOrderElementsOf(Arrays.asList(Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3}), Row.of(new Object[]{4}), Row.of(new Object[]{5}), Row.of(new Object[]{6})));
    }

    private void executeBoth(List<String> list) {
        list.forEach(str -> {
            this.sEnv.executeSql(str);
            this.tEnv.executeSql(str);
        });
    }
}
