package org.apache.flink.table.store.table;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.utils.TraceableFileSystem;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/store/table/FileStoreTableTestBase.class */
public abstract class FileStoreTableTestBase {
    /**
     * Row schema used by the tests in this class: seven columns named {@code pt}, {@code a},
     * {@code b}, {@code c}, {@code d}, {@code e} and {@code f}.
     */
    protected static final RowType ROW_TYPE =
            RowType.of(
                    new LogicalType[] {
                        DataTypes.INT().getLogicalType(),
                        DataTypes.INT().getLogicalType(),
                        DataTypes.BIGINT().getLogicalType(),
                        DataTypes.BINARY(1).getLogicalType(),
                        DataTypes.VARBINARY(1).getLogicalType(),
                        DataTypes.MAP(DataTypes.VARCHAR(8), DataTypes.VARCHAR(8)).getLogicalType(),
                        DataTypes.MULTISET(DataTypes.VARCHAR(8)).getLogicalType()
                    },
                    new String[] {"pt", "a", "b", "c", "d", "e", "f"});

    /**
     * Column indices {@code 2} then {@code 1}. Not referenced in this class; presumably consumed
     * by subclasses as a read projection over {@link #ROW_TYPE} (TODO confirm).
     */
    protected static final int[] PROJECTION = new int[] {2, 1};
    /**
     * Renders a full row as {@code pt|a|b|c|d|mapKey:mapValue|multisetElement}. Only the first
     * entry of the map (field 5) and of the multiset (field 6) is rendered.
     */
    protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
            row ->
                    String.join(
                            "|",
                            String.valueOf(row.getInt(0)),
                            String.valueOf(row.getInt(1)),
                            String.valueOf(row.getLong(2)),
                            new String(row.getBinary(3)),
                            new String(row.getBinary(4)),
                            String.format(
                                    "%s:%s",
                                    row.getMap(5).keyArray().getString(0).toString(),
                                    row.getMap(5).valueArray().getString(0)),
                            row.getMap(6).keyArray().getString(0).toString());

    /** Renders a two-field row as {@code long|int}, reading field 0 as long and field 1 as int. */
    protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
            row -> row.getLong(0) + "|" + row.getInt(1);

    /** Prefixes the batch rendering with {@code +} for INSERT rows and {@code -} for any other kind. */
    protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
            row -> {
                String sign = row.getRowKind() == RowKind.INSERT ? "+" : "-";
                return sign + BATCH_ROW_TO_STRING.apply(row);
            };

    /** Same sign prefix as {@link #STREAMING_ROW_TO_STRING}, applied to the projected rendering. */
    protected static final Function<RowData, String> STREAMING_PROJECTED_ROW_TO_STRING =
            row -> {
                String sign = row.getRowKind() == RowKind.INSERT ? "+" : "-";
                return sign + BATCH_PROJECTED_ROW_TO_STRING.apply(row);
            };

    /** Prefixes the batch rendering with the row kind's short string and a single space. */
    protected static final Function<RowData, String> CHANGELOG_ROW_TO_STRING =
            row -> {
                String kind = row.getRowKind().shortString();
                return kind + " " + BATCH_ROW_TO_STRING.apply(row);
            };

    /** Temporary directory injected by JUnit; its path is embedded in {@link #tablePath}. */
    @TempDir
    Path tempDir;
    /** Table location, rebuilt in {@link #before()} as {@code "test://" + tempDir}. */
    protected org.apache.flink.core.fs.Path tablePath;
    /** Random UUID string generated in {@link #before()} and passed to newWrite/newCommit by the tests. */
    protected String commitUser;

    /** Points {@link #tablePath} at the temp directory under the {@code test://} scheme and picks a fresh commit user. */
    @BeforeEach
    public void before() {
        String location = "test://" + tempDir.toString();
        tablePath = new org.apache.flink.core.fs.Path(location);
        commitUser = UUID.randomUUID().toString();
    }

    /**
     * Verifies that the test left no input or output stream open on any path containing the temp
     * directory.
     *
     * @throws IOException if the file system for {@link #tablePath} cannot be obtained
     */
    @AfterEach
    public void after() throws IOException {
        FileSystem fs = tablePath.getFileSystem();
        Assertions.assertThat(fs).isInstanceOf(TraceableFileSystem.class);
        TraceableFileSystem traceableFs = (TraceableFileSystem) fs;

        // Only streams on paths that mention this test's temp directory are checked.
        Predicate<org.apache.flink.core.fs.Path> underTempDir =
                candidate -> candidate.toString().contains(tempDir.toString());

        Assertions.assertThat(traceableFs.openInputStreams(underTempDir)).isEmpty();
        Assertions.assertThat(traceableFs.openOutputStreams(underTempDir)).isEmpty();
    }

