package org.apache.paimon.flink.source;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.paimon.flink.util.MiniClusterWithClientExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/SourceMetricsITCase.class */
public class SourceMetricsITCase {

    @TempDir
    Path tempPath;
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
    private static final int DEFAULT_PARALLELISM = 4;

    @RegisterExtension
    protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION = new MiniClusterWithClientExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setConfiguration(reporter.addToConfiguration(new Configuration())).build());

    @AfterEach
    public final void cleanupRunningJobs() throws Exception {
        RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient = MINI_CLUSTER_EXTENSION.createRestClusterClient();
        for (JobStatusMessage jobStatusMessage : (Collection) createRestClusterClient.listJobs().get()) {
            if (!jobStatusMessage.getJobState().isTerminalState()) {
                try {
                    createRestClusterClient.cancel(jobStatusMessage.getJobId()).get();
                } catch (Exception e) {
                }
            }
        }
    }

    @Test
    public void testNumRecordsIn() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        create.executeSql("CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.tempPath + "' )");
        create.executeSql("USE CATALOG mycat");
        create.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED )");
        create.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
        create.executeSql("CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )");
        TableResult executeSql = create.executeSql("INSERT INTO B SELECT * FROM T");
        JobID jobID = ((JobClient) executeSql.getJobClient().get()).getJobID();
        executeSql.await();
        Iterator it = reporter.findOperatorMetricGroups(jobID, "Source: T").iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((OperatorMetricGroup) it.next()).getIOMetricGroup().getNumRecordsInCounter().getCount()).isEqualTo(3L);
        }
    }

    @Test
    public void testNumRecordsInWithConsumerId() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        create.executeSql("CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.tempPath + "' )");
        create.executeSql("USE CATALOG mycat");
        create.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'changelog-producer' = 'lookup' )");
        create.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
        create.executeSql("CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' = 'blackhole' )");
        JobClient jobClient = (JobClient) create.executeSql("INSERT INTO B SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test') */").getJobClient().get();
        Assertions.assertThat(testNumRecordsInWithConsumerIdChecker(jobClient.getJobID())).isTrue();
        jobClient.cancel().get();
    }

    private boolean testNumRecordsInWithConsumerIdChecker(JobID jobID) throws Exception {
        for (int i = 1; i <= 20; i++) {
            Iterator it = reporter.findOperatorMetricGroups(jobID, "T\\[").iterator();
            while (it.hasNext()) {
                if (((OperatorMetricGroup) it.next()).getIOMetricGroup().getNumRecordsInCounter().getCount() == 3) {
                    return true;
                }
            }
            Thread.sleep(1000L);
        }
        return false;
    }
}
