package org.apache.paimon.flink.source;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

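/**
 * Integration tests for {@link MultiTablesCompactorSourceBuilder}, covering batch reads, streaming
 * reads and include/exclude table filtering.
 *
 * <p>Loaded from: input_file:org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.class
 */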
public class MultiTablesCompactorSourceBuilderITCase extends AbstractTestBase implements Serializable {
    /** Warehouse path of the current test; assigned from {@code getTempDirPath()} in {@code before()}. */
    private String warehouse;

    /** Catalog options of the current test; {@code catalogLoader()} stores the warehouse path in it. */
    private Options catalogOptions;

    /** Commit user handed to every stream write builder; a random UUID generated per test. */
    private String commitUser;

    /** Databases created up front by every test. */
    private static final String[] DATABASE_NAMES = {"db1", "db2"};

    /** Tables created inside each database. */
    private static final String[] TABLE_NAMES = {"t1", "t2"};

    /** Databases created by the streaming tests while the source job is already running. */
    private static final String[] New_DATABASE_NAMES = {"db3"};

    /** Tables created inside each of the late databases. */
    private static final String[] New_TABLE_NAMES = {"t1", "t2"};

    /** Row type per table name; populated once in {@code beforeAll()}. */
    private static final Map<String, RowType> ROW_TYPE_MAP = new HashMap<>(TABLE_NAMES.length);

    /**
     * Registers the row types of the two test tables in {@code ROW_TYPE_MAP}.
     *
     * <p>Both tables consist of three INT columns followed by one STRING column; they differ only
     * in the name of the second column ({@code v} for {@code t1}, {@code v1} for {@code t2}).
     */
    @BeforeAll
    public static void beforeAll() {
        RowType firstTableType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
                        },
                        new String[] {"k", "v", "hh", "dt"});
        RowType secondTableType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
                        },
                        new String[] {"k", "v1", "hh", "dt"});
        ROW_TYPE_MAP.put("t1", firstTableType);
        ROW_TYPE_MAP.put("t2", secondTableType);
    }

    /**
     * Resets the per-test state: a temp-dir warehouse path, empty catalog options and a freshly
     * generated commit user.
     */
    @BeforeEach
    public void before() throws IOException {
        warehouse = getTempDirPath();
        catalogOptions = new Options();
        final UUID randomUser = UUID.randomUUID();
        commitUser = randomUser.toString();
    }

    /**
     * Verifies the non-continuous (batch) source built by {@link MultiTablesCompactorSourceBuilder}
     * over all tables of {@code db1} and {@code db2}.
     *
     * <p>Every table is created write-only with a single bucket and receives two commits; the
     * latest snapshot is then asserted to have id 2 and commit kind APPEND. The rows collected from
     * the source are rendered with {@code toString(RowData)} and compared, ignoring order, against
     * the expected list.
     *
     * @param z {@code true} to use the default options; {@code false} to additionally set
     *     {@code CoreOptions.SCAN_SNAPSHOT_ID} to {@code "2"} on the tables
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "defaultOptions = {0}")
    public void testBatchRead(boolean z) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        if (!z) {
            options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
        }
        options.put("bucket", "1");

        for (String database : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        createTable(
                                database,
                                tableName,
                                ROW_TYPE_MAP.get(tableName),
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                // Resources are closed in reverse declaration order, so the writer is still closed
                // before the committer, and both are released even if an assertion below fails.
                try (StreamTableCommit commit = writeBuilder.newCommit();
                        StreamTableWrite write = writeBuilder.newWrite()) {
                    writeData(
                            write,
                            commit,
                            0L,
                            rowData(1, 100, 15, BinaryString.fromString("20221208")),
                            rowData(1, 100, 16, BinaryString.fromString("20221208")),
                            rowData(1, 100, 15, BinaryString.fromString("20221209")));
                    writeData(
                            write,
                            commit,
                            1L,
                            rowData(2, 100, 15, BinaryString.fromString("20221208")),
                            rowData(2, 100, 16, BinaryString.fromString("20221208")),
                            rowData(2, 100, 15, BinaryString.fromString("20221209")));

                    Snapshot snapshot =
                            snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
                    Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                    Assertions.assertThat(snapshot.commitKind())
                            .isEqualTo(Snapshot.CommitKind.APPEND);
                }
            }
        }

        // Parallelism is randomly 1 or 2.
        int parallelism = ThreadLocalRandom.current().nextInt(2) + 1;
        // try-with-resources guarantees the collecting job is shut down even when the test fails.
        try (CloseableIterator<RowData> rows =
                new MultiTablesCompactorSourceBuilder(
                                catalogLoader(),
                                Pattern.compile("db1|db2"),
                                Pattern.compile(".*"),
                                (Pattern) null,
                                1000L)
                        .withContinuousMode(false)
                        .withEnv(
                                streamExecutionEnvironmentBuilder()
                                        .batchMode()
                                        .parallelism(parallelism)
                                        .build())
                        .build()
                        .executeAndCollect()) {
            List<String> actual = new ArrayList<>();
            while (rows.hasNext()) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 2|20221208|15|0|0|db1|t1",
                                    "+I 2|20221209|15|0|0|db1|t1",
                                    "+I 2|20221208|16|0|0|db1|t1",
                                    "+I 2|20221208|15|0|0|db1|t2",
                                    "+I 2|20221209|15|0|0|db1|t2",
                                    "+I 2|20221208|16|0|0|db1|t2",
                                    "+I 2|20221208|15|0|0|db2|t1",
                                    "+I 2|20221209|15|0|0|db2|t1",
                                    "+I 2|20221208|16|0|0|db2|t1",
                                    "+I 2|20221208|15|0|0|db2|t2",
                                    "+I 2|20221209|15|0|0|db2|t2",
                                    "+I 2|20221208|16|0|0|db2|t2"));
        }
    }

    /**
     * Verifies the continuous (streaming) source built by {@link MultiTablesCompactorSourceBuilder}
     * with database and table patterns that match everything.
     *
     * <p>The test runs in three phases against the same running job:
     *
     * <ol>
     *   <li>all tables of {@code db1}/{@code db2} are written and compacted up front, and the first
     *       16 collected rows are checked;
     *   <li>one more commit is written to each of those tables, and the next 8 rows are checked;
     *   <li>the tables of {@code db3} are created and written while the job is running, and the
     *       next 4 rows are checked.
     * </ol>
     *
     * <p>Rows are rendered with {@code toString(RowData)} and compared ignoring order.
     *
     * @param z {@code true} to use the default options; {@code false} to additionally set the scan
     *     snapshot id, changelog producer and bounded watermark options on the tables
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "defaultOptions = {0}")
    public void testStreamingRead(boolean z) throws Exception {
        Map<String, String> options = new HashMap<>();
        if (!z) {
            options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
            options.put(
                    CoreOptions.CHANGELOG_PRODUCER.key(),
                    CoreOptions.ChangelogProducer.NONE.toString());
            options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
        }
        options.put("bucket", "1");

        List<FileStoreTable> tables = new ArrayList<>();
        for (String database : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        createTable(
                                database,
                                tableName,
                                ROW_TYPE_MAP.get(tableName),
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                tables.add(table);
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                // Reverse-order closing keeps "writer first, committer second" and releases both
                // even if a write or commit throws.
                try (StreamTableCommit commit = writeBuilder.newCommit();
                        StreamTableWrite write = writeBuilder.newWrite()) {
                    writeData(
                            write,
                            commit,
                            0L,
                            rowData(1, 1510, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1620, 16, BinaryString.fromString("20221208")));

                    // Second commit: two writes followed by an explicit compaction of both
                    // hour-15 partitions in bucket 0.
                    write.write(rowData(1, 1511, 15, BinaryString.fromString("20221208")));
                    write.write(rowData(1, 1510, 15, BinaryString.fromString("20221209")));
                    write.compact(binaryRow("20221208", 15), 0, true);
                    write.compact(binaryRow("20221209", 15), 0, true);
                    commit.commit(1L, write.prepareCommit(true, 1L));

                    writeData(
                            write,
                            commit,
                            2L,
                            rowData(2, 1520, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1621, 16, BinaryString.fromString("20221208")));
                    writeData(
                            write,
                            commit,
                            3L,
                            rowData(1, 1512, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1620, 16, BinaryString.fromString("20221209")));
                }
            }
        }

        // try-with-resources guarantees the streaming job is cancelled even when an assertion or a
        // later write fails; otherwise the job would keep running after the test.
        try (CloseableIterator<RowData> rows =
                new MultiTablesCompactorSourceBuilder(
                                catalogLoader(),
                                Pattern.compile(".*"),
                                Pattern.compile(".*"),
                                (Pattern) null,
                                1000L)
                        .withContinuousMode(true)
                        .withEnv(streamExecutionEnvironmentBuilder().streamingMode().build())
                        .build()
                        .executeAndCollect()) {
            List<String> actual = new ArrayList<>();

            // Phase 1: rows produced from the data written before the job started.
            for (int i = 0; i < 16; i++) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 4|20221208|15|0|1|db1|t1",
                                    "+I 4|20221208|16|0|1|db1|t1",
                                    "+I 4|20221208|15|0|1|db1|t2",
                                    "+I 4|20221208|16|0|1|db1|t2",
                                    "+I 4|20221208|15|0|1|db2|t1",
                                    "+I 4|20221208|16|0|1|db2|t1",
                                    "+I 4|20221208|15|0|1|db2|t2",
                                    "+I 4|20221208|16|0|1|db2|t2",
                                    "+I 5|20221209|16|0|1|db1|t1",
                                    "+I 5|20221208|15|0|1|db1|t1",
                                    "+I 5|20221209|16|0|1|db1|t2",
                                    "+I 5|20221208|15|0|1|db1|t2",
                                    "+I 5|20221209|16|0|1|db2|t1",
                                    "+I 5|20221208|15|0|1|db2|t1",
                                    "+I 5|20221209|16|0|1|db2|t2",
                                    "+I 5|20221208|15|0|1|db2|t2"));

            // Phase 2: one more commit on every existing table while the job is running.
            for (FileStoreTable table : tables) {
                StreamWriteBuilder writeBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                try (StreamTableCommit commit = writeBuilder.newCommit();
                        StreamTableWrite write = writeBuilder.newWrite()) {
                    writeData(
                            write,
                            commit,
                            4L,
                            rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                            rowData(1, 1510, 16, BinaryString.fromString("20221208")),
                            rowData(1, 1511, 15, BinaryString.fromString("20221209")));
                }
            }
            actual.clear();
            for (int i = 0; i < 8; i++) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 6|20221209|15|0|1|db1|t1",
                                    "+I 6|20221208|16|0|1|db1|t1",
                                    "+I 6|20221209|15|0|1|db1|t2",
                                    "+I 6|20221208|16|0|1|db1|t2",
                                    "+I 6|20221209|15|0|1|db2|t1",
                                    "+I 6|20221208|16|0|1|db2|t1",
                                    "+I 6|20221209|15|0|1|db2|t2",
                                    "+I 6|20221208|16|0|1|db2|t2"));

            // Phase 3: tables in a database that did not exist when the job started.
            for (String database : New_DATABASE_NAMES) {
                for (String tableName : New_TABLE_NAMES) {
                    FileStoreTable table =
                            createTable(
                                    database,
                                    tableName,
                                    ROW_TYPE_MAP.get(tableName),
                                    Arrays.asList("dt", "hh"),
                                    Arrays.asList("dt", "hh", "k"),
                                    options);
                    tables.add(table);
                    StreamWriteBuilder writeBuilder =
                            table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                    try (StreamTableCommit commit = writeBuilder.newCommit();
                            StreamTableWrite write = writeBuilder.newWrite()) {
                        writeData(
                                write,
                                commit,
                                0L,
                                rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                                rowData(1, 1510, 16, BinaryString.fromString("20221208")));
                    }
                }
            }
            actual.clear();
            for (int i = 0; i < 4; i++) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 1|20221209|15|0|1|db3|t2",
                                    "+I 1|20221208|16|0|1|db3|t2",
                                    "+I 1|20221209|15|0|1|db3|t1",
                                    "+I 1|20221208|16|0|1|db3|t1"));
        }
    }

    /**
     * Verifies the including and excluding table patterns of the continuous source built by
     * {@link MultiTablesCompactorSourceBuilder}.
     *
     * <p>The source is built with the including pattern {@code db1.+|db2.t1|db3.t1} and the
     * excluding pattern {@code db1.t2}. The expected rows only mention {@code db1.t1} and
     * {@code db2.t1} for the tables written up front, and only {@code db3.t1} for the tables that
     * are created while the job is already running.
     *
     * <p>Rows are rendered with {@code toString(RowData)} and compared ignoring order.
     *
     * @param z {@code true} to use the default options; {@code false} to additionally set the scan
     *     snapshot id, changelog producer and bounded watermark options on the tables
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "defaultOptions = {0}")
    public void testIncludeAndExcludeTableRead(boolean z) throws Exception {
        Map<String, String> options = new HashMap<>();
        if (!z) {
            options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
            options.put(
                    CoreOptions.CHANGELOG_PRODUCER.key(),
                    CoreOptions.ChangelogProducer.NONE.toString());
            options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
        }
        options.put("bucket", "1");

        for (String database : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                StreamWriteBuilder writeBuilder =
                        createTable(
                                        database,
                                        tableName,
                                        ROW_TYPE_MAP.get(tableName),
                                        Arrays.asList("dt", "hh"),
                                        Arrays.asList("dt", "hh", "k"),
                                        options)
                                .newStreamWriteBuilder()
                                .withCommitUser(this.commitUser);
                // Reverse-order closing keeps "writer first, committer second" and releases both
                // even if a write or commit throws.
                try (StreamTableCommit commit = writeBuilder.newCommit();
                        StreamTableWrite write = writeBuilder.newWrite()) {
                    writeData(
                            write,
                            commit,
                            0L,
                            rowData(1, 1510, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1620, 16, BinaryString.fromString("20221208")));

                    // Second commit: two writes followed by an explicit compaction of both
                    // hour-15 partitions in bucket 0.
                    write.write(rowData(1, 1511, 15, BinaryString.fromString("20221208")));
                    write.write(rowData(1, 1510, 15, BinaryString.fromString("20221209")));
                    write.compact(binaryRow("20221208", 15), 0, true);
                    write.compact(binaryRow("20221209", 15), 0, true);
                    commit.commit(1L, write.prepareCommit(true, 1L));

                    writeData(
                            write,
                            commit,
                            2L,
                            rowData(2, 1520, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1621, 16, BinaryString.fromString("20221208")));
                    writeData(
                            write,
                            commit,
                            3L,
                            rowData(1, 1512, 15, BinaryString.fromString("20221208")),
                            rowData(2, 1620, 16, BinaryString.fromString("20221209")));
                }
            }
        }

        // try-with-resources guarantees the streaming job is cancelled even when an assertion or a
        // later write fails; otherwise the job would keep running after the test.
        try (CloseableIterator<RowData> rows =
                new MultiTablesCompactorSourceBuilder(
                                catalogLoader(),
                                Pattern.compile(".*"),
                                Pattern.compile("db1.+|db2.t1|db3.t1"),
                                Pattern.compile("db1.t2"),
                                1000L)
                        .withContinuousMode(true)
                        .withEnv(streamExecutionEnvironmentBuilder().streamingMode().build())
                        .build()
                        .executeAndCollect()) {
            List<String> actual = new ArrayList<>();
            for (int i = 0; i < 8; i++) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 4|20221208|15|0|1|db1|t1",
                                    "+I 4|20221208|16|0|1|db1|t1",
                                    "+I 4|20221208|15|0|1|db2|t1",
                                    "+I 4|20221208|16|0|1|db2|t1",
                                    "+I 5|20221209|16|0|1|db1|t1",
                                    "+I 5|20221208|15|0|1|db1|t1",
                                    "+I 5|20221209|16|0|1|db2|t1",
                                    "+I 5|20221208|15|0|1|db2|t1"));

            // Tables in a database that did not exist when the job started.
            for (String database : New_DATABASE_NAMES) {
                for (String tableName : New_TABLE_NAMES) {
                    StreamWriteBuilder writeBuilder =
                            createTable(
                                            database,
                                            tableName,
                                            ROW_TYPE_MAP.get(tableName),
                                            Arrays.asList("dt", "hh"),
                                            Arrays.asList("dt", "hh", "k"),
                                            options)
                                    .newStreamWriteBuilder()
                                    .withCommitUser(this.commitUser);
                    try (StreamTableCommit commit = writeBuilder.newCommit();
                            StreamTableWrite write = writeBuilder.newWrite()) {
                        writeData(
                                write,
                                commit,
                                0L,
                                rowData(2, 1520, 15, BinaryString.fromString("20221209")),
                                rowData(1, 1510, 16, BinaryString.fromString("20221208")));
                    }
                }
            }
            actual.clear();
            for (int i = 0; i < 2; i++) {
                actual.add(toString(rows.next()));
            }
            Assertions.assertThat(actual)
                    .hasSameElementsAs(
                            Arrays.asList(
                                    "+I 1|20221209|15|0|1|db3|t1",
                                    "+I 1|20221208|16|0|1|db3|t1"));
        }
    }

    /**
     * Creates the database if necessary, creates the table in it and returns the loaded table.
     *
     * @param str database name; the database is created with {@code true} as second argument
     *     (presumably "ignore if exists" - confirm against the Catalog API)
     * @param str2 table name
     * @param rowType row type whose fields become the table's columns
     * @param list second argument of the {@link Schema} constructor (callers pass the partition
     *     columns {@code dt, hh})
     * @param list2 third argument of the {@link Schema} constructor (callers pass the key columns
     *     {@code dt, hh, k})
     * @param map table options
     * @return the table loaded back from the catalog
     * @throws Exception if the catalog fails to create or load the table
     */
    private FileStoreTable createTable(String str, String str2, RowType rowType, List<String> list, List<String> list2, Map<String, String> map) throws Exception {
        Catalog catalog = catalogLoader().load();
        Identifier identifier = Identifier.create(str, str2);
        catalog.createDatabase(str, true);
        catalog.createTable(identifier, new Schema(rowType.getFields(), list, list2, map, ""), false);
        // Catalog#getTable is declared to return the general table type, so an explicit downcast
        // is required to satisfy this method's return type.
        return (FileStoreTable) catalog.getTable(identifier);
    }

    /**
     * Builds a {@link GenericRow} holding the given field values in order.
     *
     * @param objArr field values of the row
     * @return the row created by {@code GenericRow.of}
     */
    private GenericRow rowData(Object... objArr) {
        final GenericRow row = GenericRow.of(objArr);
        return row;
    }

    /**
     * Writes all given rows and commits them under the given commit identifier.
     *
     * @param streamTableWrite writer that receives the rows and prepares the commit
     * @param streamTableCommit committer that commits the prepared messages
     * @param j commit identifier, passed to both {@code prepareCommit} and {@code commit}
     * @param genericRowArr rows to write, in order
     * @throws Exception if writing, preparing or committing fails
     */
    private void writeData(StreamTableWrite streamTableWrite, StreamTableCommit streamTableCommit, long j, GenericRow... genericRowArr) throws Exception {
        for (int index = 0; index < genericRowArr.length; index++) {
            streamTableWrite.write(genericRowArr[index]);
        }
        // prepareCommit is invoked with `true` as its first argument, exactly as before.
        streamTableCommit.commit(j, streamTableWrite.prepareCommit(true, j));
    }

    /**
     * Renders a row emitted by the source as
     * {@code <kind> <long 0>|<partition string 0>|<partition int 1>|<int 2>|<file count>|<string 4>|<string 5>}.
     *
     * <p>The partition is deserialized from binary field 1, and the file count is the size of the
     * data file meta list deserialized from binary field 3.
     *
     * @param rowData row to render
     * @return the formatted string
     * @throws UncheckedIOException if deserialization fails with an {@link IOException}
     */
    private String toString(RowData rowData) {
        try {
            DataFileMetaSerializer fileSerializer = new DataFileMetaSerializer();
            int fileCount = fileSerializer.deserializeList(rowData.getBinary(3)).size();
            BinaryRow partition = SerializationUtils.deserializeBinaryRow(rowData.getBinary(1));
            return String.format(
                    "%s %d|%s|%d|%d|%d|%s|%s",
                    rowData.getRowKind().shortString(),
                    rowData.getLong(0),
                    partition.getString(0),
                    partition.getInt(1),
                    rowData.getInt(2),
                    fileCount,
                    rowData.getString(4),
                    rowData.getString(5));
        } catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
    }

    /**
     * Builds a two-field {@link BinaryRow}: a string at position 0 and an int at position 1.
     * The tests pass the result as the partition argument of {@code compact}.
     *
     * @param str value of the string field
     * @param i value of the int field
     * @return the completed binary row
     */
    private BinaryRow binaryRow(String str, int i) {
        final BinaryRow row = new BinaryRow(2);
        final BinaryRowWriter writer = new BinaryRowWriter(row);
        final BinaryString text = BinaryString.fromString(str);
        writer.writeString(0, text);
        writer.writeInt(1, i);
        writer.complete();
        return row;
    }

    /**
     * Stores the current warehouse path in the catalog options and returns a loader that creates
     * a Paimon catalog from those options each time it is invoked.
     *
     * <p>The lambda reads the {@code catalogOptions} field, so it captures this test instance.
     *
     * @return a loader backed by {@code FlinkCatalogFactory.createPaimonCatalog}
     */
    private Catalog.Loader catalogLoader() {
        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
        return () -> FlinkCatalogFactory.createPaimonCatalog(this.catalogOptions);
    }

    /**
     * Deserialization hook for the serializable lambda returned by {@code catalogLoader()}.
     *
     * <p>Recreates the loader only when the serialized form matches the expected implementation
     * method, functional interface and signatures; the recreated loader reads the catalog options
     * of the captured test instance.
     *
     * <p>NOTE(review): javac normally synthesizes a method with this name for serializable
     * lambdas, so this explicit copy is presumably a decompilation artifact and may clash with the
     * generated one - confirm, and consider deleting it when compiling from source.
     *
     * @param serializedLambda serialized form of the lambda
     * @return the recreated {@code Catalog.Loader}
     * @throws IllegalArgumentException if the serialized form does not describe the expected lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        boolean isCatalogLoaderLambda =
                serializedLambda.getImplMethodName().equals("lambda$catalogLoader$15ef9c6$1")
                        // 7 is the method-handle reference kind REF_invokeSpecial.
                        && serializedLambda.getImplMethodKind() == 7
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/catalog/Catalog$Loader")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("load")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/catalog/Catalog;")
                        && serializedLambda.getImplClass().equals("org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase")
                        && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/paimon/catalog/Catalog;");
        if (isCatalogLoaderLambda) {
            // A static method has no `this`; the instance the lambda captured is argument 0.
            MultiTablesCompactorSourceBuilderITCase owner =
                    (MultiTablesCompactorSourceBuilderITCase) serializedLambda.getCapturedArg(0);
            // The cast supplies the functional-interface target type that `Object` cannot.
            return (Catalog.Loader) () -> FlinkCatalogFactory.createPaimonCatalog(owner.catalogOptions);
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
