package org.apache.beam.fn.harness.control;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.control.Metrics;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/MetricsTest.class */
public class MetricsTest {
    private static final MetricName TEST_NAME = MetricName.named("testNamespace", "testName");
    private static final String TEST_ID = "testId";

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/control/MetricsTest$MetricMutator.class */
    public interface MetricMutator {
        void mutate();
    }

    @Test
    public void testAccurateBundleCounterReportsValueFirstTimeWithoutMutations() throws Exception {
        HashMap hashMap = new HashMap();
        Metrics.BundleCounter bundleProcessingThreadCounter = Metrics.bundleProcessingThreadCounter(TEST_ID, TEST_NAME);
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(0L)));
        hashMap.clear();
        bundleProcessingThreadCounter.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadCounter.reset();
        bundleProcessingThreadCounter.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
    }

    @Test
    public void testAccurateBundleDistributionReportsValueFirstTimeWithoutMutations() throws Exception {
        HashMap hashMap = new HashMap();
        Metrics.BundleDistribution bundleProcessingThreadDistribution = Metrics.bundleProcessingThreadDistribution(TEST_ID, TEST_NAME);
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.EMPTY)));
        hashMap.clear();
        bundleProcessingThreadDistribution.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadDistribution.reset();
        bundleProcessingThreadDistribution.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
    }

    @Test
    public void testAccurateBundleCounterWithMutations() throws Exception {
        HashMap hashMap = new HashMap();
        Metrics.BundleCounter bundleProcessingThreadCounter = Metrics.bundleProcessingThreadCounter(TEST_ID, TEST_NAME);
        bundleProcessingThreadCounter.inc(7L);
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(7L)));
        hashMap.clear();
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadCounter.inc();
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(8L)));
        hashMap.clear();
        bundleProcessingThreadCounter.dec(4L);
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(4L)));
        bundleProcessingThreadCounter.dec();
        bundleProcessingThreadCounter.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(3L)));
        bundleProcessingThreadCounter.reset();
        bundleProcessingThreadCounter.inc(7L);
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(7L)));
        hashMap.clear();
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadCounter.inc();
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(8L)));
        hashMap.clear();
        bundleProcessingThreadCounter.dec(4L);
        bundleProcessingThreadCounter.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(4L)));
        bundleProcessingThreadCounter.dec();
        bundleProcessingThreadCounter.updateFinalMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Counter(3L)));
    }

    @Test
    public void testAccurateBundleDistributionWithMutations() throws Exception {
        HashMap hashMap = new HashMap();
        Metrics.BundleDistribution bundleProcessingThreadDistribution = Metrics.bundleProcessingThreadDistribution(TEST_ID, TEST_NAME);
        bundleProcessingThreadDistribution.update(7L);
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.singleton(7L))));
        hashMap.clear();
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadDistribution.update(5L, 2L, 2L, 3L);
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.create(12L, 3L, 2L, 7L))));
        hashMap.clear();
        bundleProcessingThreadDistribution.reset();
        bundleProcessingThreadDistribution.update(7L);
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.singleton(7L))));
        hashMap.clear();
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.emptyMap());
        bundleProcessingThreadDistribution.update(5L, 2L, 2L, 3L);
        bundleProcessingThreadDistribution.updateIntermediateMonitoringData(hashMap);
        Assert.assertEquals(hashMap, Collections.singletonMap(TEST_ID, MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.create(12L, 3L, 2L, 7L))));
    }

    @Test
    public void testAccurateBundleCounterUsingMultipleThreads() throws Exception {
        Metrics.BundleCounter bundleProcessingThreadCounter = Metrics.bundleProcessingThreadCounter(TEST_ID, TEST_NAME);
        List<ByteString> testAccurateBundleMetricUsingMultipleThreads = testAccurateBundleMetricUsingMultipleThreads(bundleProcessingThreadCounter, () -> {
            bundleProcessingThreadCounter.inc();
        });
        Assert.assertTrue(testAccurateBundleMetricUsingMultipleThreads.size() >= 10);
        ArrayList arrayList = new ArrayList();
        Iterator<ByteString> it = testAccurateBundleMetricUsingMultipleThreads.iterator();
        while (it.hasNext()) {
            arrayList.add(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter(it.next())));
        }
        Collections.sort(arrayList);
        ArrayList arrayList2 = new ArrayList();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            arrayList2.add(MonitoringInfoEncodings.encodeInt64Counter(((Long) it2.next()).longValue()));
        }
        MatcherAssert.assertThat(testAccurateBundleMetricUsingMultipleThreads, (Matcher<? super List<ByteString>>) Matchers.contains(arrayList2.toArray()));
    }

    @Test
    public void testAccurateBundleDistributionUsingMultipleThreads() throws Exception {
        Metrics.BundleDistribution bundleProcessingThreadDistribution = Metrics.bundleProcessingThreadDistribution(TEST_ID, TEST_NAME);
        List<ByteString> testAccurateBundleMetricUsingMultipleThreads = testAccurateBundleMetricUsingMultipleThreads(bundleProcessingThreadDistribution, () -> {
            bundleProcessingThreadDistribution.update(1L);
        });
        Assert.assertTrue(testAccurateBundleMetricUsingMultipleThreads.size() >= 10);
        ArrayList arrayList = new ArrayList();
        Iterator<ByteString> it = testAccurateBundleMetricUsingMultipleThreads.iterator();
        while (it.hasNext()) {
            arrayList.add(MonitoringInfoEncodings.decodeInt64Distribution(it.next()));
        }
        Collections.sort(arrayList, Comparator.comparingLong((v0) -> {
            return v0.count();
        }));
        ArrayList arrayList2 = new ArrayList();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            arrayList2.add(MonitoringInfoEncodings.encodeInt64Distribution((DistributionData) it2.next()));
        }
        MatcherAssert.assertThat(testAccurateBundleMetricUsingMultipleThreads, (Matcher<? super List<ByteString>>) Matchers.contains(arrayList2.toArray()));
    }

    private List<ByteString> testAccurateBundleMetricUsingMultipleThreads(BundleProgressReporter bundleProgressReporter, MetricMutator metricMutator) throws Exception {
        ReentrantLock reentrantLock = new ReentrantLock();
        ArrayList arrayList = new ArrayList();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        Future submit = this.executor.submit(() -> {
            while (!atomicBoolean2.get()) {
                HashMap hashMap = new HashMap();
                reentrantLock.lock();
                try {
                    bundleProgressReporter.updateIntermediateMonitoringData(hashMap);
                    if (!hashMap.isEmpty()) {
                        arrayList.add((ByteString) hashMap.get(TEST_ID));
                        if (arrayList.size() >= 10) {
                            atomicBoolean.set(true);
                        }
                    }
                } finally {
                    reentrantLock.unlock();
                }
            }
            return null;
        });
        Future submit2 = this.executor.submit(() -> {
            HashMap hashMap = new HashMap();
            while (!atomicBoolean.get()) {
                metricMutator.mutate();
            }
            reentrantLock.lock();
            try {
                bundleProgressReporter.updateFinalMonitoringData(hashMap);
                if (!hashMap.isEmpty()) {
                    arrayList.add((ByteString) hashMap.get(TEST_ID));
                }
                atomicBoolean2.set(true);
                return null;
            } finally {
                reentrantLock.unlock();
            }
        });
        submit.get();
        submit2.get();
        return arrayList;
    }
}
