package org.apache.flink.table.store.connector.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/source/CompactorSourceITCase.class */
public class CompactorSourceITCase extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType()}, new String[]{"k", "v", "dt", "hh"});
    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
    private Path tablePath;
    private String commitUser;

    @Before
    public void before() throws IOException {
        this.tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    @Test
    public void testBatchRead() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(false).withEnv(StreamExecutionEnvironment.getExecutionEnvironment()).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        while (executeAndCollect.hasNext()) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 2|20221208|15|0|0", "+I 2|20221208|16|0|0", "+I 2|20221209|15|0|0"));
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    @Test
    public void testStreamingRead() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
        newWrite.compact(binaryRow("20221208", 15), 0, true);
        newWrite.compact(binaryRow("20221209", 15), 0, true);
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1621, StringData.fromString("20221208"), 16));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        newWrite.write(rowData(1, 1512, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, StringData.fromString("20221209"), 16));
        newCommit.commit(3L, newWrite.prepareCommit(true, 3L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(true).withEnv(StreamExecutionEnvironment.getExecutionEnvironment()).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 4|20221208|15|0|1", "+I 4|20221208|16|0|1", "+I 5|20221208|15|0|1", "+I 5|20221209|16|0|1"));
        newWrite.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
        newWrite.write(rowData(1, 1510, StringData.fromString("20221208"), 16));
        newWrite.write(rowData(1, 1511, StringData.fromString("20221209"), 15));
        newCommit.commit(4L, newWrite.prepareCommit(true, 4L));
        arrayList.clear();
        for (int i2 = 0; i2 < 2; i2++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(Arrays.asList("+I 6|20221208|16|0|1", "+I 6|20221209|15|0|1"));
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    @Test
    public void testStreamingPartitionSpec() throws Exception {
        testPartitionSpec(true, getSpecifiedPartitions(), Arrays.asList("+I 1|20221208|16|0|1", "+I 2|20221209|15|0|1", "+I 3|20221208|16|0|1", "+I 3|20221209|15|0|1"));
    }

    @Test
    public void testBatchPartitionSpec() throws Exception {
        testPartitionSpec(false, getSpecifiedPartitions(), Arrays.asList("+I 3|20221208|16|0|0", "+I 3|20221209|15|0|0"));
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap hashMap = new HashMap();
        hashMap.put("dt", "20221208");
        hashMap.put("hh", "16");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("dt", "20221209");
        hashMap2.put("hh", "15");
        return Arrays.asList(hashMap, hashMap2);
    }

    private void testPartitionSpec(boolean z, List<Map<String, String>> list, List<String> list2) throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 1510, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1620, StringData.fromString("20221208"), 16));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(2, 1520, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(2, 1520, StringData.fromString("20221209"), 15));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(1, 1511, StringData.fromString("20221208"), 15));
        newWrite.write(rowData(1, 1610, StringData.fromString("20221208"), 16));
        newWrite.write(rowData(1, 1510, StringData.fromString("20221209"), 15));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        CloseableIterator executeAndCollect = new CompactorSourceBuilder("test", createFileStoreTable).withContinuousMode(z).withEnv(StreamExecutionEnvironment.getExecutionEnvironment()).withPartitions(list).build().executeAndCollect();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list2.size(); i++) {
            arrayList.add(toString((RowData) executeAndCollect.next()));
        }
        Assertions.assertThat(arrayList).hasSameElementsAs(list2);
        newWrite.close();
        newCommit.close();
        executeAndCollect.close();
    }

    private String toString(RowData rowData) {
        try {
            return String.format("%s %d|%s|%d|%d|%d", rowData.getRowKind().shortString(), Long.valueOf(rowData.getLong(0)), rowData.getString(1).toString(), Integer.valueOf(rowData.getInt(2)), Integer.valueOf(rowData.getInt(3)), Integer.valueOf(this.dataFileMetaSerializer.deserializeList(rowData.getBinary(4)).size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private GenericRowData rowData(Object... objArr) {
        return GenericRowData.of(objArr);
    }

    private BinaryRowData binaryRow(String str, int i) {
        BinaryRowData binaryRowData = new BinaryRowData(2);
        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRowData);
        binaryRowWriter.writeString(0, StringData.fromString(str));
        binaryRowWriter.writeInt(1, i);
        binaryRowWriter.complete();
        return binaryRowData;
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        return FileStoreTableFactory.create(this.tablePath, new SchemaManager(this.tablePath).commitNewVersion(new UpdateSchema(ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyMap(), "")));
    }
}
