package org.apache.flink.connector.base.source.reader;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceMetricsITCase.class */
public class SourceMetricsITCase extends TestLogger {

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
    private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
    private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
    private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
    private static final int DEFAULT_PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).setConfiguration(reporter.addToConfiguration(new Configuration())).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceMetricsITCase$EagerBoundedOutOfOrdernessWatermarks.class */
    public static class EagerBoundedOutOfOrdernessWatermarks extends BoundedOutOfOrdernessWatermarks<Integer> {
        public EagerBoundedOutOfOrdernessWatermarks() {
            super(Duration.ofMillis(SourceMetricsITCase.WATERMARK_LAG));
        }

        public void onEvent(Integer num, long j, WatermarkOutput watermarkOutput) {
            super.onEvent(num, j, watermarkOutput);
            onPeriodicEmit(watermarkOutput);
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceMetricsITCase$LaggingTimestampAssigner.class */
    private static class LaggingTimestampAssigner implements SerializableTimestampAssigner<Integer> {
        private final long baseTime;

        public LaggingTimestampAssigner(long j) {
            this.baseTime = j;
        }

        public long extractTimestamp(Integer num, long j) {
            return this.baseTime + num.intValue();
        }
    }

    @Test
    public void testMetricsWithTimestamp() throws Exception {
        testMetrics(WatermarkStrategy.forGenerator(context -> {
            return new EagerBoundedOutOfOrdernessWatermarks();
        }).withTimestampAssigner(new LaggingTimestampAssigner(System.currentTimeMillis() - EVENTTIME_LAG)), true);
    }

    @Test
    public void testMetricsWithoutTimestamp() throws Exception {
        testMetrics(WatermarkStrategy.noWatermarks(), false);
    }

    private void testMetrics(WatermarkStrategy<Integer> watermarkStrategy, boolean z) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        int max = Math.max(1, executionEnvironment.getParallelism() - 2);
        executionEnvironment.getConfig().setAutoWatermarkInterval(1L);
        int i = 10;
        MockBaseSource mockBaseSource = new MockBaseSource(max, 10, Boundedness.BOUNDED);
        SharedReference add = this.sharedObjects.add(new CyclicBarrier(max + 1));
        SharedReference add2 = this.sharedObjects.add(new CyclicBarrier(max + 1));
        int i2 = 3;
        int i3 = 10 - 1;
        executionEnvironment.fromSource(mockBaseSource, watermarkStrategy, "MetricTestingSource").map(num -> {
            if (num.intValue() % i == i2 || num.intValue() % i == i3) {
                ((CyclicBarrier) add.get()).await();
                ((CyclicBarrier) add2.get()).await();
            }
            return num;
        }).sinkTo(new DiscardingSink());
        JobClient executeAsync = executionEnvironment.executeAsync();
        JobID jobID = executeAsync.getJobID();
        ((CyclicBarrier) add.get()).await();
        assertSourceMetrics(jobID, reporter, 3 + 1, 10, executionEnvironment.getParallelism(), max, z);
        ((CyclicBarrier) add2.get()).await();
        ((CyclicBarrier) add.get()).await();
        assertSourceMetrics(jobID, reporter, i3 + 1, 10, executionEnvironment.getParallelism(), max, z);
        ((CyclicBarrier) add2.get()).await();
        executeAsync.getJobExecutionResult().get();
    }

    private void assertSourceMetrics(JobID jobID, InMemoryReporter inMemoryReporter, long j, long j2, int i, int i2, boolean z) {
        List<OperatorMetricGroup> findOperatorMetricGroups = inMemoryReporter.findOperatorMetricGroups(jobID, "MetricTestingSource");
        Assertions.assertThat(findOperatorMetricGroups).hasSize(i);
        int i3 = 0;
        for (OperatorMetricGroup operatorMetricGroup : findOperatorMetricGroups) {
            Map metricsByGroup = inMemoryReporter.getMetricsByGroup(operatorMetricGroup);
            if (operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) {
                MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("currentEmitEventTimeLag")).isEqualTo(-1L);
                Assertions.assertThat(metricsByGroup.get("watermarkLag")).isNull();
            } else {
                i3++;
                MetricAssertions.assertThatCounter(operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter()).isEqualTo(Long.valueOf(j));
                MetricAssertions.assertThatCounter(operatorMetricGroup.getIOMetricGroup().getNumBytesInCounter()).isEqualTo(Long.valueOf(j * 10));
                MetricAssertions.assertThatCounter(operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()).isEqualTo(Long.valueOf(j));
                MetricAssertions.assertThatCounter(operatorMetricGroup.getIOMetricGroup().getNumBytesOutCounter()).isEqualTo(Long.valueOf(j * 10));
                MetricAssertions.assertThatCounter((Metric) metricsByGroup.get("numRecordsInErrors")).isEqualTo(Long.valueOf(j / 2));
                if (z) {
                    MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("currentEmitEventTimeLag")).isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON);
                    MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("watermarkLag")).isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON);
                    Assertions.assertThat(Long.valueOf(((Long) ((Gauge) metricsByGroup.get("watermarkLag")).getValue()).longValue() - ((Long) ((Gauge) metricsByGroup.get("currentEmitEventTimeLag")).getValue()).longValue())).isGreaterThan(WATERMARK_LAG - WATERMARK_EPSILON).isLessThan(WATERMARK_LAG + WATERMARK_EPSILON);
                } else {
                    MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("currentEmitEventTimeLag")).isEqualTo(-1L);
                    Assertions.assertThat(metricsByGroup.get("watermarkLag")).isNull();
                }
                long j3 = j2 - j;
                MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("pendingRecords")).isEqualTo(Long.valueOf(j3));
                MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("pendingBytes")).isEqualTo(Long.valueOf(j3 * 10));
                MetricAssertions.assertThatGauge((Metric) metricsByGroup.get("sourceIdleTime")).isEqualTo(0L);
            }
        }
        Assertions.assertThat(i3).isEqualTo(i2);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 743007377:
                if (implMethodName.equals("lambda$testMetrics$7c40611b$1")) {
                    z = false;
                    break;
                }
                break;
            case 1633504388:
                if (implMethodName.equals("lambda$testMetricsWithTimestamp$6138499c$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/source/reader/SourceMetricsITCase") && serializedLambda.getImplMethodSignature().equals("(IIILorg/apache/flink/testutils/junit/SharedReference;Lorg/apache/flink/testutils/junit/SharedReference;Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    int intValue3 = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    SharedReference sharedReference = (SharedReference) serializedLambda.getCapturedArg(3);
                    SharedReference sharedReference2 = (SharedReference) serializedLambda.getCapturedArg(DEFAULT_PARALLELISM);
                    return num -> {
                        if (num.intValue() % intValue == intValue2 || num.intValue() % intValue == intValue3) {
                            ((CyclicBarrier) sharedReference.get()).await();
                            ((CyclicBarrier) sharedReference2.get()).await();
                        }
                        return num;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/source/reader/SourceMetricsITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context -> {
                        return new EagerBoundedOutOfOrdernessWatermarks();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
