package org.apache.flink.table.store.file.append;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.utils.BinaryRowDataUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/store/file/append/AppendOnlyWriterTest.class */
/**
 * Tests for {@link AppendOnlyWriter}.
 *
 * <p>Covers committing without any input, a single-row write, several consecutive commits on a
 * writer created with its boolean flag enabled, and a write that rolls over into multiple files
 * which are then handed to a second writer for compaction.
 */
public class AppendOnlyWriterTest {
    private static final RowData EMPTY_ROW = BinaryRowDataUtil.EMPTY_ROW;
    private static final RowType SCHEMA =
            RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()},
                    new String[] {"id", "name", "dt"});
    private static final FieldStatsArraySerializer STATS_SERIALIZER =
            new FieldStatsArraySerializer(SCHEMA);

    private static final String AVRO = "avro";
    private static final String PART = "2022-05-01";
    private static final long SCHEMA_ID = 0;
    private static final int MIN_FILE_NUM = 3;
    private static final int MAX_FILE_NUM = 4;

    /**
     * Third argument passed to every {@link FieldStats} built by this test.
     *
     * <p>NOTE(review): presumably this is the per-column null count -- confirm against
     * {@code FieldStats}. It is kept separate from {@link #SCHEMA_ID} so that changing the schema
     * id cannot silently alter the expected statistics.
     */
    private static final long NULL_COUNT = 0L;

    // Must stay non-private: JUnit injects the temporary directory into this field.
    @TempDir
    public Path tempDir;
    public DataFilePathFactory pathFactory;

    /** Builds a fresh path factory rooted at the per-test temporary directory. */
    @BeforeEach
    public void before() {
        this.pathFactory = createPathFactory();
    }

    /** Committing three times without writing anything must yield empty increments each time. */
    @Test
    public void testEmptyCommits() throws Exception {
        AppendOnlyWriter writer = createEmptyWriter(1024 * 1024L);

        for (int round = 0; round < 3; round++) {
            writer.sync();
            RecordWriter.CommitIncrement increment = writer.prepareCommit(true);

            Assertions.assertThat(increment.newFilesIncrement().isEmpty()).isTrue();
            Assertions.assertThat(increment.compactIncrement().isEmpty()).isTrue();
        }
    }

    /** A single written row must produce exactly one new file with matching metadata. */
    @Test
    public void testSingleWrite() throws Exception {
        AppendOnlyWriter writer = createEmptyWriter(1024 * 1024L);
        writer.write(row(1, "AAA", PART));
        RecordWriter.CommitIncrement increment = writer.prepareCommit(true);
        writer.close();

        List<DataFileMeta> newFiles = increment.newFilesIncrement().newFiles();
        Assertions.assertThat(newFiles).hasSize(1);

        DataFileMeta meta = newFiles.get(0);
        Assertions.assertThat(meta).isNotNull();
        // The file is still expected to exist after the writer has been closed.
        assertDataFile(meta, 1L);
        Assertions.assertThat(meta.valueStats())
                .isEqualTo(
                        STATS_SERIALIZER.toBinary(
                                new FieldStats[] {
                                    initStats(1, 1, NULL_COUNT),
                                    initStats("AAA", "AAA", NULL_COUNT),
                                    initStats(PART, PART, NULL_COUNT)
                                }));
        // The only record written is expected to carry sequence number 0.
        Assertions.assertThat(meta.minSequenceNumber()).isEqualTo(0L);
        Assertions.assertThat(meta.maxSequenceNumber()).isEqualTo(0L);
    }

    /**
     * Writes five batches of 100 rows, committing after each batch, and checks both the new file
     * of every commit and the compaction result expected on the fourth commit.
     */
    @Test
    public void testMultipleCommits() throws Exception {
        AppendOnlyWriter writer = createWriter(1024 * 1024L, true, Collections.emptyList()).f0;

        for (int round = 0; round < 5; round++) {
            int start = round * 100;
            int end = start + 100;
            for (int id = start; id < end; id++) {
                writer.write(row(id, String.format("%03d", id), PART));
            }

            writer.sync();
            RecordWriter.CommitIncrement increment = writer.prepareCommit(true);

            List<DataFileMeta> compactBefore = increment.compactIncrement().compactBefore();
            List<DataFileMeta> compactAfter = increment.compactIncrement().compactAfter();
            if (round > 0 && round % 3 == 0) {
                // Of the five rounds only round 3 matches: four files are expected to be
                // replaced by a single one whose size and row count equal the sums of its inputs.
                Assertions.assertThat(compactBefore).hasSize(4);
                Assertions.assertThat(compactAfter).hasSize(1);

                DataFileMeta compacted = compactAfter.get(0);
                Assertions.assertThat(compacted.fileName()).startsWith("compact-");
                Assertions.assertThat(compacted.fileSize())
                        .isEqualTo(totalFileSize(compactBefore));
                Assertions.assertThat(compacted.rowCount())
                        .isEqualTo(totalRowCount(compactBefore));
            } else {
                Assertions.assertThat(compactBefore).isEmpty();
                Assertions.assertThat(compactAfter).isEmpty();
            }

            List<DataFileMeta> newFiles = increment.newFilesIncrement().newFiles();
            Assertions.assertThat(newFiles).hasSize(1);

            DataFileMeta newFile = newFiles.get(0);
            assertDataFile(newFile, 100L);
            Assertions.assertThat(newFile.valueStats())
                    .isEqualTo(STATS_SERIALIZER.toBinary(idRangeStats(start, end - 1)));
            // Sequence numbers are expected to coincide with the ids written in this round.
            Assertions.assertThat(newFile.minSequenceNumber()).isEqualTo(start);
            Assertions.assertThat(newFile.maxSequenceNumber()).isEqualTo(end - 1);
        }
    }

    /**
     * Writes 10000 rows with a target file size of 10 and expects ten files of 1000 rows each,
     * then feeds those files to a second writer and checks the compaction it reports.
     */
    @Test
    public void testRollingWrite() throws Exception {
        AppendOnlyWriter writer = createEmptyWriter(10L);
        for (int id = 0; id < 10000; id++) {
            writer.write(row(id, String.format("%03d", id), PART));
        }

        writer.sync();
        RecordWriter.CommitIncrement firstIncrement = writer.prepareCommit(true);
        Assertions.assertThat(firstIncrement.compactIncrement().compactBefore()).isEmpty();
        Assertions.assertThat(firstIncrement.compactIncrement().compactAfter()).isEmpty();

        List<DataFileMeta> rolledFiles = firstIncrement.newFilesIncrement().newFiles();
        Assertions.assertThat(rolledFiles).hasSize(10);

        int fileIndex = 0;
        for (DataFileMeta meta : rolledFiles) {
            assertDataFile(meta, 1000L);

            int minId = fileIndex * 1000;
            int maxId = minId + 999;
            Assertions.assertThat(meta.valueStats())
                    .isEqualTo(STATS_SERIALIZER.toBinary(idRangeStats(minId, maxId)));
            Assertions.assertThat(meta.minSequenceNumber()).isEqualTo(minId);
            Assertions.assertThat(meta.maxSequenceNumber()).isEqualTo(maxId);
            fileIndex++;
        }

        // Second writer: seeded with the rolled files, boolean flag enabled.
        Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> writerAndToCompact =
                createWriter(1024 * 1024L, true, rolledFiles);
        AppendOnlyWriter compactingWriter = writerAndToCompact.f0;
        LinkedList<DataFileMeta> toCompact = writerAndToCompact.f1;
        Assertions.assertThat(toCompact).containsExactlyElementsOf(rolledFiles);

        // fileIndex now equals the number of rolled files and doubles as the id of the extra row.
        compactingWriter.write(row(fileIndex, String.format("%03d", fileIndex), PART));
        compactingWriter.sync();
        RecordWriter.CommitIncrement secondIncrement = compactingWriter.prepareCommit(true);

        List<DataFileMeta> compactBefore = secondIncrement.compactIncrement().compactBefore();
        List<DataFileMeta> compactAfter = secondIncrement.compactIncrement().compactAfter();
        Assertions.assertThat(compactBefore)
                .containsExactlyInAnyOrderElementsOf(rolledFiles.subList(0, 4));
        Assertions.assertThat(compactAfter).hasSize(1);
        Assertions.assertThat(totalFileSize(compactBefore))
                .isEqualTo(totalFileSize(compactAfter));
        Assertions.assertThat(totalRowCount(compactBefore))
                .isEqualTo(totalRowCount(compactAfter));
        Assertions.assertThat(compactBefore.get(0).minSequenceNumber())
                .isEqualTo(compactAfter.get(0).minSequenceNumber());
        Assertions.assertThat(compactBefore.get(compactBefore.size() - 1).maxSequenceNumber())
                .isEqualTo(compactAfter.get(compactAfter.size() - 1).maxSequenceNumber());
        Assertions.assertThat(secondIncrement.newFilesIncrement().newFiles()).hasSize(1);

        // The list shared with the compact manager is expected to hold the compaction output,
        // followed by the rolled files that were not compacted, followed by the new file.
        List<DataFileMeta> expectedToCompact = new ArrayList<>(compactAfter);
        expectedToCompact.addAll(rolledFiles.subList(4, rolledFiles.size()));
        expectedToCompact.addAll(secondIncrement.newFilesIncrement().newFiles());
        Assertions.assertThat(toCompact).containsExactlyElementsOf(expectedToCompact);
    }

    /**
     * Asserts the properties every data file in these tests is expected to share: the file
     * exists at the path derived from {@link #pathFactory}, it has the given row count, its key
     * bounds equal {@link #EMPTY_ROW}, its key stats equal {@code DataFileMeta.EMPTY_KEY_STATS}
     * and its level is 0.
     */
    private void assertDataFile(DataFileMeta meta, long expectedRowCount) throws Exception {
        org.apache.flink.core.fs.Path path = this.pathFactory.toPath(meta.fileName());
        Assertions.assertThat(path.getFileSystem().exists(path)).isTrue();
        Assertions.assertThat(meta.rowCount()).isEqualTo(expectedRowCount);
        Assertions.assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
        Assertions.assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
        Assertions.assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
        Assertions.assertThat(meta.level()).isEqualTo(0);
    }

    /**
     * Expected per-column stats for a file whose ids span {@code [minId, maxId]} and whose names
     * are those ids formatted with {@code "%03d"}.
     */
    private FieldStats[] idRangeStats(int minId, int maxId) {
        return new FieldStats[] {
            initStats(minId, maxId, NULL_COUNT),
            initStats(String.format("%03d", minId), String.format("%03d", maxId), NULL_COUNT),
            initStats(PART, PART, NULL_COUNT)
        };
    }

    /** Sum of {@code fileSize()} over the given files. */
    private static long totalFileSize(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
    }

    /** Sum of {@code rowCount()} over the given files. */
    private static long totalRowCount(List<DataFileMeta> files) {
        return files.stream().mapToLong(DataFileMeta::rowCount).sum();
    }

    /** Builds stats for an integer column from the given bounds and third constructor argument. */
    private FieldStats initStats(Integer min, Integer max, long nullCount) {
        return new FieldStats(min, max, nullCount);
    }

    /** Builds stats for a string column; both bounds are converted to {@link StringData}. */
    private FieldStats initStats(String min, String max, long nullCount) {
        return new FieldStats(
                StringData.fromString(min), StringData.fromString(max), nullCount);
    }

    /** Builds a three-field row in the column order of {@link #SCHEMA}. */
    private RowData row(int id, String name, String dt) {
        return GenericRowData.of(
                new Object[] {id, StringData.fromString(name), StringData.fromString(dt)});
    }

    /** Creates a path factory for {@code "dt=" + PART} under {@link #tempDir}. */
    private DataFilePathFactory createPathFactory() {
        return new DataFilePathFactory(
                new org.apache.flink.core.fs.Path(this.tempDir.toString()),
                "dt=" + PART,
                0,
                CoreOptions.FILE_FORMAT.defaultValue());
    }

    /** Creates a writer with no pre-existing files and the boolean writer flag disabled. */
    private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
        return createWriter(targetFileSize, false, Collections.emptyList()).f0;
    }

    /**
     * Creates an {@link AppendOnlyWriter} together with the mutable list it shares with its
     * {@link AppendOnlyCompactManager}.
     *
     * @param targetFileSize passed to both the writer and the compact manager
     * @param forceCompact boolean flag forwarded to the writer constructor (NOTE(review):
     *     presumably "force compaction" -- confirm against {@code AppendOnlyWriter})
     * @param scannedFiles files copied into the shared list before the writer is created
     * @return the writer ({@code f0}) and the shared list ({@code f1})
     */
    private Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> createWriter(
            long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
        FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
        // Copy so that the caller's list is never handed to the compact manager directly.
        LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
        AppendOnlyCompactManager compactManager =
                new AppendOnlyCompactManager(
                        Executors.newSingleThreadScheduledExecutor(
                                new ExecutorThreadFactory("compaction-thread")),
                        toCompact,
                        MIN_FILE_NUM,
                        MAX_FILE_NUM,
                        targetFileSize,
                        compactBefore ->
                                compactBefore.isEmpty()
                                        ? Collections.emptyList()
                                        : Collections.singletonList(
                                                generateCompactAfter(compactBefore)),
                        this.pathFactory);
        AppendOnlyWriter writer =
                new AppendOnlyWriter(
                        SCHEMA_ID,
                        fileFormat,
                        targetFileSize,
                        SCHEMA,
                        DataFileMeta.getMaxSequenceNumber(toCompact),
                        compactManager,
                        forceCompact,
                        this.pathFactory);
        return new Tuple2<>(writer, toCompact);
    }

    /**
     * Fabricates the metadata of a compaction result without writing any file.
     *
     * <p>Sizes and row counts are summed; minimum values and the minimum sequence number are
     * taken from the first input, maximum values and the maximum sequence number from the last
     * one. The list must not be empty.
     */
    private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
        DataFileMeta first = toCompact.get(0);
        DataFileMeta last = toCompact.get(toCompact.size() - 1);
        return DataFileMeta.forAppend(
                "compact-" + UUID.randomUUID(),
                totalFileSize(toCompact),
                totalRowCount(toCompact),
                STATS_SERIALIZER.toBinary(
                        new FieldStats[] {
                            initStats(
                                    first.valueStats().min().getInt(0),
                                    last.valueStats().max().getInt(0),
                                    NULL_COUNT),
                            initStats(
                                    first.valueStats().min().getString(1).toString(),
                                    last.valueStats().max().getString(1).toString(),
                                    NULL_COUNT),
                            initStats(PART, PART, NULL_COUNT)
                        }),
                first.minSequenceNumber(),
                last.maxSequenceNumber(),
                first.schemaId());
    }
}