    /**
     * Writes one row into each of {@code pt=1} and {@code pt=2}, then commits an overwrite
     * restricted to {@code pt=2} and checks that only that partition's content was replaced.
     */
    @Test
    public void testOverwrite() throws Exception {
        FileStoreTable table = createFileStoreTable();

        // Initial data: one row per partition.
        TableWrite initialWrite = table.newWrite(commitUser);
        TableCommit initialCommit = table.newCommit(commitUser);
        initialWrite.write(rowData(1, 10, 100L));
        initialWrite.write(rowData(2, 20, 200L));
        initialCommit.commit(0L, initialWrite.prepareCommit(true, 0L));
        initialWrite.close();

        // Overwrite partition pt=2 with a single new row.
        TableWrite overwriteWrite = table.newWrite(commitUser).withOverwrite(true);
        TableCommit overwriteCommit = table.newCommit(commitUser);
        overwriteWrite.write(rowData(2, 21, 201L));
        HashMap<String, String> overwrittenPartition = new HashMap<>();
        overwrittenPartition.put("pt", "2");
        overwriteCommit
                .withOverwritePartition(overwrittenPartition)
                .commit(1L, overwriteWrite.prepareCommit(true, 1L));
        overwriteWrite.close();

        List<Split> splits = table.newScan().plan().splits();
        TableRead read = table.newRead();

        List<String> untouchedPartition = getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING);
        Assertions.assertThat(untouchedPartition)
                .hasSameElementsAs(
                        Collections.singletonList("1|10|100|binary|varbinary|mapKey:mapVal|multiset"));

