package org.apache.paimon.flink;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.flink.service.QueryService;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/RemoteLookupJoinITCase.class */
public class RemoteLookupJoinITCase extends CatalogITCaseBase {

    /* loaded from: input_file:org/apache/paimon/flink/RemoteLookupJoinITCase$ServiceProxy.class */
    private interface ServiceProxy extends Closeable {
        void write(InternalRow internalRow) throws Exception;
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    public List<String> ddl() {
        return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected int defaultParallelism() {
        return 1;
    }

    @Test
    public void testQueryServiceLookup() throws Exception {
        sql("CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')", new Object[0]);
        CloseableIterator<Row> streamSqlIter = streamSqlIter("CALL sys.query_service('default.DIM', 2)", new Object[0]);
        RemoteTableQuery remoteTableQuery = new RemoteTableQuery(paimonTable("DIM"));
        sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)", new Object[0]);
        Thread.sleep(2000L);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 0, DataFileTestUtils.row(1))).isNotNull().extracting(internalRow -> {
            return Integer.valueOf(internalRow.getInt(1));
        }).isEqualTo(11);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 0, DataFileTestUtils.row(2))).isNotNull().extracting(internalRow2 -> {
            return Integer.valueOf(internalRow2.getInt(1));
        }).isEqualTo(22);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 1, DataFileTestUtils.row(3))).isNotNull().extracting(internalRow3 -> {
            return Integer.valueOf(internalRow3.getInt(1));
        }).isEqualTo(33);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 0, DataFileTestUtils.row(4))).isNotNull().extracting(internalRow4 -> {
            return Integer.valueOf(internalRow4.getInt(1));
        }).isEqualTo(44);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 0, DataFileTestUtils.row(5))).isNull();
        streamSqlIter.close();
        remoteTableQuery.close();
    }

    @Test
    public void testLookupRemoteTable() throws Throwable {
        sql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('bucket' = '1')", new Object[0]);
        ServiceProxy launchQueryServer = launchQueryServer("DIM");
        launchQueryServer.write(GenericRow.of(new Object[]{1, 11, 111, 1111}));
        launchQueryServer.write(GenericRow.of(new Object[]{2, 22, 222, 2222}));
        BlockingIterator of = BlockingIterator.of(this.sEnv.executeSql("SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i").collect());
        sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        Assertions.assertThat(of.collect(3)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 11, 111}), Row.of(new Object[]{2, 22, 222}), Row.of(new Object[]{3, null, null})});
        launchQueryServer.write(GenericRow.of(new Object[]{2, 44, 444, 4444}));
        launchQueryServer.write(GenericRow.of(new Object[]{3, 33, 333, 3333}));
        Thread.sleep(2000L);
        sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        Assertions.assertThat(of.collect(4)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 11, 111}), Row.of(new Object[]{2, 44, 444}), Row.of(new Object[]{3, 33, 333}), Row.of(new Object[]{4, null, null})});
        of.close();
        launchQueryServer.close();
    }

    @Test
    public void testServiceFileCleaned() throws Exception {
        sql("CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')", new Object[0]);
        JobClient queryService = queryService(paimonTable("DIM"));
        RemoteTableQuery remoteTableQuery = new RemoteTableQuery(paimonTable("DIM"));
        sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)", new Object[0]);
        Thread.sleep(2000L);
        Assertions.assertThat(remoteTableQuery.lookup(DataFileTestUtils.row(new int[0]), 0, DataFileTestUtils.row(1))).isNotNull().extracting(internalRow -> {
            return Integer.valueOf(internalRow.getInt(1));
        }).isEqualTo(11);
        queryService.cancel().get();
        remoteTableQuery.close();
        Assertions.assertThat(paimonTable("DIM").store().newServiceManager().service("primary-key-lookup").isPresent()).isFalse();
    }

    private JobClient queryService(FileStoreTable fileStoreTable) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        QueryService.build(executionEnvironment, fileStoreTable, 2);
        return executionEnvironment.executeAsync();
    }

    private ServiceProxy launchQueryServer(String str) throws Throwable {
        final FileStoreTable paimonTable = paimonTable(str);
        final LocalTableQuery withIOManager = paimonTable.newLocalTableQuery().withIOManager(IOManager.create(this.path));
        final KvQueryServer kvQueryServer = new KvQueryServer(0, 1, InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), 1, 1, withIOManager, new DisabledServiceRequestStats());
        kvQueryServer.start();
        paimonTable.store().newServiceManager().resetService("primary-key-lookup", new InetSocketAddress[]{kvQueryServer.getServerAddress()});
        return new ServiceProxy() { // from class: org.apache.paimon.flink.RemoteLookupJoinITCase.1
            @Override // org.apache.paimon.flink.RemoteLookupJoinITCase.ServiceProxy
            public void write(InternalRow internalRow) throws Exception {
                BatchWriteBuilder newBatchWriteBuilder = paimonTable.newBatchWriteBuilder();
                BatchTableWrite newWrite = newBatchWriteBuilder.newWrite();
                BatchTableCommit newCommit = newBatchWriteBuilder.newCommit();
                newWrite.write(internalRow);
                List prepareCommit = newWrite.prepareCommit();
                newCommit.commit(prepareCommit);
                CommitMessageImpl commitMessageImpl = (CommitMessageImpl) prepareCommit.get(0);
                withIOManager.refreshFiles(commitMessageImpl.partition(), commitMessageImpl.bucket(), Collections.emptyList(), commitMessageImpl.newFilesIncrement().newFiles());
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                kvQueryServer.shutdown();
                withIOManager.close();
            }
        };
    }
}
