package org.apache.beam.fn.harness.data;

import org.apache.beam.fn.harness.FnApiDoFnRunnerTest;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({MetricsEnvironment.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.class */
public class PCollectionConsumerRegistryTest {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest$SplittingReceiver.class */
    private static abstract class SplittingReceiver<T> implements FnDataReceiver<T>, HandlesSplits {
        private SplittingReceiver() {
        }
    }

    @Test
    public void singleConsumer() throws Exception {
        MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformIdA", fnDataReceiver);
        FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow("elem");
        for (int i = 0; i < 20; i++) {
            multiplexingConsumer.accept(valueInGlobalWindow);
        }
        ((FnDataReceiver) Mockito.verify(fnDataReceiver, Mockito.times(20))).accept(valueInGlobalWindow);
        Assert.assertThat(pCollectionConsumerRegistry.keySet(), Matchers.contains("pCollectionA"));
        SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
        simpleMonitoringInfoBuilder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        simpleMonitoringInfoBuilder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pCollectionA");
        simpleMonitoringInfoBuilder.setInt64Value(20);
        Assert.assertEquals(simpleMonitoringInfoBuilder.build(), SimpleMonitoringInfoBuilder.copyAndClearTimestamp((MetricsApi.MonitoringInfo) Iterables.find(metricsContainerStepMap.getMonitoringInfos(), monitoringInfo -> {
            return monitoringInfo.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION);
        })));
    }

    @Test
    public void singleConsumerException() throws Exception {
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, fnDataReceiver);
        FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        ((FnDataReceiver) Mockito.doThrow(new Exception("testException")).when(fnDataReceiver)).accept((WindowedValue) Mockito.any());
        this.expectedException.expectMessage("testException");
        this.expectedException.expect(Exception.class);
        multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("elem"));
    }

    @Test
    public void multipleConsumersSamePCollection() throws Exception {
        MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        FnDataReceiver fnDataReceiver2 = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformIdA", fnDataReceiver);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformIdB", fnDataReceiver2);
        FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow("elem");
        for (int i = 0; i < 20; i++) {
            multiplexingConsumer.accept(valueInGlobalWindow);
        }
        ((FnDataReceiver) Mockito.verify(fnDataReceiver, Mockito.times(20))).accept(valueInGlobalWindow);
        ((FnDataReceiver) Mockito.verify(fnDataReceiver2, Mockito.times(20))).accept(valueInGlobalWindow);
        Assert.assertThat(pCollectionConsumerRegistry.keySet(), Matchers.contains("pCollectionA"));
        SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
        simpleMonitoringInfoBuilder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        simpleMonitoringInfoBuilder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pCollectionA");
        simpleMonitoringInfoBuilder.setInt64Value(20);
        Assert.assertEquals(simpleMonitoringInfoBuilder.build(), SimpleMonitoringInfoBuilder.copyAndClearTimestamp((MetricsApi.MonitoringInfo) Iterables.find(metricsContainerStepMap.getMonitoringInfos(), monitoringInfo -> {
            return monitoringInfo.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION);
        })));
    }

    @Test
    public void multipleConsumersSamePCollectionException() throws Exception {
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        FnDataReceiver fnDataReceiver2 = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, fnDataReceiver);
        pCollectionConsumerRegistry.register("pCollectionA", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, fnDataReceiver2);
        FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        ((FnDataReceiver) Mockito.doThrow(new Exception("testException")).when(fnDataReceiver2)).accept((WindowedValue) Mockito.any());
        this.expectedException.expectMessage("testException");
        this.expectedException.expect(Exception.class);
        multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("elem"));
    }

    @Test
    public void throwsOnRegisteringAfterMultiplexingConsumerWasInitialized() throws Exception {
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        FnDataReceiver fnDataReceiver2 = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, fnDataReceiver);
        pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("cannot be register()-d after");
        pCollectionConsumerRegistry.register("pCollectionA", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, fnDataReceiver2);
    }

    @Test
    public void testScopedMetricContainerInvokedUponAcceptingElement() throws Exception {
        PowerMockito.mockStatic((Class<?>) MetricsEnvironment.class, (Class<?>[]) new Class[0]);
        MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        FnDataReceiver fnDataReceiver2 = (FnDataReceiver) Mockito.mock(FnDataReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformA", fnDataReceiver);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformB", fnDataReceiver2);
        pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA").accept(WindowedValue.valueInGlobalWindow("elem"));
        PowerMockito.verifyStatic(MetricsEnvironment.class, Mockito.times(1));
        MetricsEnvironment.scopedMetricsContainer(metricsContainerStepMap.getContainer("pTransformA"));
        PowerMockito.verifyStatic(MetricsEnvironment.class, Mockito.times(1));
        MetricsEnvironment.scopedMetricsContainer(metricsContainerStepMap.getContainer("pTransformB"));
    }

    @Test
    public void testScopedMetricContainerInvokedUponAccept() throws Exception {
        PowerMockito.mockStatic((Class<?>) MetricsEnvironment.class, Mockito.withSettings().verboseLogging());
        MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        FnDataReceiver fnDataReceiver = (FnDataReceiver) Mockito.mock(FnDataReceiver.class, Mockito.withSettings().verboseLogging());
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformIdA", fnDataReceiver);
        FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow("elem");
        multiplexingConsumer.accept(valueInGlobalWindow);
        ((FnDataReceiver) Mockito.verify(fnDataReceiver, Mockito.times(1))).accept(valueInGlobalWindow);
        PowerMockito.verifyStatic(MetricsEnvironment.class, Mockito.times(1));
        MetricsEnvironment.scopedMetricsContainer(metricsContainerStepMap.getUnboundContainer());
    }

    @Test
    public void testHandlesSplitsPassedToOriginalConsumer() throws Exception {
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        SplittingReceiver splittingReceiver = (SplittingReceiver) Mockito.mock(SplittingReceiver.class);
        pCollectionConsumerRegistry.register("pCollectionA", "pTransformIdA", splittingReceiver);
        HandlesSplits multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer("pCollectionA");
        Assert.assertTrue(multiplexingConsumer instanceof HandlesSplits);
        multiplexingConsumer.getProgress();
        ((SplittingReceiver) Mockito.verify(splittingReceiver)).getProgress();
        multiplexingConsumer.trySplit(0.3d);
        ((SplittingReceiver) Mockito.verify(splittingReceiver)).trySplit(0.3d);
    }
}
