package org.apache.paimon.flink.lookup;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.class */
/**
 * Tests for {@link FileStoreLookupFunction}.
 *
 * <p>Each test creates a primary-key table ({@code pt INT, k INT, v BIGINT}, primary key
 * {@code (pt, k)}) under a JUnit temporary directory, opens a lookup function on it and then
 * checks which lookup table / query executor was selected, or that lookups do not leave input
 * streams open.
 */
public class FileStoreLookupFunctionTest {
    private static final Random RANDOM = new Random();

    /** Service name registered through {@link ServiceManager} in the remote-executor test. */
    private static final String PRIMARY_KEY_LOOKUP_SERVICE = "primary-key-lookup";

    @TempDir
    private Path tempDir;
    private final String commitUser = UUID.randomUUID().toString();
    private final TraceableFileIO fileIO = new TraceableFileIO();
    private org.apache.paimon.fs.Path tablePath;
    private FileStoreLookupFunction lookupFunction;
    private FileStoreTable table;

    /** Resolves the table path from the per-test temporary directory. */
    @BeforeEach
    public void before() throws Exception {
        tablePath = new org.apache.paimon.fs.Path(tempDir.toString());
    }

    /** Creates a partitioned table whose lookup joins on {@code k} only. */
    private void createLookupFunction() throws Exception {
        createLookupFunction(true, false);
    }

    /** Same as the three-argument variant with dynamic partition lookup disabled. */
    private void createLookupFunction(boolean isPartition, boolean joinEqualPk) throws Exception {
        createLookupFunction(isPartition, joinEqualPk, false);
    }

    /**
     * Creates the test table and opens {@link #lookupFunction} on it.
     *
     * @param isPartition whether {@code pt} is declared as the partition key
     * @param joinEqualPk whether the join key indexes are {@code {0, 1}} (both primary-key
     *     columns) instead of {@code {1}}
     * @param dynamicPartition whether to set {@code LOOKUP_DYNAMIC_PARTITION} to
     *     {@code "max_pt()"}
     */
    private void createLookupFunction(
            boolean isPartition, boolean joinEqualPk, boolean dynamicPartition) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, 2);
        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
        options.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(1L));
        if (dynamicPartition) {
            options.set(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION, "max_pt()");
        }

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                        new String[] {"pt", "k", "v"});
        List<String> partitionKeys =
                isPartition ? Collections.singletonList("pt") : Collections.emptyList();
        Schema schema =
                new Schema(
                        rowType.getFields(),
                        partitionKeys,
                        Arrays.asList("pt", "k"),
                        options.toMap(),
                        "");

        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        table = FileStoreTableFactory.create(fileIO, tablePath, schemaManager.createTable(schema));

        int[] projection = {0, 1};
        int[] joinKeyIndexes = joinEqualPk ? new int[] {0, 1} : new int[] {1};
        // The last argument is the optional filter predicate; these tests do not use one.
        lookupFunction =
                new FileStoreLookupFunction(table, projection, joinKeyIndexes, (Predicate) null);
        lookupFunction.open(tempDir.toString());
    }

    /** Closes the lookup function if the test created one. */
    @AfterEach
    public void close() throws Exception {
        if (lookupFunction != null) {
            lookupFunction.close();
        }
    }

    /** Without a registered lookup service the partial lookup table uses the local executor. */
    @Test
    public void testDefaultLocalPartial() throws Exception {
        createLookupFunction(false, true);
        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        Assertions.assertThat(lookupFunction.lookupTable().queryExecutor())
                .isInstanceOf(PrimaryKeyPartialLookupTable.LocalQueryExecutor.class);
    }

    /** After a lookup service address is registered, re-opening selects the remote executor. */
    @Test
    public void testDefaultRemotePartial() throws Exception {
        createLookupFunction(false, true);
        ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
        serviceManager.resetService(
                PRIMARY_KEY_LOOKUP_SERVICE, new InetSocketAddress[] {new InetSocketAddress(1)});
        // Open again so the function is initialized with the service address in place.
        lookupFunction.open(tempDir.toString());
        Assertions.assertThat(lookupFunction.lookupTable())
                .isInstanceOf(PrimaryKeyPartialLookupTable.class);
        Assertions.assertThat(lookupFunction.lookupTable().queryExecutor())
                .isInstanceOf(PrimaryKeyPartialLookupTable.RemoteQueryExecutor.class);
    }

    /** Lookups on a partitioned table must not leave input streams open. */
    @Test
    public void testLookupScanLeak() throws Exception {
        createLookupFunction();
        lookupAfterCommitsAndCheckLeaks();
    }

    /**
     * Smoke test: a lookup issued after four further commits must not throw.
     *
     * <p>NOTE(review): with the snapshot retention options set in
     * {@code createLookupFunction} (max 3, min 2) the snapshot seen by the first lookup has
     * presumably expired by the time of the second lookup - confirm against the commit/expire
     * behaviour.
     */
    @Test
    public void testLookupExpiredSnapshot() throws Exception {
        createLookupFunction();
        commit(writeCommit(1));
        lookupSampleKey();
        for (int recordsPerCommit = 2; recordsPerCommit <= 5; recordsPerCommit++) {
            commit(writeCommit(recordsPerCommit));
        }
        lookupSampleKey();
    }

    /** Same leak check as {@link #testLookupScanLeak()} with dynamic partition lookup enabled. */
    @Test
    public void testLookupDynamicPartition() throws Exception {
        createLookupFunction(true, false, true);
        lookupAfterCommitsAndCheckLeaks();
    }

    /** Commits 1 and then 10 random rows, looking up and checking for open streams after each. */
    private void lookupAfterCommitsAndCheckLeaks() throws Exception {
        commit(writeCommit(1));
        lookupSampleKey();
        assertNoOpenInputStreams();

        commit(writeCommit(10));
        lookupSampleKey();
        assertNoOpenInputStreams();
    }

    /** Performs a lookup with the fixed row {@code (1, 1, 10L)}; the result is ignored. */
    private void lookupSampleKey() throws Exception {
        lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
    }

    /** Asserts that no traced input stream under the temporary directory is still open. */
    private void assertNoOpenInputStreams() {
        String root = tempDir.toString();
        Assertions.assertThat(
                        TraceableFileIO.openInputStreams(path -> path.toString().contains(root)))
                .isEmpty();
    }

    /** Commits the given messages; the committer is closed even if the commit fails. */
    private void commit(List<CommitMessage> messages) throws Exception {
        try (TableCommitImpl committer = table.newCommit(commitUser)) {
            committer.commit(messages);
        }
    }

    /**
     * Writes {@code number} random rows, calling {@code prepareCommit} after every row, and
     * returns all produced commit messages.
     *
     * <p>The writer is closed before returning so it is not leaked across tests.
     * NOTE(review): this closes the writer before the messages are committed, which assumes
     * {@code prepareCommit(true, ...)} has already handed the written files over - confirm.
     */
    private List<CommitMessage> writeCommit(int number) throws Exception {
        List<CommitMessage> messages = new ArrayList<>();
        try (StreamTableWrite writer = table.newStreamWriteBuilder().newWrite()) {
            for (int commitIdentifier = 0; commitIdentifier < number; commitIdentifier++) {
                writer.write(randomRow());
                messages.addAll(writer.prepareCommit(true, commitIdentifier));
            }
        }
        return messages;
    }

    /** Builds a row with random {@code pt} and {@code k} in {@code [0, 100)} and a random {@code v}. */
    private InternalRow randomRow() {
        return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong());
    }
}
