package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/util/LatencyStatsTest.class */
/**
 * Tests that {@link LatencyStats} registers its latency histograms under the expected metric
 * names, and distributes reported {@link LatencyMarker}s across them as expected, for the
 * {@code SINGLE}, {@code OPERATOR} and {@code SUBTASK} granularities.
 */
public class LatencyStatsTest extends TestLogger {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final OperatorID SOURCE_ID_1 = new OperatorID();
    private static final OperatorID SOURCE_ID_2 = new OperatorID();
    private static final int OPERATOR_SUBTASK_INDEX = 64;
    private static final String PARENT_GROUP_NAME = "parent";

    /**
     * Minimal {@link MetricRegistry} that remembers every {@link Histogram} registered with it,
     * paired with the metric identifier reported by the registering group. All other methods
     * return fixed placeholder values.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally {@code private};
     * its visibility is left as found. The public methods below presumably implement
     * {@link MetricRegistry} - confirm against the interface and add {@code @Override}.
     */
    public static class TestMetricRegistry implements MetricRegistry {
        /** Registered histograms in registration order, as (metric identifier, histogram). */
        private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4);

        private TestMetricRegistry() {
        }

        /** Records {@code metric} if it is a {@link Histogram}; other metric kinds are ignored. */
        public void register(Metric metric, String metricName, AbstractMetricGroup group) {
            if (metric instanceof Histogram) {
                latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), (Histogram) metric));
            }
        }

        /** Returns {@code '.'}, the separator the expected names in this test are built with. */
        public char getDelimiter() {
            return '.';
        }

        /** Returns the NUL character regardless of {@code reporterIndex}. */
        public char getDelimiter(int reporterIndex) {
            return '\0';
        }

        /** Returns 0. */
        public int getNumberReporters() {
            return 0;
        }

        /** Does nothing. */
        public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
        }

        /** Returns {@code null}. */
        public ScopeFormats getScopeFormats() {
            return null;
        }

        /** Returns {@code null}. */
        @Nullable
        public String getMetricQueryServicePath() {
            return null;
        }
    }

    /** SINGLE granularity: one histogram that receives all five reported markers. */
    @Test
    public void testLatencyStatsSingle() {
        testLatencyStats(LatencyStats.Granularity.SINGLE, histograms -> {
            Assert.assertEquals(1L, histograms.size());

            Tuple2<String, Histogram> only = histograms.get(0);
            assertName(only.f0);
            Assert.assertEquals(5L, only.f1.getCount());
        });
    }

    /** OPERATOR granularity: one histogram per source operator (3 and 2 markers). */
    @Test
    public void testLatencyStatsOperator() {
        testLatencyStats(LatencyStats.Granularity.OPERATOR, histograms -> {
            Assert.assertEquals(2L, histograms.size());

            Tuple2<String, Histogram> firstSource = histograms.get(0);
            assertName(firstSource.f0, SOURCE_ID_1);
            Assert.assertEquals(3L, firstSource.f1.getCount());

            Tuple2<String, Histogram> secondSource = histograms.get(1);
            assertName(secondSource.f0, SOURCE_ID_2);
            Assert.assertEquals(2L, secondSource.f1.getCount());
        });
    }

    /** SUBTASK granularity: one histogram per (source operator, source subtask) pair. */
    @Test
    public void testLatencyStatsSubtask() {
        testLatencyStats(LatencyStats.Granularity.SUBTASK, histograms -> {
            Assert.assertEquals(4L, histograms.size());

            Tuple2<String, Histogram> source1Subtask0 = histograms.get(0);
            assertName(source1Subtask0.f0, SOURCE_ID_1, 0);
            Assert.assertEquals(2L, source1Subtask0.f1.getCount());

            Tuple2<String, Histogram> source1Subtask1 = histograms.get(1);
            assertName(source1Subtask1.f0, SOURCE_ID_1, 1);
            Assert.assertEquals(1L, source1Subtask1.f1.getCount());

            Tuple2<String, Histogram> source2Subtask2 = histograms.get(2);
            assertName(source2Subtask2.f0, SOURCE_ID_2, 2);
            Assert.assertEquals(1L, source2Subtask2.f1.getCount());

            Tuple2<String, Histogram> source2Subtask3 = histograms.get(3);
            assertName(source2Subtask3.f0, SOURCE_ID_2, 3);
            Assert.assertEquals(1L, source2Subtask3.f1.getCount());
        });
    }

    /**
     * Creates a {@link LatencyStats} with the given granularity on top of a
     * {@link TestMetricRegistry}, reports five latency markers (three for {@code SOURCE_ID_1}
     * with indices 0, 0, 1 and two for {@code SOURCE_ID_2} with indices 2, 3), then hands the
     * histograms captured by the registry to {@code consumer} for verification.
     *
     * @param granularity granularity passed to the {@link LatencyStats} under test
     * @param consumer assertions to run against the captured (identifier, histogram) pairs
     */
    private static void testLatencyStats(LatencyStats.Granularity granularity, Consumer<List<Tuple2<String, Histogram>>> consumer) {
        OperatorMetricGroup operatorGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        TestMetricRegistry registry = new TestMetricRegistry();
        GenericMetricGroup parentGroup = new GenericMetricGroup(registry, operatorGroup, PARENT_GROUP_NAME);
        int historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();

        LatencyStats latencyStats = new LatencyStats(parentGroup, historySize, OPERATOR_SUBTASK_INDEX, OPERATOR_ID, granularity);

        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));

        consumer.accept(registry.latencyHistograms);
    }

    /**
     * Strips everything up to and including the last occurrence of {@code PARENT_GROUP_NAME}
     * plus the single character that follows it (the delimiter), leaving only the part of the
     * identifier that the assertions compare.
     */
    private static String sanitizeName(String registrationName) {
        int suffixStart = registrationName.lastIndexOf(PARENT_GROUP_NAME) + PARENT_GROUP_NAME.length() + 1;
        return registrationName.substring(suffixStart);
    }

    /** Asserts the name layout expected when only the operator is part of the identifier. */
    private static void assertName(String registrationName) {
        String expected = "operator_id." + OPERATOR_ID
                + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX
                + ".latency";
        Assert.assertEquals(expected, sanitizeName(registrationName));
    }

    /** Asserts the name layout expected when the source operator ID is part of the identifier. */
    private static void assertName(String registrationName, OperatorID sourceId) {
        String expected = "source_id." + sourceId
                + ".operator_id." + OPERATOR_ID
                + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX
                + ".latency";
        Assert.assertEquals(expected, sanitizeName(registrationName));
    }

    /** Asserts the name layout expected when source operator ID and source subtask index are both included. */
    private static void assertName(String registrationName, OperatorID sourceId, int sourceSubtaskIndex) {
        String expected = "source_id." + sourceId
                + ".source_subtask_index." + sourceSubtaskIndex
                + ".operator_id." + OPERATOR_ID
                + ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX
                + ".latency";
        Assert.assertEquals(expected, sanitizeName(registrationName));
    }
}
