package org.apache.paimon.flink.sink.index;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.class */
/**
 * Tests for the {@link GlobalIndexAssigner} obtained from
 * {@link GlobalIndexAssignerOperator#createRowDataAssigner}.
 *
 * <p>Every test creates a table {@code T(pt INT, pk INT, col INT)} partitioned by {@code pt} with
 * primary key {@code pk}, feeds rows into the assigner and checks the (row, bucket) pairs that the
 * assigner hands to its collector callback.
 */
public class GlobalIndexAssignerTest extends TableTestBase {

    /**
     * Creates table {@code T} with the given merge engine and returns a row-data assigner for it.
     *
     * <p>The table uses {@code BUCKET = -1} and {@code DYNAMIC_BUCKET_TARGET_ROW_NUM = 3}. For the
     * {@code FIRST_ROW} merge engine the changelog producer is additionally set to {@code LOOKUP}.
     *
     * @param mergeEngine merge engine to configure on the table
     * @return an assigner bound to the freshly created table (not yet opened)
     * @throws Exception if the table cannot be created or loaded
     */
    private GlobalIndexAssigner<RowData> createAssigner(CoreOptions.MergeEngine mergeEngine) throws Exception {
        Identifier identifier = identifier("T");

        Options options = new Options();
        options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
            options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
        }
        options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 3L);
        options.set(CoreOptions.BUCKET, -1);

        Schema schema =
                Schema.newBuilder()
                        .column("pt", DataTypes.INT())
                        .column("pk", DataTypes.INT())
                        .column("col", DataTypes.INT())
                        .partitionKeys("pt")
                        .primaryKey("pk")
                        .options(options.toMap())
                        .build();
        this.catalog.createTable(identifier, schema, true);
        return GlobalIndexAssignerOperator.createRowDataAssigner(this.catalog.getTable(identifier));
    }

    /**
     * Opens the assigner on the test warehouse directory and records every emitted (row, bucket)
     * pair into {@code output}.
     *
     * <p>NOTE(review): the literal arguments {@code 2, 0} are presumably the number of assigners
     * and this assigner's id; confirm against {@code GlobalIndexAssigner#open}.
     */
    private void openCollecting(GlobalIndexAssigner<RowData> assigner, List<Tuple2<RowData, Integer>> output)
            throws Exception {
        assigner.open(
                new File(this.warehouse.getPath()),
                2,
                0,
                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
    }

    /** Builds an expected (row, bucket) pair with fully specified generic types. */
    private static Tuple2<RowData, Integer> emitted(RowData row, int bucket) {
        return new Tuple2<>(row, bucket);
    }

    /** Extracts the bucket component of every collected pair, preserving order. */
    private static List<Integer> bucketsOf(List<Tuple2<RowData, Integer>> output) {
        List<Integer> buckets = new ArrayList<>(output.size());
        for (Tuple2<RowData, Integer> pair : output) {
            buckets.add(pair.f1);
        }
        return buckets;
    }

    /** Checks which bucket is reported for new keys, for a new partition and for repeated keys. */
    @Test
    public void testBucketAssign() throws Exception {
        GlobalIndexAssigner<RowData> assigner = createAssigner(CoreOptions.MergeEngine.DEDUPLICATE);
        List<Integer> buckets = new ArrayList<>();
        assigner.open(new File(this.warehouse.getPath()), 2, 0, (row, bucket) -> buckets.add(bucket));
        try {
            // The first three keys of partition 1 all land in bucket 0.
            assigner.process(GenericRowData.of(1, 1, 1));
            assigner.process(GenericRowData.of(1, 2, 2));
            assigner.process(GenericRowData.of(1, 3, 3));
            Assertions.assertThat(buckets).containsExactly(0, 0, 0);
            buckets.clear();

            // A fourth key in the same partition is expected in bucket 2 (target row num is 3).
            assigner.process(GenericRowData.of(1, 4, 4));
            Assertions.assertThat(buckets).containsExactly(2);
            buckets.clear();

            // A key in another partition starts again at bucket 0.
            assigner.process(GenericRowData.of(2, 5, 5));
            Assertions.assertThat(buckets).containsExactly(0);
            buckets.clear();

            // Keys that were already seen keep the bucket they were given before.
            assigner.process(GenericRowData.of(1, 4, 4));
            assigner.process(GenericRowData.of(1, 2, 2));
            assigner.process(GenericRowData.of(1, 3, 3));
            Assertions.assertThat(buckets).containsExactly(2, 0, 0);
            buckets.clear();
        } finally {
            // Close even when an assertion fails so the opened assigner is not leaked.
            assigner.close();
        }
    }

    /**
     * With {@code DEDUPLICATE}, a key that shows up in a different partition is expected to emit a
     * {@code DELETE} row for the old partition followed by the new row.
     */
    @Test
    public void testUpsert() throws Exception {
        GlobalIndexAssigner<RowData> assigner = createAssigner(CoreOptions.MergeEngine.DEDUPLICATE);
        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
        openCollecting(assigner, output);
        try {
            // pk=1 moves from partition 1 to partition 2.
            assigner.process(GenericRowData.of(1, 1, 1));
            assigner.process(GenericRowData.of(2, 1, 2));
            Assertions.assertThat(output)
                    .containsExactly(
                            emitted(GenericRowData.of(1, 1, 1), 0),
                            emitted(GenericRowData.ofKind(RowKind.DELETE, 1, 1, 2), 0),
                            emitted(GenericRowData.of(2, 1, 2), 0));
            output.clear();

            // Three further keys in partition 1 are all expected in bucket 0.
            assigner.process(GenericRowData.of(1, 2, 2));
            assigner.process(GenericRowData.of(1, 3, 3));
            assigner.process(GenericRowData.of(1, 4, 4));
            Assertions.assertThat(bucketsOf(output)).containsExactly(0, 0, 0);
            output.clear();

            // pk=4 moves from partition 1 to partition 2.
            assigner.process(GenericRowData.of(2, 4, 4));
            Assertions.assertThat(output)
                    .containsExactly(
                            emitted(GenericRowData.ofKind(RowKind.DELETE, 1, 4, 4), 0),
                            emitted(GenericRowData.of(2, 4, 4), 0));
            output.clear();

            // A new key in partition 1 is still expected in bucket 0.
            assigner.process(GenericRowData.of(1, 5, 5));
            Assertions.assertThat(bucketsOf(output)).containsExactly(0);
            output.clear();
        } finally {
            // Close even when an assertion fails so the opened assigner is not leaked.
            assigner.close();
        }
    }

    /**
     * With {@code PARTIAL_UPDATE} or {@code AGGREGATE} (picked at random per run), a key that shows
     * up in a different partition is expected to be emitted with its original partition value.
     */
    @Test
    public void testUseOldPartition() throws Exception {
        CoreOptions.MergeEngine mergeEngine =
                ThreadLocalRandom.current().nextBoolean()
                        ? CoreOptions.MergeEngine.PARTIAL_UPDATE
                        : CoreOptions.MergeEngine.AGGREGATE;
        GlobalIndexAssigner<RowData> assigner = createAssigner(mergeEngine);
        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
        openCollecting(assigner, output);
        try {
            // The second row arrives with pt=2 but is emitted with pt=1.
            assigner.process(GenericRowData.of(1, 1, 1));
            assigner.process(GenericRowData.of(2, 1, 2));
            Assertions.assertThat(output)
                    .containsExactly(
                            emitted(GenericRowData.of(1, 1, 1), 0),
                            emitted(GenericRowData.of(1, 1, 2), 0));
            output.clear();

            // Three new keys in partition 2 are all expected in bucket 0.
            assigner.process(GenericRowData.of(2, 2, 2));
            assigner.process(GenericRowData.of(2, 3, 3));
            assigner.process(GenericRowData.of(2, 4, 4));
            Assertions.assertThat(bucketsOf(output)).containsExactly(0, 0, 0);
            output.clear();
        } finally {
            // Previously this test never closed the assigner it opened.
            assigner.close();
        }
    }

    /**
     * With {@code FIRST_ROW}, a key that shows up again in a different partition is expected to
     * produce no output at all.
     */
    @Test
    public void testFirstRow() throws Exception {
        GlobalIndexAssigner<RowData> assigner = createAssigner(CoreOptions.MergeEngine.FIRST_ROW);
        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
        openCollecting(assigner, output);
        try {
            // Only the first occurrence of pk=1 is emitted.
            assigner.process(GenericRowData.of(1, 1, 1));
            assigner.process(GenericRowData.of(2, 1, 2));
            Assertions.assertThat(output).containsExactly(emitted(GenericRowData.of(1, 1, 1), 0));
            output.clear();

            // Three new keys in partition 2 are all expected in bucket 0.
            assigner.process(GenericRowData.of(2, 2, 2));
            assigner.process(GenericRowData.of(2, 3, 3));
            assigner.process(GenericRowData.of(2, 4, 4));
            Assertions.assertThat(bucketsOf(output)).containsExactly(0, 0, 0);
            output.clear();
        } finally {
            // Previously this test never closed the assigner it opened.
            assigner.close();
        }
    }
}
