package org.apache.beam.runners.flink.metrics;

import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.class */
public class FlinkMetricContainerTest {

    @Mock
    private RuntimeContext runtimeContext;

    @Mock
    private MetricGroup metricGroup;
    static final String PTRANSFORM_LABEL = ((BeamFnApi.MonitoringInfoLabelProps) BeamFnApi.MonitoringInfo.MonitoringInfoLabels.forNumber(0).getValueDescriptor().getOptions().getExtension(BeamFnApi.labelProps)).getName();

    @Before
    public void beforeTest() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.runtimeContext.getAccumulator(Matchers.anyString())).thenReturn(new MetricsAccumulator());
        Mockito.when(this.runtimeContext.getMetricGroup()).thenReturn(this.metricGroup);
    }

    @Test
    public void testMetricNameGeneration() {
        MetricResult metricResult = (MetricResult) Mockito.mock(MetricResult.class);
        Mockito.when(metricResult.getStep()).thenReturn("step");
        Mockito.when(metricResult.getName()).thenReturn(MetricName.named("namespace", "name"));
        Assert.assertThat(FlinkMetricContainer.getFlinkMetricNameString(metricResult), CoreMatchers.is("namespace.name"));
    }

    @Test
    public void testCounter() {
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("namespace.name")).thenReturn(simpleCounter);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Counter counter = flinkMetricContainer.getMetricsContainer("step").getCounter(MetricName.named("namespace", "name"));
        counter.inc();
        counter.inc();
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(0L));
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(2L));
    }

    @Test
    public void testGauge() {
        FlinkMetricContainer.FlinkGauge flinkGauge = new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        org.apache.beam.sdk.metrics.Gauge gauge = flinkMetricContainer.getMetricsContainer("step").getGauge(MetricName.named("namespace", "name"));
        Assert.assertThat(flinkGauge.getValue(), CoreMatchers.is(GaugeResult.empty()));
        flinkMetricContainer.updateMetrics("step");
        gauge.set(1L);
        gauge.set(42L);
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(flinkGauge.getValue().getValue()), CoreMatchers.is(42L));
    }

    @Test
    public void testMonitoringInfoUpdate() {
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        flinkMetricContainer.getMetricsContainer("step");
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("ns1.metric1")).thenReturn(simpleCounter);
        SimpleCounter simpleCounter2 = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("beam.metric:element_count:v1")).thenReturn(simpleCounter2);
        SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
        simpleMonitoringInfoBuilder.setUrnForUserMetric("ns1", "metric1");
        simpleMonitoringInfoBuilder.setInt64Value(111L);
        BeamFnApi.MonitoringInfo build = simpleMonitoringInfoBuilder.build();
        Assert.assertNotNull(build);
        SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder2 = new SimpleMonitoringInfoBuilder();
        simpleMonitoringInfoBuilder2.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN);
        simpleMonitoringInfoBuilder2.setInt64Value(222L);
        simpleMonitoringInfoBuilder2.setPTransformLabel("step");
        simpleMonitoringInfoBuilder2.setPCollectionLabel("pcoll");
        BeamFnApi.MonitoringInfo build2 = simpleMonitoringInfoBuilder2.build();
        Assert.assertNotNull(build2);
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(0L));
        Assert.assertThat(Long.valueOf(simpleCounter2.getCount()), CoreMatchers.is(0L));
        flinkMetricContainer.updateMetrics("step", ImmutableList.of(build, build2));
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(111L));
        Assert.assertThat(Long.valueOf(simpleCounter2.getCount()), CoreMatchers.is(222L));
    }

    @Test
    public void testDropUnexpectedMonitoringInfoTypes() {
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        MetricsContainer metricsContainer = flinkMetricContainer.getMetricsContainer("step");
        BeamFnApi.MonitoringInfo build = BeamFnApi.MonitoringInfo.newBuilder().setUrn(SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + "ns1:int_counter").putLabels(PTRANSFORM_LABEL, "step").setMetric(BeamFnApi.Metric.newBuilder().setCounterData(BeamFnApi.CounterData.newBuilder().setInt64Value(111L))).build();
        BeamFnApi.MonitoringInfo build2 = BeamFnApi.MonitoringInfo.newBuilder().setUrn(SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + "ns2:double_counter").putLabels(PTRANSFORM_LABEL, "step").setMetric(BeamFnApi.Metric.newBuilder().setCounterData(BeamFnApi.CounterData.newBuilder().setDoubleValue(222.0d))).build();
        BeamFnApi.MonitoringInfo build3 = BeamFnApi.MonitoringInfo.newBuilder().setUrn(SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + "ns3:int_distribution").putLabels(PTRANSFORM_LABEL, "step").setMetric(BeamFnApi.Metric.newBuilder().setDistributionData(BeamFnApi.DistributionData.newBuilder().setIntDistributionData(BeamFnApi.IntDistributionData.newBuilder().setSum(30L).setCount(10L).setMin(1L).setMax(5L)))).build();
        BeamFnApi.MonitoringInfo build4 = BeamFnApi.MonitoringInfo.newBuilder().setUrn(SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + "ns4:double_distribution").putLabels(PTRANSFORM_LABEL, "step").setMetric(BeamFnApi.Metric.newBuilder().setDistributionData(BeamFnApi.DistributionData.newBuilder().setDoubleDistributionData(BeamFnApi.DoubleDistributionData.newBuilder().setSum(30.0d).setCount(10L).setMin(1.0d).setMax(5.0d)))).build();
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("ns1.int_counter")).thenReturn(simpleCounter);
        flinkMetricContainer.updateMetrics("step", ImmutableList.of(build, build2, build3, build4));
        ((MetricGroup) Mockito.verify(this.metricGroup)).counter((String) Matchers.eq("ns1.int_counter"));
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(111L));
        Assert.assertThat(Long.valueOf(metricsContainer.getCounter(MetricName.named("ns1", "int_counter")).getCumulative().longValue()), CoreMatchers.is(111L));
        ((MetricGroup) Mockito.verify(this.metricGroup)).gauge((String) Matchers.eq("ns3.int_distribution"), (FlinkMetricContainer.FlinkDistributionGauge) Matchers.argThat(new ArgumentMatcher<FlinkMetricContainer.FlinkDistributionGauge>() { // from class: org.apache.beam.runners.flink.metrics.FlinkMetricContainerTest.1
            public boolean matches(Object obj) {
                return ((FlinkMetricContainer.FlinkDistributionGauge) obj).getValue().equals(DistributionResult.create(30L, 10L, 1L, 5L));
            }
        }));
        Assert.assertThat(metricsContainer.getDistribution(MetricName.named("ns3", "int_distribution")).getCumulative(), CoreMatchers.is(DistributionData.create(30L, 10L, 1L, 5L)));
    }

    @Test
    public void testDistribution() {
        FlinkMetricContainer.FlinkDistributionGauge flinkDistributionGauge = new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkDistributionGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Distribution distribution = flinkMetricContainer.getMetricsContainer("step").getDistribution(MetricName.named("namespace", "name"));
        Assert.assertThat(flinkDistributionGauge.getValue(), CoreMatchers.is(DistributionResult.IDENTITY_ELEMENT));
        flinkMetricContainer.updateMetrics("step");
        distribution.update(42L);
        distribution.update(-23L);
        distribution.update(0L);
        distribution.update(1L);
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMax()), CoreMatchers.is(42L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMin()), CoreMatchers.is(-23L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getCount()), CoreMatchers.is(4L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getSum()), CoreMatchers.is(20L));
        Assert.assertThat(Double.valueOf(flinkDistributionGauge.getValue().getMean()), CoreMatchers.is(Double.valueOf(5.0d)));
    }
}
