package org.apache.beam.fn.harness.control;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.class */
public class ExecutionStateSamplerTest {
    private static final Counter TEST_USER_COUNTER = Metrics.counter("foo", "counter");
    private static final Distribution TEST_USER_DISTRIBUTION = Metrics.distribution("foo", "distribution");
    private static final Gauge TEST_USER_GAUGE = Metrics.gauge("foo", "gauge");
    private static final Histogram TEST_USER_HISTOGRAM = new DelegatingHistogram(MetricName.named("foo", "histogram"), HistogramData.LinearBuckets.of(0.0d, 100.0d, 1), false);

    @Rule
    public ExpectedLogs expectedLogs = ExpectedLogs.none(ExecutionStateSampler.class);

    @After
    public void tearDown() {
        MetricsEnvironment.setCurrentContainer((MetricsContainer) null);
    }

    @Test
    public void testSamplingProducesCorrectFinalResults() throws Exception {
        DateTimeUtils.MillisProvider millisProvider = (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), millisProvider);
        ExecutionStateSampler.ExecutionStateTracker create = executionStateSampler.create();
        ExecutionStateSampler.ExecutionState create2 = create.create("shortId1", "ptransformId1", "ptransformIdName1", ExecutionStateTracker.PROCESS_STATE_NAME);
        ExecutionStateSampler.ExecutionStateTracker create3 = executionStateSampler.create();
        ExecutionStateSampler.ExecutionState create4 = create3.create("shortId2", "ptransformId2", "ptransformIdName2", ExecutionStateTracker.PROCESS_STATE_NAME);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        final CountDownLatch countDownLatch4 = new CountDownLatch(1);
        final CountDownLatch countDownLatch5 = new CountDownLatch(1);
        final CountDownLatch countDownLatch6 = new CountDownLatch(1);
        final Thread currentThread = Thread.currentThread();
        Mockito.when(Long.valueOf(millisProvider.getMillis())).thenAnswer(new Answer<Long>() { // from class: org.apache.beam.fn.harness.control.ExecutionStateSamplerTest.1
            private long currentTime;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.mockito.stubbing.Answer
            public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (Thread.currentThread().equals(currentThread)) {
                    return 1L;
                }
                if (this.currentTime < 1000) {
                    countDownLatch.await();
                    this.currentTime += 100;
                } else if (this.currentTime < 1500) {
                    countDownLatch4.countDown();
                    countDownLatch2.await();
                    this.currentTime += 100;
                } else if (this.currentTime == 1500) {
                    countDownLatch5.countDown();
                    countDownLatch3.await();
                    this.currentTime = 1600L;
                } else if (this.currentTime == 1600) {
                    countDownLatch6.countDown();
                }
                return Long.valueOf(this.currentTime);
            }
        });
        Assert.assertNull(create.getCurrentThreadsPTransformId());
        Assert.assertNull(create3.getCurrentThreadsPTransformId());
        Assert.assertNull(create.getStatus());
        Assert.assertNull(create3.getStatus());
        create.start("bundleId1");
        create3.start("bundleId2");
        create2.activate();
        create4.activate();
        Assert.assertEquals("ptransformId1", create.getCurrentThreadsPTransformId());
        Assert.assertEquals("ptransformId2", create3.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus status = create.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus status2 = create3.getStatus();
        Assert.assertEquals("ptransformId1", status.getPTransformId());
        Assert.assertEquals("ptransformId2", status2.getPTransformId());
        Assert.assertEquals("ptransformIdName1", status.getPTransformUniqueName());
        Assert.assertEquals("ptransformIdName2", status2.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), status.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), status2.getTrackedThread());
        MatcherAssert.assertThat(Long.valueOf(status.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(status2.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.equalTo(1L));
        countDownLatch.countDown();
        countDownLatch4.await();
        Assert.assertEquals("ptransformId1", create.getCurrentThreadsPTransformId());
        Assert.assertEquals("ptransformId2", create3.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus status3 = create.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus status4 = create3.getStatus();
        Assert.assertEquals("ptransformId1", status3.getPTransformId());
        Assert.assertEquals("ptransformId2", status4.getPTransformId());
        Assert.assertEquals("ptransformIdName1", status3.getPTransformUniqueName());
        Assert.assertEquals("ptransformIdName2", status4.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), status3.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), status4.getTrackedThread());
        MatcherAssert.assertThat(Long.valueOf(status3.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.greaterThan(Long.valueOf(status.getLastTransitionTimeMillis())));
        MatcherAssert.assertThat(Long.valueOf(status4.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.greaterThan(Long.valueOf(status2.getLastTransitionTimeMillis())));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        create.updateIntermediateMonitoringData(hashMap);
        create3.updateIntermediateMonitoringData(hashMap2);
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap.get("shortId1"))), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap2.get("shortId2"))), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        countDownLatch2.countDown();
        countDownLatch5.await();
        create2.deactivate();
        create4.deactivate();
        countDownLatch3.countDown();
        countDownLatch6.await();
        Assert.assertNull(create.getCurrentThreadsPTransformId());
        Assert.assertNull(create3.getCurrentThreadsPTransformId());
        ExecutionStateSampler.ExecutionStateTrackerStatus status5 = create.getStatus();
        ExecutionStateSampler.ExecutionStateTrackerStatus status6 = create3.getStatus();
        Assert.assertNull(status5.getPTransformId());
        Assert.assertNull(status6.getPTransformId());
        Assert.assertNull(status5.getPTransformUniqueName());
        Assert.assertNull(status6.getPTransformUniqueName());
        Assert.assertEquals(Thread.currentThread(), status5.getTrackedThread());
        Assert.assertEquals(Thread.currentThread(), status6.getTrackedThread());
        MatcherAssert.assertThat(Long.valueOf(status5.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.greaterThan(Long.valueOf(status3.getLastTransitionTimeMillis())));
        MatcherAssert.assertThat(Long.valueOf(status6.getLastTransitionTimeMillis()), (Matcher<? super Long>) Matchers.greaterThan(Long.valueOf(status3.getLastTransitionTimeMillis())));
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        create.updateFinalMonitoringData(hashMap3);
        create3.updateFinalMonitoringData(hashMap4);
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap3.get("shortId1"))), Matchers.anyOf(Matchers.equalTo(1400L), Matchers.equalTo(1500L)));
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap4.get("shortId2"))), Matchers.anyOf(Matchers.equalTo(1400L), Matchers.equalTo(1500L)));
        create.reset();
        create3.reset();
        Assert.assertNull(create.getCurrentThreadsPTransformId());
        Assert.assertNull(create3.getCurrentThreadsPTransformId());
        Assert.assertNull(create.getStatus());
        Assert.assertNull(create3.getStatus());
        executionStateSampler.stop();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testSamplingDoesntReportDuplicateFinalResults() throws Exception {
        DateTimeUtils.MillisProvider millisProvider = (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), millisProvider);
        ExecutionStateSampler.ExecutionStateTracker create = executionStateSampler.create();
        ExecutionStateSampler.ExecutionState create2 = create.create("shortId1", "ptransformId1", "ptransformIdName1", ExecutionStateTracker.PROCESS_STATE_NAME);
        ExecutionStateSampler.ExecutionStateTracker create3 = executionStateSampler.create();
        ExecutionStateSampler.ExecutionState create4 = create3.create("shortId2", "ptransformId2", "ptransformIdName2", ExecutionStateTracker.PROCESS_STATE_NAME);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final Thread currentThread = Thread.currentThread();
        Mockito.when(Long.valueOf(millisProvider.getMillis())).thenAnswer(new Answer<Long>() { // from class: org.apache.beam.fn.harness.control.ExecutionStateSamplerTest.2
            private long currentTime;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.mockito.stubbing.Answer
            public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (Thread.currentThread().equals(currentThread)) {
                    return 0L;
                }
                countDownLatch.await();
                if (this.currentTime < 1000) {
                    this.currentTime += 100;
                } else {
                    countDownLatch2.countDown();
                }
                return Long.valueOf(this.currentTime);
            }
        });
        create.start("bundleId1");
        create3.start("bundleId2");
        create2.activate();
        create4.activate();
        countDownLatch.countDown();
        countDownLatch2.await();
        create2.deactivate();
        create4.deactivate();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        create.updateIntermediateMonitoringData(hashMap);
        create3.updateIntermediateMonitoringData(hashMap2);
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap.get("shortId1"))), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap2.get("shortId2"))), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        create2.deactivate();
        create4.deactivate();
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        create.updateFinalMonitoringData(hashMap3);
        create3.updateFinalMonitoringData(hashMap4);
        Assert.assertTrue(hashMap3.isEmpty());
        Assert.assertTrue(hashMap4.isEmpty());
        create.reset();
        create3.reset();
        executionStateSampler.stop();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker create = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class)).create();
        MetricsEnvironment.setCurrentContainer(create.getMetricsContainer());
        ExecutionStateSampler.ExecutionState create2 = create.create("shortId", "ptransformId", "uniqueName", "state");
        create2.activate();
        TEST_USER_COUNTER.inc();
        TEST_USER_DISTRIBUTION.update(2L);
        TEST_USER_GAUGE.set(3L);
        TEST_USER_HISTOGRAM.update(4.0d);
        create2.deactivate();
        TEST_USER_COUNTER.inc(11L);
        TEST_USER_DISTRIBUTION.update(12L);
        TEST_USER_GAUGE.set(13L);
        TEST_USER_HISTOGRAM.update(14.0d);
        TEST_USER_HISTOGRAM.update(14.0d);
        Assert.assertEquals(1L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(2L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3767getDistribution(TEST_USER_DISTRIBUTION.getName()).getCumulative().sum());
        Assert.assertEquals(3L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3766getGauge(TEST_USER_GAUGE.getName()).getCumulative().value());
        Assert.assertEquals(1L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3765getHistogram(TEST_USER_HISTOGRAM.getName(), (HistogramData.BucketType) HistogramData.LinearBuckets.of(0.0d, 100.0d, 1)).getCumulative().getCount(0));
        Assert.assertEquals(11L, create.getMetricsContainerRegistry().getUnboundContainer().m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(12L, create.getMetricsContainerRegistry().getUnboundContainer().m3767getDistribution(TEST_USER_DISTRIBUTION.getName()).getCumulative().sum());
        Assert.assertEquals(13L, create.getMetricsContainerRegistry().getUnboundContainer().m3766getGauge(TEST_USER_GAUGE.getName()).getCumulative().value());
        Assert.assertEquals(2L, create.getMetricsContainerRegistry().getUnboundContainer().m3765getHistogram(TEST_USER_HISTOGRAM.getName(), (HistogramData.BucketType) HistogramData.LinearBuckets.of(0.0d, 100.0d, 1)).getCumulative().getCount(0));
    }

    @Test
    public void testTrackerReuse() throws Exception {
        DateTimeUtils.MillisProvider millisProvider = (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler.ExecutionStateTracker create = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), millisProvider).create();
        MetricsEnvironment.setCurrentContainer(create.getMetricsContainer());
        ExecutionStateSampler.ExecutionState create2 = create.create("shortId", "ptransformId", "ptransformIdName", ExecutionStateTracker.PROCESS_STATE_NAME);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        final CountDownLatch countDownLatch4 = new CountDownLatch(1);
        final Thread currentThread = Thread.currentThread();
        Mockito.when(Long.valueOf(millisProvider.getMillis())).thenAnswer(new Answer<Long>() { // from class: org.apache.beam.fn.harness.control.ExecutionStateSamplerTest.3
            private long currentTime;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.mockito.stubbing.Answer
            public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (Thread.currentThread().equals(currentThread)) {
                    return 0L;
                }
                if (this.currentTime < 1000) {
                    countDownLatch.await();
                    this.currentTime += 100;
                } else if (this.currentTime < 1500) {
                    countDownLatch3.countDown();
                    countDownLatch2.await();
                    this.currentTime += 100;
                } else {
                    countDownLatch4.countDown();
                }
                return Long.valueOf(this.currentTime);
            }
        });
        create.start("bundleId1");
        create2.activate();
        countDownLatch.countDown();
        countDownLatch3.await();
        TEST_USER_COUNTER.inc();
        create2.deactivate();
        HashMap hashMap = new HashMap();
        create.updateFinalMonitoringData(hashMap);
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap.get("shortId"))), Matchers.anyOf(Matchers.equalTo(900L), Matchers.equalTo(1000L)));
        Assert.assertEquals(1L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        create.reset();
        create.start("bundleId2");
        create2.activate();
        countDownLatch2.countDown();
        countDownLatch4.await();
        TEST_USER_COUNTER.inc();
        create2.deactivate();
        HashMap hashMap2 = new HashMap();
        create.updateFinalMonitoringData(hashMap2);
        MatcherAssert.assertThat(Long.valueOf(MonitoringInfoEncodings.decodeInt64Counter((ByteString) hashMap2.get("shortId"))), Matchers.anyOf(Matchers.equalTo(400L), Matchers.equalTo(500L)));
        Assert.assertEquals(1L, create.getMetricsContainerRegistry().getContainer("ptransformId").m3768getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        create.reset();
        this.expectedLogs.verifyNotLogged("Operation ongoing");
    }

    @Test
    public void testLullDetectionOccursInActiveBundle() throws Exception {
        DateTimeUtils.MillisProvider millisProvider = (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), millisProvider);
        ExecutionStateSampler.ExecutionStateTracker create = executionStateSampler.create();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(10);
        final Thread currentThread = Thread.currentThread();
        Mockito.when(Long.valueOf(millisProvider.getMillis())).thenAnswer(new Answer<Long>() { // from class: org.apache.beam.fn.harness.control.ExecutionStateSamplerTest.4
            private long currentTime;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.mockito.stubbing.Answer
            public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (Thread.currentThread().equals(currentThread)) {
                    return 0L;
                }
                countDownLatch.await();
                countDownLatch2.countDown();
                this.currentTime += Duration.standardMinutes(1L).getMillis();
                return Long.valueOf(this.currentTime);
            }
        });
        create.start("bundleId");
        countDownLatch.countDown();
        countDownLatch2.await();
        create.reset();
        executionStateSampler.stop();
        this.expectedLogs.verifyWarn("Operation ongoing in bundle bundleId for at least");
    }

    @Test
    public void testLullDetectionOccursInActiveState() throws Exception {
        DateTimeUtils.MillisProvider millisProvider = (DateTimeUtils.MillisProvider) Mockito.mock(DateTimeUtils.MillisProvider.class);
        ExecutionStateSampler executionStateSampler = new ExecutionStateSampler(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create(), millisProvider);
        ExecutionStateSampler.ExecutionStateTracker create = executionStateSampler.create();
        ExecutionStateSampler.ExecutionState create2 = create.create("shortId", "ptransformId", "ptransformIdName", ExecutionStateTracker.PROCESS_STATE_NAME);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(10);
        final Thread currentThread = Thread.currentThread();
        Mockito.when(Long.valueOf(millisProvider.getMillis())).thenAnswer(new Answer<Long>() { // from class: org.apache.beam.fn.harness.control.ExecutionStateSamplerTest.5
            private long currentTime;

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.mockito.stubbing.Answer
            public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
                if (Thread.currentThread().equals(currentThread)) {
                    return 0L;
                }
                countDownLatch.await();
                countDownLatch2.countDown();
                this.currentTime += Duration.standardMinutes(1L).getMillis();
                return Long.valueOf(this.currentTime);
            }
        });
        create.start("bundleId");
        create2.activate();
        countDownLatch.countDown();
        countDownLatch2.await();
        create2.deactivate();
        create.reset();
        executionStateSampler.stop();
        this.expectedLogs.verifyWarn("Operation ongoing in bundle bundleId for PTransform");
    }
}
