package org.apache.beam.fn.harness.data;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/PTransformFunctionRegistryTest.class */
public class PTransformFunctionRegistryTest {
    private static final Counter TEST_USER_COUNTER = Metrics.counter("foo", "bar");
    private ExecutionStateSampler sampler;

    @Before
    public void setUp() {
        this.sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
    }

    @After
    public void tearDown() {
        MetricsEnvironment.setCurrentContainer((MetricsContainer) null);
        this.sampler.stop();
    }

    @Test
    public void testStateTrackerRecordsStateTransitions() throws Exception {
        final ExecutionStateSampler.ExecutionStateTracker create = this.sampler.create();
        MetricsEnvironment.setCurrentContainer(create.getMetricsContainer());
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry(new ShortIdMap(), create, MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        ThrowingRunnable throwingRunnable = new ThrowingRunnable() { // from class: org.apache.beam.fn.harness.data.PTransformFunctionRegistryTest.1
            public void run() throws Exception {
                atomicBoolean.set(true);
                ExecutionStateSampler.ExecutionStateTrackerStatus status = create.getStatus();
                Assert.assertNotNull(status);
                Assert.assertEquals(Thread.currentThread(), status.getTrackedThread());
                Assert.assertEquals("pTransformA", status.getPTransformId());
            }
        };
        ThrowingRunnable throwingRunnable2 = new ThrowingRunnable() { // from class: org.apache.beam.fn.harness.data.PTransformFunctionRegistryTest.2
            public void run() throws Exception {
                atomicBoolean2.set(true);
                ExecutionStateSampler.ExecutionStateTrackerStatus status = create.getStatus();
                Assert.assertNotNull(status);
                Assert.assertEquals(Thread.currentThread(), status.getTrackedThread());
                Assert.assertEquals("pTransformB", status.getPTransformId());
            }
        };
        pTransformFunctionRegistry.register("pTransformA", "pTranformAName", throwingRunnable);
        pTransformFunctionRegistry.register("pTransformB", "pTranformBName", throwingRunnable2);
        create.start("testBundleId");
        Iterator it = pTransformFunctionRegistry.getFunctions().iterator();
        while (it.hasNext()) {
            ((ThrowingRunnable) it.next()).run();
        }
        create.reset();
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(atomicBoolean2.get());
    }

    @Test
    public void testMetricsUponRunningFunctions() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker create = this.sampler.create();
        MetricsEnvironment.setCurrentContainer(create.getMetricsContainer());
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry(new ShortIdMap(), create, MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
        pTransformFunctionRegistry.register("pTransformA", "pTranformAName", () -> {
            TEST_USER_COUNTER.inc();
        });
        pTransformFunctionRegistry.register("pTransformB", "pTranformBName", () -> {
            TEST_USER_COUNTER.inc(2L);
        });
        create.start("testBundleId");
        Iterator it = pTransformFunctionRegistry.getFunctions().iterator();
        while (it.hasNext()) {
            ((ThrowingRunnable) it.next()).run();
        }
        TEST_USER_COUNTER.inc(3L);
        Assert.assertEquals(1L, create.getMetricsContainerRegistry().getContainer("pTransformA").m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(2L, create.getMetricsContainerRegistry().getContainer("pTransformB").m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(3L, create.getMetricsContainerRegistry().getUnboundContainer().m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        create.reset();
    }
}
