package org.apache.beam.fn.harness.control;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Suppliers;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.HashMultimap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.class */
public class ProcessBundleHandlerTest {
    // Registry keys under which the coder specs below are registered by the tests, and the
    // values used as PCollection coder references in the transforms they build.
    private static final String LONG_CODER_SPEC_ID = "998L";
    private static final String STRING_CODER_SPEC_ID = "999L";

    // Fn API coder descriptions; assigned in the static initializer at the bottom of the class
    // because building them can throw IOException.
    private static final BeamFnApi.Coder LONG_CODER_SPEC;
    private static final BeamFnApi.Coder STRING_CODER_SPEC;

    // Function spec URNs placed on the primitive transforms created by the tests.
    private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
    private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
    private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
    private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";

    // Used by the exception-propagation tests to declare the expected failure.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    // Mock data-plane client handed to every ProcessBundleHandler under test; initialized in setUp().
    @Mock
    private BeamFnDataClient beamFnDataClient;

    // Captures the inbound consumer that the read runner passes to the data client.
    @Captor
    private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;
    // Serializes the cloud-object form of a coder to bytes in the static initializer.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Full windowed-value coder for strings in the global window; also the coder the tests
    // expect the handler to pass to the data client.
    private static final Coder<WindowedValue<String>> STRING_CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
    // Remote port packed as the function spec data of the data read/write transforms.
    private static final BeamFnApi.RemoteGrpcPort REMOTE_PORT = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.newBuilder().setId("58L").setUrl("TestUrl")).build();

    /**
     * A {@link DoFn} that prefixes every element and writes it to both the main and the
     * additional output, then emits a single {@code "FinishBundle"} element into the window of
     * the last processed element when the bundle finishes.
     */
    private static class TestDoFn extends DoFn<String, String> {
        private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
        private static final TupleTag<String> additionalOutput = new TupleTag<>("output");

        /** Window of the most recently processed element, or {@code null} if none is pending. */
        private BoundedWindow window;

        private TestDoFn() {
        }

        /** Emits the prefixed element to both outputs and remembers the element's window. */
        @ProcessElement
        public void processElement(ProcessContext context, BoundedWindow elementWindow) {
            String element = context.element();
            context.output("MainOutput" + element);
            context.output(additionalOutput, "AdditionalOutput" + element);
            window = elementWindow;
        }

        /** Emits the finish marker once per bundle, only if at least one element was processed. */
        @FinishBundle
        public void finishBundle(FinishBundleContext context) {
            if (window == null) {
                return;
            }
            context.output("FinishBundle", window.maxTimestamp(), window);
            window = null;
        }
    }

    /** Initializes the {@code @Mock} and {@code @Captor} annotated fields before each test. */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Checks the order in which {@code processBundle} visits transforms and runs the registered
     * start and finish functions: transforms are visited last-to-first, start functions run in
     * that same order, and finish functions run in the opposite order.
     */
    @Test
    public void testOrderOfStartAndFinishCalls() throws Exception {
        // The instruction id is an arbitrary request identifier; it is unrelated to any coder spec.
        final String instructionId = "999L";
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder()
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
                        .build();
        ImmutableMap<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
                ImmutableMap.of("1L", processBundleDescriptor);

        final List<BeamFnApi.PrimitiveTransform> transformsProcessed = new ArrayList<>();
        final List<String> orderOfOperations = new ArrayList<>();

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient) {
            @Override
            protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
                    BeamFnApi.PrimitiveTransform primitiveTransform,
                    Supplier<String> processBundleInstructionId,
                    Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
                    BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
                    Consumer<ThrowingRunnable> addStartFunction,
                    Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
                Assert.assertThat(processBundleInstructionId.get(), Matchers.equalTo(instructionId));
                transformsProcessed.add(primitiveTransform);
                // Record when each registered function actually runs, tagged with its transform id.
                addStartFunction.accept(
                        () -> orderOfOperations.add("Start" + primitiveTransform.getId()));
                addFinishFunction.accept(
                        () -> orderOfOperations.add("Finish" + primitiveTransform.getId()));
            }
        };

        handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
                .setInstructionId(instructionId)
                .setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder()
                        .setProcessBundleDescriptorReference("1L"))
                .build());

        Assert.assertThat(transformsProcessed, Matchers.contains(
                processBundleDescriptor.getPrimitiveTransform(1),
                processBundleDescriptor.getPrimitiveTransform(0)));
        Assert.assertThat(orderOfOperations,
                Matchers.contains("Start3L", "Start2L", "Finish2L", "Finish3L"));
    }

    /**
     * Checks that an exception thrown while creating the consumers for a primitive transform
     * escapes from {@code processBundle}.
     */
    @Test
    public void testCreatingPrimitiveTransformExceptionsArePropagated() throws Exception {
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder()
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
                        .build();
        ImmutableMap<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
                ImmutableMap.of("1L", processBundleDescriptor);

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient) {
            @Override
            protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
                    BeamFnApi.PrimitiveTransform primitiveTransform,
                    Supplier<String> processBundleInstructionId,
                    Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
                    BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
                    Consumer<ThrowingRunnable> addStartFunction,
                    Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
                throw new IllegalStateException("TestException");
            }
        };

        // Register the expectation before invoking the handler so the test fails, rather than
        // passing vacuously, if the override above is never reached.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("TestException");
        handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
                .setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder()
                        .setProcessBundleDescriptorReference("1L"))
                .build());
    }

    /**
     * Checks that an exception thrown by a registered start function escapes from
     * {@code processBundle}.
     */
    @Test
    public void testPrimitiveTransformStartExceptionsArePropagated() throws Exception {
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder()
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
                        .build();
        ImmutableMap<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
                ImmutableMap.of("1L", processBundleDescriptor);

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient) {
            @Override
            protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
                    BeamFnApi.PrimitiveTransform primitiveTransform,
                    Supplier<String> processBundleInstructionId,
                    Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
                    BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
                    Consumer<ThrowingRunnable> addStartFunction,
                    Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
                // Only the start hook fails; creating the consumers itself succeeds.
                addStartFunction.accept(() -> {
                    throw new IllegalStateException("TestException");
                });
            }
        };

        // Register the expectation before invoking the handler so the test fails, rather than
        // passing vacuously, if no start function is ever registered or run.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("TestException");
        handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
                .setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder()
                        .setProcessBundleDescriptorReference("1L"))
                .build());
    }

    /**
     * Checks that an exception thrown by a registered finish function escapes from
     * {@code processBundle}.
     */
    @Test
    public void testPrimitiveTransformFinishExceptionsArePropagated() throws Exception {
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder()
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
                        .addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
                        .build();
        ImmutableMap<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
                ImmutableMap.of("1L", processBundleDescriptor);

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient) {
            @Override
            protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
                    BeamFnApi.PrimitiveTransform primitiveTransform,
                    Supplier<String> processBundleInstructionId,
                    Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
                    BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
                    Consumer<ThrowingRunnable> addStartFunction,
                    Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
                // Only the finish hook fails; creating the consumers itself succeeds.
                addFinishFunction.accept(() -> {
                    throw new IllegalStateException("TestException");
                });
            }
        };

        // Register the expectation before invoking the handler so the test fails, rather than
        // passing vacuously, if no finish function is ever registered or run.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("TestException");
        handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
                .setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder()
                        .setProcessBundleDescriptorReference("1L"))
                .build());
    }

    /**
     * Builds a Java DoFn transform with three input targets and two outputs, then checks that
     * a consumer is registered for every input target, that elements fed through those consumers
     * reach both outputs, and that the finish function emits the finish-bundle element.
     */
    @Test
    public void testCreatingAndProcessingDoFn() throws Exception {
        final long mainOutputId = 101L;
        final long additionalOutputId = 102L;
        ImmutableMap<String, BeamFnApi.Coder> fnApiRegistry =
                ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);

        byte[] serializedDoFnInfo = SerializableUtils.serializeToByteArray(
                DoFnInfo.forFn(
                        new TestDoFn(),
                        WindowingStrategy.globalDefault(),
                        ImmutableList.of(),
                        STRING_CODER,
                        mainOutputId,
                        ImmutableMap.of(
                                mainOutputId, TestDoFn.mainOutput,
                                additionalOutputId, TestDoFn.additionalOutput)));
        BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
                .setId("1L")
                .setUrn(JAVA_DO_FN_URN)
                .setData(Any.pack(BytesValue.newBuilder()
                        .setValue(ByteString.copyFrom(serializedDoFnInfo))
                        .build()))
                .build();

        BeamFnApi.Target inputATarget1 = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("1000L").setName("inputATarget1").build();
        BeamFnApi.Target inputATarget2 = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("1001L").setName("inputATarget1").build();
        BeamFnApi.Target inputBTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("1002L").setName("inputBTarget").build();
        BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
                .setId("100L")
                .setFunctionSpec(functionSpec)
                .putInputs("inputA", BeamFnApi.Target.List.newBuilder()
                        .addTarget(inputATarget1).addTarget(inputATarget2).build())
                .putInputs("inputB", BeamFnApi.Target.List.newBuilder()
                        .addTarget(inputBTarget).build())
                .putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder()
                        .setCoderReference(STRING_CODER_SPEC_ID).build())
                .putOutputs(Long.toString(additionalOutputId), BeamFnApi.PCollection.newBuilder()
                        .setCoderReference(STRING_CODER_SPEC_ID).build())
                .build();

        List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
        List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
        BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("100L").setName(Long.toString(mainOutputId)).build();
        BeamFnApi.Target additionalOutputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("100L").setName(Long.toString(additionalOutputId)).build();
        // Downstream consumers simply collect whatever the DoFn emits on each output.
        ImmutableMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
                ImmutableMultimap.of(
                        mainOutputTarget, mainOutputValues::add,
                        additionalOutputTarget, additionalOutputValues::add);
        HashMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
                HashMultimap.create();
        List<ThrowingRunnable> startFunctions = new ArrayList<>();
        List<ThrowingRunnable> finishFunctions = new ArrayList<>();

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient);
        handler.createConsumersForPrimitiveTransform(
                primitiveTransform,
                Suppliers.ofInstance("57L")::get,
                existingConsumers::get,
                newConsumers::put,
                startFunctions::add,
                finishFunctions::add);

        Iterables.getOnlyElement(startFunctions).run();
        mainOutputValues.clear();

        Assert.assertEquals(
                ImmutableSet.of(inputATarget1, inputATarget2, inputBTarget), newConsumers.keySet());

        // Feed one element through each registered input target so that every consumer is
        // exercised, not just the one registered for the first target.
        Iterables.getOnlyElement(newConsumers.get(inputATarget1))
                .accept(WindowedValue.valueInGlobalWindow("A1"));
        Iterables.getOnlyElement(newConsumers.get(inputATarget2))
                .accept(WindowedValue.valueInGlobalWindow("A2"));
        Iterables.getOnlyElement(newConsumers.get(inputBTarget))
                .accept(WindowedValue.valueInGlobalWindow("B"));
        Assert.assertThat(mainOutputValues, Matchers.contains(
                WindowedValue.valueInGlobalWindow("MainOutputA1"),
                WindowedValue.valueInGlobalWindow("MainOutputA2"),
                WindowedValue.valueInGlobalWindow("MainOutputB")));
        Assert.assertThat(additionalOutputValues, Matchers.contains(
                WindowedValue.valueInGlobalWindow("AdditionalOutputA1"),
                WindowedValue.valueInGlobalWindow("AdditionalOutputA2"),
                WindowedValue.valueInGlobalWindow("AdditionalOutputB")));
        mainOutputValues.clear();
        additionalOutputValues.clear();

        Iterables.getOnlyElement(finishFunctions).run();
        Assert.assertThat(mainOutputValues, Matchers.contains(
                WindowedValue.timestampedValueInGlobalWindow(
                        "FinishBundle", GlobalWindow.INSTANCE.maxTimestamp())));
    }

    /**
     * Builds a Java source transform and checks that the start function reads the embedded
     * source to the output, that a source delivered through the registered input consumer is
     * read to the output as well, and that no finish function is registered.
     */
    @Test
    public void testCreatingAndProcessingSource() throws Exception {
        final long outputId = 101L;
        ImmutableMap<String, BeamFnApi.Coder> fnApiRegistry =
                ImmutableMap.of(LONG_CODER_SPEC_ID, LONG_CODER_SPEC);
        BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("1000L").setName("inputTarget").build();

        List<WindowedValue<Long>> outputValues = new ArrayList<>();
        BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("100L").setName(Long.toString(outputId)).build();
        ImmutableMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<Long>>> existingConsumers =
                ImmutableMultimap.of(outputTarget, outputValues::add);
        HashMultimap<BeamFnApi.Target,
                ThrowingConsumer<WindowedValue<org.apache.beam.sdk.io.BoundedSource<Long>>>> newConsumers =
                HashMultimap.create();
        List<ThrowingRunnable> startFunctions = new ArrayList<>();
        List<ThrowingRunnable> finishFunctions = new ArrayList<>();

        BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
                .setId("1L")
                .setUrn(JAVA_SOURCE_URN)
                .setData(Any.pack(BytesValue.newBuilder()
                        .setValue(ByteString.copyFrom(
                                SerializableUtils.serializeToByteArray(CountingSource.upTo(3L))))
                        .build()))
                .build();
        BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
                .setId("100L")
                .setFunctionSpec(functionSpec)
                .putInputs("input", BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
                .putOutputs(Long.toString(outputId), BeamFnApi.PCollection.newBuilder()
                        .setCoderReference(LONG_CODER_SPEC_ID).build())
                .build();

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient);
        handler.createConsumersForPrimitiveTransform(
                primitiveTransform,
                Suppliers.ofInstance("57L")::get,
                existingConsumers::get,
                newConsumers::put,
                startFunctions::add,
                finishFunctions::add);

        // Starting the transform reads the source embedded in the function spec.
        Iterables.getOnlyElement(startFunctions).run();
        Assert.assertThat(outputValues, Matchers.contains(
                WindowedValue.valueInGlobalWindow(0L),
                WindowedValue.valueInGlobalWindow(1L),
                WindowedValue.valueInGlobalWindow(2L)));
        outputValues.clear();

        // A source arriving on the input target is read to the same output.
        Assert.assertEquals(ImmutableSet.of(inputTarget), newConsumers.keySet());
        Iterables.getOnlyElement(newConsumers.get(inputTarget))
                .accept(WindowedValue.valueInGlobalWindow(CountingSource.upTo(2L)));
        Assert.assertThat(outputValues, Matchers.contains(
                WindowedValue.valueInGlobalWindow(0L),
                WindowedValue.valueInGlobalWindow(1L)));

        Assert.assertThat(finishFunctions, Matchers.empty());
    }

    /**
     * Builds a data-input transform and checks that the data client is contacted only once the
     * start function runs, that inbound elements are forwarded to the output consumer, that no
     * input consumers are registered, and that the finish function completes without further
     * client interactions once the inbound future is done.
     */
    @Test
    public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
        final long outputId = 101L;
        ImmutableMap<String, BeamFnApi.Coder> fnApiRegistry =
                ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);

        List<WindowedValue<String>> outputValues = new ArrayList<>();
        BeamFnApi.Target outputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("100L").setName(Long.toString(outputId)).build();
        ImmutableMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
                ImmutableMultimap.of(outputTarget, outputValues::add);
        HashMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
                HashMultimap.create();
        List<ThrowingRunnable> startFunctions = new ArrayList<>();
        List<ThrowingRunnable> finishFunctions = new ArrayList<>();

        BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
                .setId("1L")
                .setUrn(DATA_INPUT_URN)
                .setData(Any.pack(REMOTE_PORT))
                .build();
        BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
                .setId("100L")
                .setFunctionSpec(functionSpec)
                .putInputs("input", BeamFnApi.Target.List.getDefaultInstance())
                .putOutputs(Long.toString(outputId), BeamFnApi.PCollection.newBuilder()
                        .setCoderReference(STRING_CODER_SPEC_ID).build())
                .build();

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient);
        handler.createConsumersForPrimitiveTransform(
                primitiveTransform,
                Suppliers.ofInstance("57L")::get,
                existingConsumers::get,
                newConsumers::put,
                startFunctions::add,
                finishFunctions::add);

        // Creating the transform must not touch the data client.
        Mockito.verifyZeroInteractions(this.beamFnDataClient);

        CompletableFuture<Void> completionFuture = new CompletableFuture<>();
        Mockito.when(this.beamFnDataClient.forInboundConsumer(
                        org.mockito.Matchers.any(),
                        org.mockito.Matchers.any(),
                        org.mockito.Matchers.any(),
                        org.mockito.Matchers.any()))
                .thenReturn(completionFuture);
        Iterables.getOnlyElement(startFunctions).run();
        Mockito.verify(this.beamFnDataClient).forInboundConsumer(
                org.mockito.Matchers.eq(REMOTE_PORT.getApiServiceDescriptor()),
                org.mockito.Matchers.eq(KV.of("57L", BeamFnApi.Target.newBuilder()
                        .setPrimitiveTransformReference("100L").setName("input").build())),
                org.mockito.Matchers.eq(STRING_CODER),
                this.consumerCaptor.capture());

        // Elements handed to the captured inbound consumer must reach the output consumer.
        this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("TestValue"));
        Assert.assertThat(outputValues,
                Matchers.contains(WindowedValue.valueInGlobalWindow("TestValue")));
        outputValues.clear();

        Assert.assertThat(newConsumers.keySet(), Matchers.empty());

        // Complete the inbound future before running the finish function.
        completionFuture.complete(null);
        Iterables.getOnlyElement(finishFunctions).run();

        Mockito.verifyNoMoreInteractions(this.beamFnDataClient);
    }

    /**
     * Builds a data-output transform and checks that the data client is contacted only once the
     * start function runs, that elements fed through the registered input consumer reach the
     * outbound consumer, and that the finish function closes the outbound consumer.
     */
    @Test
    public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
        final long outputId = 101L;
        ImmutableMap<String, BeamFnApi.Coder> fnApiRegistry =
                ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
        BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
                .setPrimitiveTransformReference("1000L").setName("inputTarget").build();

        ImmutableMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
                ImmutableMultimap.of();
        HashMultimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
                HashMultimap.create();
        List<ThrowingRunnable> startFunctions = new ArrayList<>();
        List<ThrowingRunnable> finishFunctions = new ArrayList<>();

        BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
                .setId("1L")
                .setUrn(DATA_OUTPUT_URN)
                .setData(Any.pack(REMOTE_PORT))
                .build();
        BeamFnApi.PrimitiveTransform primitiveTransform = BeamFnApi.PrimitiveTransform.newBuilder()
                .setId("100L")
                .setFunctionSpec(functionSpec)
                .putInputs("input", BeamFnApi.Target.List.newBuilder().addTarget(inputTarget).build())
                .putOutputs(Long.toString(outputId), BeamFnApi.PCollection.newBuilder()
                        .setCoderReference(STRING_CODER_SPEC_ID).build())
                .build();

        ProcessBundleHandler handler = new ProcessBundleHandler(
                PipelineOptionsFactory.create(), fnApiRegistry::get, this.beamFnDataClient);
        handler.createConsumersForPrimitiveTransform(
                primitiveTransform,
                Suppliers.ofInstance("57L")::get,
                existingConsumers::get,
                newConsumers::put,
                startFunctions::add,
                finishFunctions::add);

        // Creating the transform must not touch the data client.
        Mockito.verifyZeroInteractions(this.beamFnDataClient);

        final List<WindowedValue<String>> outputValues = new ArrayList<>();
        final AtomicBoolean wasCloseCalled = new AtomicBoolean();
        // Stand-in outbound consumer that records accepted elements and whether it was closed.
        CloseableThrowingConsumer<WindowedValue<String>> outputConsumer =
                new CloseableThrowingConsumer<WindowedValue<String>>() {
                    @Override
                    public void close() throws Exception {
                        wasCloseCalled.set(true);
                    }

                    @Override
                    public void accept(WindowedValue<String> value) throws Exception {
                        outputValues.add(value);
                    }
                };
        // The explicit type witness pins the element type so the stubbed return type matches.
        Mockito.when(this.beamFnDataClient.forOutboundConsumer(
                        org.mockito.Matchers.any(),
                        org.mockito.Matchers.any(),
                        org.mockito.Matchers.<Coder<WindowedValue<String>>>any()))
                .thenReturn(outputConsumer);
        Iterables.getOnlyElement(startFunctions).run();
        Mockito.verify(this.beamFnDataClient).forOutboundConsumer(
                org.mockito.Matchers.eq(REMOTE_PORT.getApiServiceDescriptor()),
                org.mockito.Matchers.eq(KV.of("57L", BeamFnApi.Target.newBuilder()
                        .setPrimitiveTransformReference("100L")
                        .setName(Long.toString(outputId))
                        .build())),
                org.mockito.Matchers.eq(STRING_CODER));

        Assert.assertEquals(ImmutableSet.of(inputTarget), newConsumers.keySet());
        Iterables.getOnlyElement(newConsumers.get(inputTarget))
                .accept(WindowedValue.valueInGlobalWindow("TestValue"));
        Assert.assertThat(outputValues,
                Matchers.contains(WindowedValue.valueInGlobalWindow("TestValue")));
        outputValues.clear();

        // The outbound consumer stays open until the finish function runs.
        Assert.assertFalse(wasCloseCalled.get());
        Iterables.getOnlyElement(finishFunctions).run();
        Assert.assertTrue(wasCloseCalled.get());

        Mockito.verifyNoMoreInteractions(this.beamFnDataClient);
    }

    // Builds the Fn API coder specs. Each spec carries its own registry id; the long coder spec
    // previously reused the string coder's id.
    static {
        try {
            STRING_CODER_SPEC = coderSpec(STRING_CODER_SPEC_ID, STRING_CODER);
            LONG_CODER_SPEC = coderSpec(
                    LONG_CODER_SPEC_ID,
                    WindowedValue.getFullCoder(VarLongCoder.of(), GlobalWindow.Coder.INSTANCE));
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Wraps the JSON-serialized cloud-object form of {@code coder} in a {@link BeamFnApi.Coder}
     * whose function spec has the given id.
     *
     * @param id the function spec id to set on the coder spec
     * @param coder the coder to describe
     * @return the coder spec
     * @throws IOException if the cloud object cannot be serialized
     */
    private static BeamFnApi.Coder coderSpec(String id, Coder<?> coder) throws IOException {
        byte[] serializedCoder = OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(coder));
        return BeamFnApi.Coder.newBuilder()
                .setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
                        .setId(id)
                        .setData(Any.pack(BytesValue.newBuilder()
                                .setValue(ByteString.copyFrom(serializedCoder))
                                .build())))
                .build();
    }
}
