package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.class */
public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
    private static final Random RANDOM = new Random();

    private void prepareData(int i, int i2) throws Exception {
        createTable();
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < i2; i3++) {
            arrayList.addAll(writeData(i));
        }
        commit(arrayList);
    }

    private void prepareSameData(int i) throws Exception {
        createTable();
        BatchTableWrite newWrite = getTable().newBatchWriteBuilder().newWrite();
        Throwable th = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                try {
                    newWrite.write(data(0, 0, 0));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (newWrite != null) {
                    if (th != null) {
                        try {
                            newWrite.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        newWrite.close();
                    }
                }
                throw th3;
            }
        }
        commit(newWrite.prepareCommit());
        if (newWrite != null) {
            if (0 == 0) {
                newWrite.close();
                return;
            }
            try {
                newWrite.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    public void testOrderBy() throws Exception {
        prepareData(300, 1);
        Assertions.assertThatCode(() -> {
            order(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"));
        }).doesNotThrowAnyException();
    }

    @Test
    public void testOrderResult() throws Exception {
        prepareData(300, 2);
        Assertions.assertThatCode(() -> {
            order(Arrays.asList("f1", "f2"));
        }).doesNotThrowAnyException();
        ManifestEntry manifestEntry = (ManifestEntry) getTable().store().newScan().plan().files().get(0);
        DataSplit build = DataSplit.builder().withPartition(manifestEntry.partition()).withBucket(manifestEntry.bucket()).withDataFiles(Collections.singletonList(manifestEntry.file())).withBucketPath("not used").build();
        AtomicInteger atomicInteger = new AtomicInteger(Integer.MIN_VALUE);
        getTable().newReadBuilder().newRead().createReader(build).forEachRemaining(internalRow -> {
            Integer valueOf = Integer.valueOf(internalRow.getInt(1));
            Assertions.assertThat(valueOf).isGreaterThanOrEqualTo(atomicInteger.get());
            atomicInteger.set(valueOf.intValue());
        });
        Assertions.assertThatCode(() -> {
            order(Arrays.asList("f2", "f1"));
        }).doesNotThrowAnyException();
        ManifestEntry manifestEntry2 = (ManifestEntry) getTable().store().newScan().plan().files().get(0);
        DataSplit build2 = DataSplit.builder().withPartition(manifestEntry2.partition()).withBucket(manifestEntry2.bucket()).withDataFiles(Collections.singletonList(manifestEntry2.file())).withBucketPath("not used").build();
        atomicInteger.set(Integer.MIN_VALUE);
        getTable().newReadBuilder().newRead().createReader(build2).forEachRemaining(internalRow2 -> {
            Integer valueOf = Integer.valueOf(internalRow2.getInt(2));
            Assertions.assertThat(valueOf).isGreaterThanOrEqualTo(atomicInteger.get());
            atomicInteger.set(valueOf.intValue());
        });
    }

    @Test
    public void testAllBasicTypeWorksWithZorder() throws Exception {
        prepareData(300, 1);
        Assertions.assertThatCode(() -> {
            zorder(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"));
        }).doesNotThrowAnyException();
    }

    @Test
    public void testAllBasicTypeWorksWithHilbert() throws Exception {
        prepareData(300, 1);
        Assertions.assertThatCode(() -> {
            hilbert(Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"));
        }).doesNotThrowAnyException();
    }

    @Test
    public void testZorderActionWorks() throws Exception {
        prepareData(300, 2);
        Predicate between = new PredicateBuilder(getTable().rowType()).between(1, 100, 200);
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isEqualTo(getTable().store().newScan().withFilter(between).plan().files().size());
        zorder(Arrays.asList("f2", "f1"));
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isGreaterThan(getTable().store().newScan().withFilter(between).plan().files().size());
    }

    @Test
    public void testHilbertActionWorks() throws Exception {
        prepareData(300, 2);
        Predicate between = new PredicateBuilder(getTable().rowType()).between(1, 100, 200);
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isEqualTo(getTable().store().newScan().withFilter(between).plan().files().size());
        hilbert(Arrays.asList("f2", "f1"));
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isGreaterThan(getTable().store().newScan().withFilter(between).plan().files().size());
    }

    @Test
    public void testCompareZorderAndOrder() throws Exception {
        prepareData(300, 10);
        zorder(Arrays.asList("f2", "f1"));
        Predicate between = new PredicateBuilder(getTable().rowType()).between(1, 10, 20);
        List files = getTable().store().newScan().plan().files();
        List files2 = getTable().store().newScan().withFilter(between).plan().files();
        order(Arrays.asList("f2", "f1"));
        Assertions.assertThat(files2.size() / files.size()).isLessThan(getTable().store().newScan().withFilter(between).plan().files().size() / getTable().store().newScan().plan().files().size());
    }

    @Test
    public void testCompareHilbertAndOrder() throws Exception {
        prepareData(300, 10);
        hilbert(Arrays.asList("f2", "f1"));
        Predicate between = new PredicateBuilder(getTable().rowType()).between(1, 10, 20);
        List files = getTable().store().newScan().plan().files();
        List files2 = getTable().store().newScan().withFilter(between).plan().files();
        order(Arrays.asList("f2", "f1"));
        Assertions.assertThat(files2.size() / files.size()).isLessThan(getTable().store().newScan().withFilter(between).plan().files().size() / getTable().store().newScan().plan().files().size());
    }

    @Test
    public void testTableConf() throws Exception {
        createTable();
        Assertions.assertThat((String) new SortCompactAction(this.warehouse, this.database, this.tableName, Collections.emptyMap(), Collections.singletonMap(FlinkConnectorOptions.SINK_PARALLELISM.key(), "20")).withOrderStrategy("zorder").withOrderColumns(Collections.singletonList("f0")).table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key())).isEqualTo("20");
    }

    @Test
    public void testRandomSuffixWorks() throws Exception {
        prepareSameData(200);
        Assertions.assertThatCode(() -> {
            order(Collections.singletonList("f1"));
        }).doesNotThrowAnyException();
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isEqualTo(3);
        dropTable();
        prepareSameData(200);
        Assertions.assertThatCode(() -> {
            zorder(Arrays.asList("f1", "f2"));
        }).doesNotThrowAnyException();
        Assertions.assertThat(getTable().store().newScan().plan().files().size()).isEqualTo(3);
    }

    @Test
    public void testSortCompactionOnEmptyData() throws Exception {
        createTable();
        new SortCompactAction(this.warehouse, this.database, this.tableName, Collections.emptyMap(), Collections.emptyMap()).withOrderStrategy("zorder").withOrderColumns(Collections.singletonList("f0")).run();
    }

    private void zorder(List<String> list) throws Exception {
        createAction("zorder", RANDOM.nextBoolean() ? "size" : "quantity", list).run();
    }

    private void hilbert(List<String> list) throws Exception {
        createAction("hilbert", RANDOM.nextBoolean() ? "size" : "quantity", list).run();
    }

    private void order(List<String> list) throws Exception {
        createAction("order", RANDOM.nextBoolean() ? "size" : "quantity", list).run();
    }

    private SortCompactAction createAction(String str, String str2, List<String> list) {
        return createAction(str, str2, list, Lists.newArrayList());
    }

    private SortCompactAction createAction(String str, String str2, List<String> list, List<String> list2) {
        ArrayList newArrayList = Lists.newArrayList(new String[]{"compact", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--order_strategy", str, "--order_by", String.join(",", list), "--table_conf", "sort-compaction.range-strategy=" + str2});
        newArrayList.addAll(list2);
        return createAction(SortCompactAction.class, (String[]) newArrayList.toArray(new String[0]));
    }

    @Test
    public void testvalidSampleConfig() throws Exception {
        prepareData(300, 1);
        ArrayList newArrayList = Lists.newArrayList(new String[]{"--table_conf", "sort-compaction.local-sample.magnification=1"});
        Assertions.assertThatCode(() -> {
            createAction("order", "size", Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14", "f15"), newArrayList).run();
        }).hasMessage("the config 'sort-compaction.local-sample.magnification=1' should not be set too small,greater than or equal to 20 is needed.");
    }

    private void createTable() throws Exception {
        this.catalog.createDatabase(this.database, true);
        this.catalog.createTable(identifier(), schema(), true);
    }

    private void dropTable() throws Exception {
        this.catalog.dropTable(identifier(), true);
    }

    private Identifier identifier() {
        return Identifier.create(this.database, this.tableName);
    }

    private void commit(List<CommitMessage> list) throws Exception {
        BatchTableCommit newCommit = getTable().newBatchWriteBuilder().newCommit();
        newCommit.commit(list);
        newCommit.close();
    }

    private static Schema schema() {
        Schema.Builder newBuilder = Schema.newBuilder();
        newBuilder.column("f0", DataTypes.TINYINT());
        newBuilder.column("f1", DataTypes.INT());
        newBuilder.column("f2", DataTypes.SMALLINT());
        newBuilder.column("f3", DataTypes.STRING());
        newBuilder.column("f4", DataTypes.DOUBLE());
        newBuilder.column("f5", DataTypes.CHAR(10));
        newBuilder.column("f6", DataTypes.VARCHAR(10));
        newBuilder.column("f7", DataTypes.BOOLEAN());
        newBuilder.column("f8", DataTypes.DATE());
        newBuilder.column("f9", DataTypes.TIME());
        newBuilder.column("f10", DataTypes.TIMESTAMP());
        newBuilder.column("f11", DataTypes.DECIMAL(10, 2));
        newBuilder.column("f12", DataTypes.BYTES());
        newBuilder.column("f13", DataTypes.FLOAT());
        newBuilder.column("f14", DataTypes.BINARY(10));
        newBuilder.column("f15", DataTypes.VARBINARY(10));
        newBuilder.option("bucket", "-1");
        newBuilder.option("scan.parallelism", "6");
        newBuilder.option("sink.parallelism", "3");
        newBuilder.option("target-file-size", "1 M");
        newBuilder.partitionKeys(new String[]{"f0"});
        return newBuilder.build();
    }

    private List<CommitMessage> writeData(int i) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 2; i2++) {
            arrayList.addAll(writeOnce(getTable(), i2, i));
        }
        return arrayList;
    }

    private FileStoreTable getTable() throws Exception {
        return this.catalog.getTable(identifier());
    }

    private static List<CommitMessage> writeOnce(Table table, int i, int i2) throws Exception {
        BatchTableWrite newWrite = table.newBatchWriteBuilder().newWrite();
        Throwable th = null;
        for (int i3 = 0; i3 < i2; i3++) {
            for (int i4 = 0; i4 < i2; i4++) {
                try {
                    try {
                        newWrite.write(data(i, i3, i4));
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (newWrite != null) {
                        if (th != null) {
                            try {
                                newWrite.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            newWrite.close();
                        }
                    }
                    throw th2;
                }
            }
        }
        List<CommitMessage> prepareCommit = newWrite.prepareCommit();
        if (newWrite != null) {
            if (0 != 0) {
                try {
                    newWrite.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                newWrite.close();
            }
        }
        return prepareCommit;
    }

    private static InternalRow data(int i, int i2, int i3) {
        Object[] objArr = new Object[16];
        objArr[0] = Byte.valueOf((byte) i);
        objArr[1] = Integer.valueOf(i3);
        objArr[2] = Short.valueOf((short) i2);
        objArr[3] = BinaryString.fromString(String.valueOf(i3));
        objArr[4] = Double.valueOf(0.1d + i2);
        objArr[5] = BinaryString.fromString(String.valueOf(i3));
        objArr[6] = BinaryString.fromString(String.valueOf(i2));
        objArr[7] = Boolean.valueOf(i3 % 2 == 1);
        objArr[8] = Integer.valueOf(i2);
        objArr[9] = Integer.valueOf(i3);
        objArr[10] = Timestamp.fromEpochMillis(i2);
        objArr[11] = Decimal.zero(10, 2);
        objArr[12] = String.valueOf(i2).getBytes();
        objArr[13] = Float.valueOf(0.1f + i3);
        objArr[14] = randomBytes();
        objArr[15] = randomBytes();
        return GenericRow.of(objArr);
    }

    private static byte[] randomBytes() {
        byte[] bArr = new byte[RANDOM.nextInt(10)];
        RANDOM.nextBytes(bArr);
        return bArr;
    }
}
