package org.apache.paimon.flink.lookup;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.class */
public class FileStoreLookupFunctionTest {
    private static final Random RANDOM = new Random();
    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private FileStoreLookupFunction fileStoreLookupFunction;
    private FileStoreTable fileStoreTable;

    @TempDir
    private Path tempDir;

    @BeforeEach
    public void before() throws Exception {
        SchemaManager schemaManager = new SchemaManager(this.fileIO, new org.apache.paimon.fs.Path(this.tempDir.toString()));
        Options options = new Options();
        options.set(CoreOptions.BUCKET, 2);
        options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(12288L));
        options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096L));
        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
        options.set(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ZERO);
        this.fileStoreTable = FileStoreTableFactory.create(this.fileIO, new org.apache.paimon.fs.Path(this.tempDir.toString()), schemaManager.createTable(new Schema(RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"pt", "k", "v"}).getFields(), Collections.singletonList("pt"), Arrays.asList("pt", "k"), options.toMap(), "")));
        this.fileStoreLookupFunction = new FileStoreLookupFunction(this.fileStoreTable, new int[]{0, 1}, new int[]{1}, (Predicate) null);
        this.fileStoreLookupFunction.open(this.tempDir.toString());
    }

    @Test
    public void testLookupScanLeak() throws Exception {
        commit(writeCommit(1));
        this.fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(new Object[]{1, 1, 10L})));
        Assertions.assertThat(TraceableFileIO.openInputStreams(path -> {
            return path.toString().contains(this.tempDir.toString());
        }).size()).isEqualTo(0);
        commit(writeCommit(10));
        this.fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(new Object[]{1, 1, 10L})));
        Assertions.assertThat(TraceableFileIO.openInputStreams(path2 -> {
            return path2.toString().contains(this.tempDir.toString());
        }).size()).isEqualTo(0);
    }

    @Test
    public void testLookupExpiredSnapshot() throws Exception {
        commit(writeCommit(1));
        this.fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(new Object[]{1, 1, 10L})));
        commit(writeCommit(2));
        commit(writeCommit(3));
        commit(writeCommit(4));
        commit(writeCommit(5));
        this.fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(new Object[]{1, 1, 10L})));
    }

    private void commit(List<CommitMessage> list) {
        this.fileStoreTable.newCommit(this.commitUser).commit(list);
    }

    private List<CommitMessage> writeCommit(int i) throws Exception {
        ArrayList arrayList = new ArrayList();
        StreamTableWrite newWrite = this.fileStoreTable.newStreamWriteBuilder().newWrite();
        for (int i2 = 0; i2 < i; i2++) {
            newWrite.write(randomRow());
            arrayList.addAll(newWrite.prepareCommit(true, i2));
        }
        return arrayList;
    }

    private InternalRow randomRow() {
        return GenericRow.of(new Object[]{Integer.valueOf(RANDOM.nextInt(100)), Integer.valueOf(RANDOM.nextInt(100)), Long.valueOf(RANDOM.nextLong())});
    }
}
