package org.apache.beam.runners.flink.metrics;

import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.class */
public class FlinkMetricContainerTest {

    @Mock
    private RuntimeContext runtimeContext;

    @Mock
    private MetricGroup metricGroup;

    @Before
    public void beforeTest() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.runtimeContext.getAccumulator(Matchers.anyString())).thenReturn(new MetricsAccumulator());
        Mockito.when(this.runtimeContext.getMetricGroup()).thenReturn(this.metricGroup);
    }

    @Test
    public void testMetricNameGeneration() {
        MetricResult metricResult = (MetricResult) Mockito.mock(MetricResult.class);
        Mockito.when(metricResult.getStep()).thenReturn("step");
        Mockito.when(metricResult.getName()).thenReturn(MetricName.named("namespace", "name"));
        Assert.assertThat(FlinkMetricContainer.getFlinkMetricNameString(metricResult), CoreMatchers.is("namespace.name"));
    }

    @Test
    public void testCounter() {
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("namespace.name")).thenReturn(simpleCounter);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Counter counter = flinkMetricContainer.getMetricsContainer("step").getCounter(MetricName.named("namespace", "name"));
        counter.inc();
        counter.inc();
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(0L));
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(2L));
    }

    @Test
    public void testGauge() {
        FlinkMetricContainer.FlinkGauge flinkGauge = new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        org.apache.beam.sdk.metrics.Gauge gauge = flinkMetricContainer.getMetricsContainer("step").getGauge(MetricName.named("namespace", "name"));
        Assert.assertThat(flinkGauge.getValue(), CoreMatchers.is(GaugeResult.empty()));
        flinkMetricContainer.updateMetrics("step");
        gauge.set(1L);
        gauge.set(42L);
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(flinkGauge.getValue().getValue()), CoreMatchers.is(42L));
    }

    @Test
    public void testDistribution() {
        FlinkMetricContainer.FlinkDistributionGauge flinkDistributionGauge = new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkDistributionGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Distribution distribution = flinkMetricContainer.getMetricsContainer("step").getDistribution(MetricName.named("namespace", "name"));
        Assert.assertThat(flinkDistributionGauge.getValue(), CoreMatchers.is(DistributionResult.IDENTITY_ELEMENT));
        flinkMetricContainer.updateMetrics("step");
        distribution.update(42L);
        distribution.update(-23L);
        distribution.update(0L);
        distribution.update(1L);
        flinkMetricContainer.updateMetrics("step");
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMax()), CoreMatchers.is(42L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMin()), CoreMatchers.is(-23L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getCount()), CoreMatchers.is(4L));
        Assert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getSum()), CoreMatchers.is(20L));
        Assert.assertThat(Double.valueOf(flinkDistributionGauge.getValue().getMean()), CoreMatchers.is(Double.valueOf(5.0d)));
    }
}
