package org.apache.beam.fn.harness.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.FnApiDoFnRunnerTest;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.class */
public class PCollectionConsumerRegistryTest {
    // User counter obtained via Metrics.counter("foo", "bar"); incremented by the metrics-container test.
    private static final Counter TEST_USER_COUNTER = Metrics.counter("foo", "bar");

    // JUnit rule through which tests declare the exception type and message they expect.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    // Ids of the two PCollections that the static initializer puts into TEST_DESCRIPTOR.
    private static final String P_COLLECTION_A = "pCollectionA";
    private static final String P_COLLECTION_B = "pCollectionB";
    // Bundle descriptor passed to every registry under test; assigned in the static initializer.
    private static final BeamFnApi.ProcessBundleDescriptor TEST_DESCRIPTOR;
    // Created in setUp() and stopped in tearDown().
    private ExecutionStateSampler sampler;

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest$SplittingReceiver.class */
    /**
     * Receiver type that is also a {@link HandlesSplits}; it declares no members of its own and
     * exists so that a single mock can stand in for both interfaces.
     *
     * @param <T> type of element accepted by the receiver
     */
    private abstract static class SplittingReceiver<T> implements FnDataReceiver<T>, HandlesSplits {
        // Intentionally empty: the implicit constructor of a private nested class is private too.
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest$TestElementByteSizeObservableIterable.class */
    private static class TestElementByteSizeObservableIterable<T> extends ElementByteSizeObservableIterable<T, ElementByteSizeObservableIterator<T>> {
        private List<T> elements;
        private long elementByteSize;

        public TestElementByteSizeObservableIterable(List<T> list, long j) {
            this.elements = list;
            this.elementByteSize = j;
        }

        protected ElementByteSizeObservableIterator createIterator() {
            return new ElementByteSizeObservableIterator() { // from class: org.apache.beam.fn.harness.data.PCollectionConsumerRegistryTest.TestElementByteSizeObservableIterable.1
                private int index = 0;

                public boolean hasNext() {
                    return this.index < TestElementByteSizeObservableIterable.this.elements.size();
                }

                public Object next() {
                    notifyValueReturned(TestElementByteSizeObservableIterable.this.elementByteSize);
                    List list = TestElementByteSizeObservableIterable.this.elements;
                    int i = this.index;
                    this.index = i + 1;
                    return list.get(i);
                }
            };
        }
    }

    /** Creates a fresh sampler, using {@code System::currentTimeMillis} as its clock, before each test. */
    @Before
    public void setUp() throws Exception {
        sampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
    }

    /** Resets the current metrics container to {@code null} and stops the sampler made in setUp(). */
    @After
    public void tearDown() throws Exception {
        final MetricsContainer noContainer = null;
        MetricsEnvironment.setCurrentContainer(noContainer);
        sampler.stop();
    }

    /**
     * Registers one consumer for pCollectionA, feeds the same element through the multiplexing
     * consumer ten times, and checks that the consumer received every element and that the
     * element-count and sampled-byte-size monitoring infos for the PCollection match.
     */
    @Test
    public void singleConsumer() throws Exception {
        final int elementCount = 10;
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), shortIds, reporter, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", consumerA1);

        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        int remaining = elementCount;
        while (remaining-- > 0) {
            wrapperConsumer.accept(element);
        }
        Mockito.verify(consumerA1, Mockito.times(elementCount)).accept(element);

        // Expected monitoring infos: one element count and one byte-size distribution.
        List<Object> expected = new ArrayList<>();
        SimpleMonitoringInfoBuilder countInfo = new SimpleMonitoringInfoBuilder();
        countInfo.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        countInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        countInfo.setInt64SumValue(elementCount);
        expected.add(countInfo.build());

        long elemSize = StringUtf8Coder.of().getEncodedElementByteSize("elem");
        SimpleMonitoringInfoBuilder sizeInfo = new SimpleMonitoringInfoBuilder();
        sizeInfo.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        sizeInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        sizeInfo.setInt64DistributionValue(DistributionData.create(elementCount * elemSize, elementCount, elemSize, elemSize));
        expected.add(sizeInfo.build());