        List<String> replacedPartition = getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING);
        Assertions.assertThat(replacedPartition)
                .hasSameElementsAs(
                        Collections.singletonList("2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /**
     * With five buckets keyed on column {@code a}, an equality filter on {@code a} must make the
     * scan plan exactly one split, and that split must be bucket 1.
     */
    @Test
    public void testBucketFilter() throws Exception {
        Consumer<Configuration> fiveBucketsOnA =
                conf -> {
                    conf.set(CoreOptions.BUCKET, 5);
                    conf.set(CoreOptions.BUCKET_KEY, "a");
                };
        FileStoreTable table = createFileStoreTable(fiveBucketsOnA);

        TableWrite write = table.newWrite(commitUser);
        // Rows (1, a, a + 1) for a = 1, 3, 5, 7, 9.
        for (int a = 1; a <= 9; a += 2) {
            write.write(rowData(1, a, (long) (a + 1)));
        }
        table.newCommit(commitUser).commit(0L, write.prepareCommit(true, 0L));
        write.close();

        List<Split> splits =
                table.newScan()
                        .withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5))
                        .plan()
                        .splits();
        Assertions.assertThat(splits.size()).isEqualTo(1);
        DataSplit onlySplit = (DataSplit) splits.get(0);
        Assertions.assertThat(onlySplit.bucket()).isEqualTo(1);
    }

    /**
     * Commits three batches of two rows each and reads with an equality filter on column
     * {@code b}. The expected result is both rows of the second batch, not just the matching row.
     */
    @Test
    public void testReadFilter() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWrite write = table.newWrite(commitUser);
        TableCommit commit = table.newCommit(commitUser);

        // Batch 0
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 20, 200L));
        commit.commit(0L, write.prepareCommit(true, 0L));

        // Batch 1 (contains the row matching the filter)
        write.write(rowData(1, 30, 300L));
        write.write(rowData(1, 40, 400L));
        commit.commit(1L, write.prepareCommit(true, 1L));

        // Batch 2
        write.write(rowData(1, 50, 500L));
        write.write(rowData(1, 60, 600L));
        commit.commit(2L, write.prepareCommit(true, 2L));

        write.close();

        TableRead filteredRead =
                table.newRead().withFilter(new PredicateBuilder(ROW_TYPE).equal(2, 300L));
        List<Split> splits = table.newScan().plan().splits();
        List<String> rows = getResult(filteredRead, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING);

        Assertions.assertThat(rows)
                .hasSameElementsAs(
                        Arrays.asList(
                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /**
     * Interleaves non-blocking and blocking prepare-commit calls across two partitions. The test
     * makes no assertions of its own; it passes when every write, prepare and commit completes
     * without throwing.
     */
    @Test
    public void testPartitionEmptyWriter() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWrite write = table.newWrite(commitUser);
        TableCommit commit = table.newCommit(commitUser);

        // Four rounds of 1000 rows each into partition 1, committed without blocking.
        for (int round = 0; round < 4; round++) {
            for (int n = 0; n < 1000; n++) {
                int scale = round * n;
                write.write(rowData(1, 10 * scale, (long) (100 * scale)));
            }
            commit.commit(round, write.prepareCommit(false, round));
        }

        write.write(rowData(1, 40, 400L));
        List<FileCommittable> fourth = write.prepareCommit(false, 4L);
        // Decided before any further write, exactly where the original branch was taken.
        boolean nothingCompactedYet =
                fourth.get(0).compactIncrement().compactBefore().isEmpty();

        write.write(rowData(2, 20, 200L));
        List<FileCommittable> fifth = write.prepareCommit(true, 5L);

        // Only when the fourth committable carries no compacted files is a sixth one prepared.
        List<FileCommittable> sixth = null;
        if (nothingCompactedYet) {
            write.write(rowData(1, 60, 600L));
            sixth = write.prepareCommit(true, 6L);
        }

        // All prepare calls happen before the first of these commits.
        commit.commit(4L, fourth);
        commit.commit(5L, fifth);
        if (nothingCompactedYet) {
            commit.commit(6L, sixth);
        }
        write.close();
    }

    /**
     * With {@code WRITE_ONLY} enabled, ten single-row commits must leave every data file at level
     * 0, and every snapshot from 1 to the latest must be readable with commit kind APPEND.
     */
    @Test
    public void testWriteWithoutCompactionAndExpiration() throws Exception {
        Consumer<Configuration> writeOnlyOptions =
                conf -> {
                    conf.set(CoreOptions.WRITE_ONLY, true);
                    conf.set(CoreOptions.COMPACTION_MAX_FILE_NUM, 5);
                    conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 3);
                    conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
                };
        FileStoreTable table = createFileStoreTable(writeOnlyOptions);

        TableWrite write = table.newWrite(commitUser);
        TableCommit commit = table.newCommit(commitUser);
        for (int commitId = 0; commitId < 10; commitId++) {
            write.write(rowData(1, 1, 100L));
            commit.commit(commitId, write.prepareCommit(true, commitId));
        }
        write.close();

        // Gather every data file of every split first, then check the levels.
        List<DataFileMeta> dataFiles = new ArrayList<>();
        for (Split split : table.newScan().plan().splits()) {
            dataFiles.addAll(((DataSplit) split).files());
        }
        for (DataFileMeta dataFile : dataFiles) {
            Assertions.assertThat(dataFile.level()).isEqualTo(0);
        }

        SnapshotManager snapshots = new SnapshotManager(table.location());
        Long latestSnapshotId = snapshots.latestSnapshotId();
        Assertions.assertThat(latestSnapshotId).isNotNull();
        long latest = latestSnapshotId.longValue();
        for (int snapshotId = 1; snapshotId <= latest; snapshotId++) {
            Assertions.assertThat(snapshots.snapshot(snapshotId).commitKind())
                    .isEqualTo(Snapshot.CommitKind.APPEND);
        }
    }

    /**
     * Reads the splits belonging to one partition and bucket and renders each row as a string.
     *
     * @param tableRead reader used to open each selected split
     * @param list all candidate splits
     * @param binaryRowData partition a split must equal to be read
     * @param i bucket a split must have to be read
     * @param function converts each row to its string form
     * @return the rendered rows, in read order
     * @throws Exception if reading fails
     */
    public List<String> getResult(TableRead tableRead, List<Split> list, BinaryRowData binaryRowData, int i, Function<RowData, String> function) throws Exception {
        List<Split> selected = getSplitsFor(list, binaryRowData, i);
        return getResult(tableRead, selected, function);
    }

    /**
     * Reads every given split through one concatenated reader and renders each row as a string.
     *
     * <p>The iterator is closed even when reading or rendering fails, so a failing test does not
     * also trip the open-stream assertions in {@link #after()}.
     *
     * @param tableRead reader used to lazily open each split
     * @param list splits to read, in order
     * @param function converts each row to its string form
     * @return the rendered rows, in read order
     * @throws Exception if creating a reader, reading, or closing fails
     */
    public List<String> getResult(TableRead tableRead, List<Split> list, Function<RowData, String> function) throws Exception {
        // The list must be typed: a lambda has no target type when added to a raw ArrayList.
        List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
        for (Split split : list) {
            suppliers.add(() -> tableRead.createReader(split));
        }

        List<String> rendered = new ArrayList<>();
        try (RecordReaderIterator<RowData> rows =
                new RecordReaderIterator<>(ConcatRecordReader.create(suppliers))) {
            while (rows.hasNext()) {
                rendered.add(function.apply(rows.next()));
            }
        }
        return rendered;
    }

    /**
     * Selects the splits that belong to the given partition and bucket.
     *
     * @param list candidate splits; each element is cast to {@link DataSplit}
     * @param binaryRowData partition a split must equal to be kept
     * @param i bucket a split must have to be kept
     * @return the matching splits, in their original order
     * @throws ClassCastException if an element of {@code list} is not a {@link DataSplit}
     */
    private List<Split> getSplitsFor(List<Split> list, BinaryRowData binaryRowData, int i) {
        List<Split> matching = new ArrayList<>();
        for (Split split : list) {
            // Partition and bucket are only exposed on DataSplit, so downcast each element.
            DataSplit dataSplit = (DataSplit) split;
            if (dataSplit.partition().equals(binaryRowData) && dataSplit.bucket() == i) {
                matching.add(split);
            }
        }
        return matching;
    }

    /**
     * Builds a single-field binary row holding the given int. The tests use it as the partition
     * value when selecting splits.
     *
     * @param i value written into field 0
     * @return the completed one-field row
     */
    public BinaryRowData binaryRow(int i) {
        BinaryRowData row = new BinaryRowData(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, i);
        writer.complete();
        return row;
    }

    /**
     * Builds a seven-field row: the first three fields come from the arguments, the remaining
     * four are fixed binary, varbinary, map and multiset values.
     *
     * @param objArr values for fields 0, 1 and 2; at least three elements are required
     * @return the assembled row
     */
    public GenericRowData rowData(Object... objArr) {
        Object[] fields = new Object[7];
        fields[0] = objArr[0];
        fields[1] = objArr[1];
        fields[2] = objArr[2];
        fields[3] = "binary".getBytes();
        fields[4] = "varbinary".getBytes();
        fields[5] =
                new GenericMapData(
                        Collections.singletonMap(
                                StringData.fromString("mapKey"), StringData.fromString("mapVal")));
        // The multiset is stored as a map from element to its count.
        fields[6] = new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1));
        return GenericRowData.of(fields);
    }

    /**
     * Builds the same seven-field row as {@link #rowData(Object...)} but with an explicit row kind.
     *
     * @param rowKind kind assigned to the row
     * @param objArr values for fields 0, 1 and 2; at least three elements are required
     * @return the assembled row
     */
    public GenericRowData rowDataWithKind(RowKind rowKind, Object... objArr) {
        Object[] fields = new Object[7];
        fields[0] = objArr[0];
        fields[1] = objArr[1];
        fields[2] = objArr[2];
        fields[3] = "binary".getBytes();
        fields[4] = "varbinary".getBytes();
        fields[5] =
                new GenericMapData(
                        Collections.singletonMap(
                                StringData.fromString("mapKey"), StringData.fromString("mapVal")));
        // The multiset is stored as a map from element to its count.
        fields[6] = new GenericMapData(Collections.singletonMap(StringData.fromString("multiset"), 1));
        return GenericRowData.ofKind(rowKind, fields);
    }

    /**
     * Creates a table whose only customised option is the bucket count.
     *
     * @param i value to set for {@code CoreOptions.BUCKET}
     * @return the created table
     * @throws Exception if table creation fails
     */
    public FileStoreTable createFileStoreTable(int i) throws Exception {
        Consumer<Configuration> bucketOption = conf -> conf.set(CoreOptions.BUCKET, Integer.valueOf(i));
        return createFileStoreTable(bucketOption);
    }

    /**
     * Creates a table with a single bucket.
     *
     * @return the created table
     * @throws Exception if table creation fails
     */
    public FileStoreTable createFileStoreTable() throws Exception {
        final int singleBucket = 1;
        return createFileStoreTable(singleBucket);
    }

    /**
     * Creates the table under test. Subclasses supply the concrete table implementation.
     *
     * @param consumer callback that receives a {@link Configuration} and sets options on it
     *     (presumably applied before the table is built; confirm in subclasses)
     * @return the created table
     * @throws Exception if table creation fails
     */
    protected abstract FileStoreTable createFileStoreTable(Consumer<Configuration> consumer) throws Exception;
}
