package org.apache.paimon.flink.source;

import java.nio.file.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/FileStoreSourceMetricsTest.class */
public class FileStoreSourceMetricsTest {
    private FileStoreTable table;
    private TestingSplitEnumeratorContextWithRegisteringGroup context;
    private MetricGroup scanMetricGroup;

    /* loaded from: input_file:org/apache/paimon/flink/source/FileStoreSourceMetricsTest$TestingSplitEnumeratorContextWithRegisteringGroup.class */
    private class TestingSplitEnumeratorContextWithRegisteringGroup extends TestingSplitEnumeratorContext<FileStoreSourceSplit> {
        private final SplitEnumeratorMetricGroup metricGroup;

        public TestingSplitEnumeratorContextWithRegisteringGroup(int i) {
            super(i);
            JobID jobID = new JobID();
            this.metricGroup = new InternalSplitEnumeratorMetricGroup(new InternalOperatorCoordinatorMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup(TestingMetricRegistry.builder().build(), "localhost").addJob(jobID, "myJobName").getOrAddOperator(new JobVertexID(), "taskName", new OperatorID(), "opName")));
        }

        public SplitEnumeratorMetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    @BeforeEach
    public void before(@TempDir Path path) throws Exception {
        LocalFileIO create = LocalFileIO.create();
        org.apache.paimon.fs.Path path2 = new org.apache.paimon.fs.Path(path.toString());
        this.table = FileStoreTableFactory.create(create, path2, new SchemaManager(create, path2).createTable(Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.BIGINT()).build()));
        this.context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
        this.scanMetricGroup = this.context.metricGroup().addGroup("paimon").addGroup("table", this.table.name()).addGroup("scan");
    }

    @Test
    public void staticFileStoreSourceScanMetricsTest() throws Exception {
        writeOnce();
        new StaticFileStoreSource(this.table.newReadBuilder(), (Long) null, 1, FlinkConnectorOptions.SplitAssignMode.FAIR).restoreEnumerator(this.context, (PendingSplitsCheckpoint) null);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo(1L);
    }

    @Test
    public void continuousFileStoreSourceScanMetricsTest() throws Exception {
        writeOnce();
        ContinuousFileSplitEnumerator restoreEnumerator = new ContinuousFileStoreSource(this.table.newReadBuilder(), this.table.options(), (Long) null).restoreEnumerator(this.context, (PendingSplitsCheckpoint) null);
        restoreEnumerator.scanNextSnapshot();
        Assertions.assertThat(TestingMetricUtils.getHistogram(this.scanMetricGroup, "scanDuration").getCount()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo(1L);
        writeAgain();
        restoreEnumerator.scanNextSnapshot();
        Assertions.assertThat(TestingMetricUtils.getHistogram(this.scanMetricGroup, "scanDuration").getCount()).isEqualTo(2L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo(1L);
    }

    @Test
    public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
        writeOnce();
        LogHybridSourceFactory.buildHybridFirstSource(this.table, (int[][]) null, (Predicate) null).restoreEnumerator(this.context, (Object) null);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScannedManifests").getValue()).isEqualTo(1L);
        Assertions.assertThat(TestingMetricUtils.getGauge(this.scanMetricGroup, "lastScanResultedTableFiles").getValue()).isEqualTo(1L);
    }

    private void writeOnce() throws Exception {
        TableWriteImpl newWrite = this.table.newWrite("test");
        TableCommitImpl newCommit = this.table.newCommit("test");
        newWrite.write(GenericRow.of(new Object[]{1, 2L}));
        newWrite.write(GenericRow.of(new Object[]{3, 4L}));
        newWrite.write(GenericRow.of(new Object[]{5, 6L}));
        newWrite.write(GenericRow.of(new Object[]{7, 8L}));
        newWrite.write(GenericRow.of(new Object[]{9, 10L}));
        newCommit.commit(newWrite.prepareCommit());
        newCommit.close();
        newWrite.close();
    }

    private void writeAgain() throws Exception {
        TableWriteImpl newWrite = this.table.newWrite("test");
        TableCommitImpl newCommit = this.table.newCommit("test");
        newWrite.write(GenericRow.of(new Object[]{10, 2L}));
        newWrite.write(GenericRow.of(new Object[]{13, 24L}));
        newWrite.write(GenericRow.of(new Object[]{15, 26L}));
        newWrite.write(GenericRow.of(new Object[]{17, 28L}));
        newWrite.write(GenericRow.of(new Object[]{19, 10L}));
        newCommit.commit(newWrite.prepareCommit());
        newCommit.close();
        newWrite.close();
    }
}