        HashMap monitoringData = new HashMap();
        reporter.updateFinalMonitoringData(monitoringData);
        MatcherAssert.assertThat(
                Iterables.filter(shortIds.toMonitoringInfo(monitoringData), info -> info.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION)),
                (Matcher<? super Iterable>) Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Checks that an exception thrown by the single registered consumer surfaces, with its
     * message, from the multiplexing consumer.
     */
    @Test
    public void singleConsumerException() throws Exception {
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), new ShortIdMap(), new BundleProgressReporter.InMemory(), TEST_DESCRIPTOR);
        FnDataReceiver failingConsumer = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", failingConsumer);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);

        // Make the underlying consumer fail for any element.
        Mockito.doThrow(new Exception("testException")).when(failingConsumer).accept((WindowedValue) Mockito.any());

        expectedException.expect(Exception.class);
        expectedException.expectMessage("testException");
        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        wrapperConsumer.accept(element);
    }

    /**
     * Checks that element-count and sampled-byte-size monitoring infos are still reported for
     * pCollectionA when no consumer has been registered for it.
     */
    @Test
    public void noConsumers() throws Exception {
        final int elementCount = 10;
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), shortIds, reporter, TEST_DESCRIPTOR);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);

        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        int remaining = elementCount;
        while (remaining-- > 0) {
            wrapperConsumer.accept(element);
        }

        // Expected monitoring infos: one element count and one byte-size distribution.
        List<Object> expected = new ArrayList<>();
        SimpleMonitoringInfoBuilder countInfo = new SimpleMonitoringInfoBuilder();
        countInfo.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        countInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        countInfo.setInt64SumValue(elementCount);
        expected.add(countInfo.build());

        long elemSize = StringUtf8Coder.of().getEncodedElementByteSize("elem");
        SimpleMonitoringInfoBuilder sizeInfo = new SimpleMonitoringInfoBuilder();
        sizeInfo.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        sizeInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        sizeInfo.setInt64DistributionValue(DistributionData.create(elementCount * elemSize, elementCount, elemSize, elemSize));
        expected.add(sizeInfo.build());

        HashMap monitoringData = new HashMap();
        reporter.updateFinalMonitoringData(monitoringData);
        MatcherAssert.assertThat(
                Iterables.filter(shortIds.toMonitoringInfo(monitoringData), info -> info.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION)),
                (Matcher<? super Iterable>) Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Registers two consumers for pCollectionA and checks that both receive each of the ten
     * elements while the element count and byte-size distribution are reported once per element.
     */
    @Test
    public void multipleConsumersSamePCollection() throws Exception {
        final int elementCount = 10;
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), shortIds, reporter, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver consumerA2 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", consumerA1);
        consumers.register(P_COLLECTION_A, "pTransformIdB", "pTransformIdBName", consumerA2);

        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        int remaining = elementCount;
        while (remaining-- > 0) {
            wrapperConsumer.accept(element);
        }
        Mockito.verify(consumerA1, Mockito.times(elementCount)).accept(element);
        Mockito.verify(consumerA2, Mockito.times(elementCount)).accept(element);

        // Expected monitoring infos: one element count and one byte-size distribution.
        List<Object> expected = new ArrayList<>();
        SimpleMonitoringInfoBuilder countInfo = new SimpleMonitoringInfoBuilder();
        countInfo.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        countInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        countInfo.setInt64SumValue(elementCount);
        expected.add(countInfo.build());

        long elemSize = StringUtf8Coder.of().getEncodedElementByteSize("elem");
        SimpleMonitoringInfoBuilder sizeInfo = new SimpleMonitoringInfoBuilder();
        sizeInfo.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        sizeInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_A);
        sizeInfo.setInt64DistributionValue(DistributionData.create(elementCount * elemSize, elementCount, elemSize, elemSize));
        expected.add(sizeInfo.build());

        HashMap monitoringData = new HashMap();
        reporter.updateFinalMonitoringData(monitoringData);
        MatcherAssert.assertThat(
                Iterables.filter(shortIds.toMonitoringInfo(monitoringData), info -> info.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION)),
                (Matcher<? super Iterable>) Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Checks that, with two consumers registered for pCollectionA, an exception thrown by the
     * second one surfaces from the multiplexing consumer.
     */
    @Test
    public void multipleConsumersSamePCollectionException() throws Exception {
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), new ShortIdMap(), new BundleProgressReporter.InMemory(), TEST_DESCRIPTOR);
        FnDataReceiver healthyConsumer = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver failingConsumer = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", healthyConsumer);
        consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", failingConsumer);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);

        // Only the second registered consumer fails.
        Mockito.doThrow(new Exception("testException")).when(failingConsumer).accept((WindowedValue) Mockito.any());

        expectedException.expect(Exception.class);
        expectedException.expectMessage("testException");
        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        wrapperConsumer.accept(element);
    }

    /**
     * Checks that registering another consumer for a PCollection after its multiplexing consumer
     * has been requested fails with a RuntimeException.
     */
    @Test
    public void throwsOnRegisteringAfterMultiplexingConsumerWasInitialized() throws Exception {
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), new ShortIdMap(), new BundleProgressReporter.InMemory(), TEST_DESCRIPTOR);
        FnDataReceiver earlyConsumer = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver lateConsumer = Mockito.mock(FnDataReceiver.class);

        consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", earlyConsumer);
        // Requesting the multiplexing consumer is what makes the later register() call fail.
        consumers.getMultiplexingConsumer(P_COLLECTION_A);

        expectedException.expectMessage("cannot be register()-d after");
        expectedException.expect(RuntimeException.class);
        consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", lateConsumer);
    }

    /**
     * Checks that increments of the user counter made inside a consumer are recorded in the
     * metrics container looked up by that consumer's PTransform id, and that an increment made
     * outside of any consumer is recorded in the unbound container.
     */
    @Test
    public void testMetricContainerUpdatedUponAcceptingElement() throws Exception {
        ExecutionStateSampler.ExecutionStateTracker stateTracker = sampler.create();
        MetricsEnvironment.setCurrentContainer(stateTracker.getMetricsContainer());
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        stateTracker.start("testBundle");

        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(stateTracker, shortIds, reporter, TEST_DESCRIPTOR);
        consumers.register(P_COLLECTION_A, "pTransformA", "pTransformAName", windowedValue -> TEST_USER_COUNTER.inc());
        consumers.register(P_COLLECTION_A, "pTransformB", "pTransformBName", windowedValue -> TEST_USER_COUNTER.inc(2L));

        consumers.getMultiplexingConsumer(P_COLLECTION_A).accept(WindowedValue.valueInGlobalWindow("elem"));
        // Incremented outside of any consumer, so it is expected in the unbound container.
        TEST_USER_COUNTER.inc(3L);

        // getCounter replaces the decompiler-mangled name m3818getCounter, which is not an API method.
        Assert.assertEquals(1L, stateTracker.getMetricsContainerRegistry().getContainer("pTransformA").getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(2L, stateTracker.getMetricsContainerRegistry().getContainer("pTransformB").getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
        Assert.assertEquals(3L, stateTracker.getMetricsContainerRegistry().getUnboundContainer().getCounter(TEST_USER_COUNTER.getName()).getCumulative().longValue());
    }

    /**
     * Checks that when the only registered consumer implements {@link HandlesSplits}, the
     * multiplexing consumer does too and forwards {@code getProgress()} and {@code trySplit()}
     * to it.
     */
    @Test
    public void testHandlesSplitsPassedToOriginalConsumer() throws Exception {
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), new ShortIdMap(), new BundleProgressReporter.InMemory(), TEST_DESCRIPTOR);
        SplittingReceiver splittingConsumer = Mockito.mock(SplittingReceiver.class);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", splittingConsumer);

        // Keep the declared type as FnDataReceiver so the instanceof check below is a real
        // runtime assertion rather than something the static type already guarantees.
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
        Assert.assertTrue("multiplexing consumer should implement HandlesSplits", wrapperConsumer instanceof HandlesSplits);
        HandlesSplits splittable = (HandlesSplits) wrapperConsumer;

        splittable.getProgress();
        Mockito.verify(splittingConsumer).getProgress();
        splittable.trySplit(0.3d);
        Mockito.verify(splittingConsumer).trySplit(0.3d);
    }

    /**
     * Feeds pCollectionB an iterable value whose iterator reports element sizes as it is
     * consumed, has the mocked consumer iterate over it, and checks the reported element count
     * and sampled-byte-size distribution.
     */
    @Test
    public void testLazyByteSizeEstimation() throws Exception {
        final int elementCount = 10;
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), shortIds, reporter, TEST_DESCRIPTOR);
        FnDataReceiver consumerA1 = Mockito.mock(FnDataReceiver.class);
        consumers.register(P_COLLECTION_B, "pTransformIdA", "pTransformIdAName", consumerA1);
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_B);

        long elemSize = StringUtf8Coder.of().getEncodedElementByteSize("elem");
        WindowedValue value = WindowedValue.valueInGlobalWindow(new TestElementByteSizeObservableIterable(Arrays.asList("elem", "elem"), elemSize));
        Mockito.doAnswer(invocation -> {
            WindowedValue received = (WindowedValue) invocation.getArguments()[0];
            // Walk the whole iterable; each next() call makes the test iterator report a size.
            for (Object ignored : (Iterable) received.getValue()) {
                // nothing to do with the element itself
            }
            return null;
        }).when(consumerA1).accept(value);

        int remaining = elementCount;
        while (remaining-- > 0) {
            wrapperConsumer.accept(value);
        }
        Mockito.verify(consumerA1, Mockito.times(elementCount)).accept(value);

        List<Object> expected = new ArrayList<>();
        SimpleMonitoringInfoBuilder countInfo = new SimpleMonitoringInfoBuilder();
        countInfo.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
        countInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_B);
        countInfo.setInt64SumValue(elementCount);
        expected.add(countInfo.build());

        SimpleMonitoringInfoBuilder sizeInfo = new SimpleMonitoringInfoBuilder();
        sizeInfo.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
        sizeInfo.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, P_COLLECTION_B);
        // NOTE(review): the +1 per element and the +5 are presumably coder framing overhead
        // (length prefix / iterable encoding) - confirm against the coders involved.
        long valueSize = 2 * (elemSize + 1) + 5;
        sizeInfo.setInt64DistributionValue(DistributionData.create(elementCount * valueSize, elementCount, valueSize, valueSize));
        expected.add(sizeInfo.build());

        HashMap monitoringData = new HashMap();
        reporter.updateFinalMonitoringData(monitoringData);
        MatcherAssert.assertThat(
                Iterables.filter(shortIds.toMonitoringInfo(monitoringData), info -> info.containsLabels(MonitoringInfoConstants.Labels.PCOLLECTION)),
                (Matcher<? super Iterable>) Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Builds the registry with a {@link DataSampler}, pushes ten elements through pCollectionA,
     * and checks that the sampler's response holds an element list for that PCollection
     * containing the encoded element.
     */
    @Test
    public void dataSampling() throws Exception {
        final int elementCount = 10;
        ShortIdMap shortIds = new ShortIdMap();
        BundleProgressReporter.InMemory reporter = new BundleProgressReporter.InMemory();
        DataSampler dataSampler = new DataSampler();
        PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(sampler.create(), shortIds, reporter, TEST_DESCRIPTOR, dataSampler);
        consumers.register(P_COLLECTION_A, "pTransformIdA", "pTransformIdAName", Mockito.mock(FnDataReceiver.class));
        FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);

        WindowedValue<String> element = WindowedValue.valueInGlobalWindow("elem");
        for (int i = 0; i < elementCount; i++) {
            wrapperConsumer.accept(element);
        }

        // build() replaces the decompiler-mangled names m1130build / m1178build, which are not
        // methods of the generated protobuf builders.
        BeamFnApi.InstructionRequest request = BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder()).build();
        Map<String, BeamFnApi.SampleDataResponse.ElementList> elementSamplesMap = dataSampler.handleDataSampleRequest(request).build().getSampleData().getElementSamplesMap();
        Assert.assertFalse(elementSamplesMap.isEmpty());
        BeamFnApi.SampleDataResponse.ElementList elementList = elementSamplesMap.get(P_COLLECTION_A);
        Assert.assertNotNull(elementList);

        // One expected sample per pushed element, each holding the coder's encoding of "elem".
        List<BeamFnApi.SampledElement> expectedSamples = new ArrayList<>();
        StringUtf8Coder coder = StringUtf8Coder.of();
        for (int i = 0; i < elementCount; i++) {
            ByteStringOutputStream encoded = new ByteStringOutputStream();
            coder.encode("elem", encoded);
            expectedSamples.add(BeamFnApi.SampledElement.newBuilder().setElement(encoded.toByteStringAndReset()).build());
        }
        Assert.assertTrue(elementList.getElementsList().containsAll(expectedSamples));
    }

    /**
     * Checks that when a consumer throws while processing an element, exactly one log entry is
     * sent to the logging service, carrying the instruction id, the transform id and the
     * "Failed to process element" message.
     *
     * <p>An in-process gRPC logging service collects the entries. The logging client is managed
     * by try-with-resources so it is closed on the (expected) exception path as well, before the
     * collected entries are inspected.
     */
    @Test
    public void logsExceptionWithTransformId() throws Exception {
        Exception failure = new Exception("testException");
        AtomicBoolean clientClosedStream = new AtomicBoolean();
        ConcurrentLinkedQueue<BeamFnApi.LogEntry> loggedEntries = new ConcurrentLinkedQueue<>();
        final AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = new AtomicReference<>();
        final CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
                TestStreams.withOnNext((BeamFnApi.LogEntry.List logEntries) -> loggedEntries.addAll(logEntries.getLogEntriesList()))
                        .withOnCompleted(() -> {
                            // Record that the client completed its stream, then complete ours.
                            clientClosedStream.set(true);
                            outboundServerObserver.get().onCompleted();
                        })
                        .build();

        Endpoints.ApiServiceDescriptor serviceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + ProcessIdUtil.DEFAULT_PROCESSID + UUID.randomUUID().toString()).build();
        Server server = InProcessServerBuilder.forName(serviceDescriptor.getUrl()).addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
            @Override
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                outboundServerObserver.set(streamObserver);
                return inboundServerObserver;
            }
        }).build();
        server.start();
        try {
            ManagedChannel channel = InProcessChannelBuilder.forName(serviceDescriptor.getUrl()).build();

            // Use the fixture's sampler (same configuration as a freshly built one) so that
            // tearDown() stops it; a sampler created here would never be stopped.
            ExecutionStateSampler.ExecutionStateTracker stateTracker = sampler.create();
            stateTracker.start("process-bundle");
            stateTracker.create("shortId", FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, ExecutionStateTracker.PROCESS_STATE_NAME).activate();
            BeamFnLoggingMDC.setInstructionId("instruction");
            BeamFnLoggingMDC.setStateTracker(stateTracker);

            try (BeamFnLoggingClient ignored = BeamFnLoggingClient.createAndStart(PipelineOptionsFactory.create(), serviceDescriptor, descriptor -> channel)) {
                PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(stateTracker, new ShortIdMap(), new BundleProgressReporter.InMemory(), TEST_DESCRIPTOR);
                FnDataReceiver failingConsumer = Mockito.mock(FnDataReceiver.class);
                consumers.register(P_COLLECTION_A, FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID, "pTransformIdName", failingConsumer);
                FnDataReceiver wrapperConsumer = consumers.getMultiplexingConsumer(P_COLLECTION_A);
                Mockito.doThrow(failure).when(failingConsumer).accept((WindowedValue) Mockito.any());

                expectedException.expectMessage("testException");
                expectedException.expect(Exception.class);
                wrapperConsumer.accept(WindowedValue.valueInGlobalWindow("elem"));
            } finally {
                // Runs after the logging client has been closed, on both the normal and the
                // exception path (presumably closing is what delivers any pending entries -
                // confirm against BeamFnLoggingClient). Only instruction id, transform id and
                // message are compared; other fields of the received entry are ignored.
                BeamFnApi.LogEntry expectedEntry = BeamFnApi.LogEntry.newBuilder().setInstructionId("instruction").setTransformId(FnApiDoFnRunnerTest.ExecutionTest.TEST_TRANSFORM_ID).setMessage("Failed to process element for bundle \"process-bundle\"").build();
                List<BeamFnApi.LogEntry> received = new ArrayList<>(loggedEntries);
                Assert.assertEquals(1L, received.size());
                BeamFnApi.LogEntry actualEntry = received.get(0);
                Assert.assertEquals(expectedEntry, BeamFnApi.LogEntry.newBuilder().setInstructionId(actualEntry.getInstructionId()).setTransformId(actualEntry.getTransformId()).setMessage(actualEntry.getMessage()).build());
            }
        } finally {
            // Always release the in-process server, even if setup or an assertion fails.
            server.shutdownNow();
        }
    }

    static {
        SdkComponents create = SdkComponents.create();
        try {
            TEST_DESCRIPTOR = BeamFnApi.ProcessBundleDescriptor.newBuilder().putPcollections(P_COLLECTION_A, RunnerApi.PCollection.newBuilder().setCoderId(create.registerCoder(StringUtf8Coder.of())).build()).putPcollections(P_COLLECTION_B, RunnerApi.PCollection.newBuilder().setCoderId(create.registerCoder(IterableCoder.of(StringUtf8Coder.of()))).build()).putAllCoders(create.toComponents().getCodersMap()).m1512build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
