package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.beam.fn.harness.BeamFnDataWriteRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Supplier;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Suppliers;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.class */
public class BeamFnDataWriteRunnerTest {
    private static final String ELEM_CODER_ID = "string-coder-id";
    private static final RunnerApi.Coder WIRE_CODER_SPEC;
    private static final RunnerApi.Components COMPONENTS;
    private static final BeamFnApi.Target OUTPUT_TARGET;

    @Mock
    private BeamFnDataClient mockBeamFnDataClient;
    private static final Coder<String> ELEM_CODER = StringUtf8Coder.of();
    private static final Coder<WindowedValue<String>> WIRE_CODER = WindowedValue.getFullCoder(ELEM_CODER, GlobalWindow.Coder.INSTANCE);
    private static final String WIRE_CODER_ID = "windowed-string-coder-id";
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(WIRE_CODER_ID).build();

    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest$RecordingReceiver.class */
    private static class RecordingReceiver<T> extends ArrayList<T> implements CloseableFnDataReceiver<T> {
        private boolean closed;

        private RecordingReceiver() {
        }

        @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
        public void close() throws Exception {
            this.closed = true;
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(T t) throws Exception {
            if (this.closed) {
                throw new IllegalStateException("Consumer is closed but attempting to consume " + t);
            }
            add(t);
        }
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
        ArrayListMultimap create = ArrayListMultimap.create();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        RunnerApi.PTransform pTransform = RemoteGrpcPortWrite.writeToPort("inputPC", PORT_SPEC).toPTransform();
        BeamFnDataWriteRunner.Factory factory = new BeamFnDataWriteRunner.Factory();
        PipelineOptions create2 = PipelineOptionsFactory.create();
        BeamFnDataClient beamFnDataClient = this.mockBeamFnDataClient;
        Supplier ofInstance = Suppliers.ofInstance("57L");
        Objects.requireNonNull(ofInstance);
        java.util.function.Supplier supplier = ofInstance::get;
        ImmutableMap of = ImmutableMap.of("inputPC", RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build());
        Map<String, RunnerApi.Coder> codersMap = COMPONENTS.getCodersMap();
        Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = COMPONENTS.getWindowingStrategiesMap();
        Objects.requireNonNull(arrayList);
        Consumer consumer = (v1) -> {
            r11.add(v1);
        };
        Objects.requireNonNull(arrayList2);
        factory.createRunnerForPTransform(create2, beamFnDataClient, (BeamFnStateClient) null, "ptransformId", pTransform, supplier, of, codersMap, windowingStrategiesMap, create, consumer, (v1) -> {
            r12.add(v1);
        }, (BundleSplitListener) null);
        Mockito.verifyZeroInteractions(this.mockBeamFnDataClient);
        final ArrayList arrayList3 = new ArrayList();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        Mockito.when(this.mockBeamFnDataClient.send((Endpoints.ApiServiceDescriptor) Matchers.any(), (LogicalEndpoint) Matchers.any(), (Coder) Matchers.any())).thenReturn(new CloseableFnDataReceiver<WindowedValue<String>>() { // from class: org.apache.beam.fn.harness.BeamFnDataWriteRunnerTest.1
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                atomicBoolean.set(true);
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(WindowedValue<String> windowedValue) throws Exception {
                arrayList3.add(windowedValue);
            }
        });
        ((ThrowingRunnable) Iterables.getOnlyElement(arrayList)).run();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).send((Endpoints.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) Matchers.eq(LogicalEndpoint.of("57L", BeamFnApi.Target.newBuilder().setPrimitiveTransformReference("ptransformId").setName((String) Iterables.getOnlyElement(pTransform.getInputsMap().keySet())).build())), (Coder) Matchers.eq(WIRE_CODER));
        Assert.assertThat(create.keySet(), org.hamcrest.Matchers.containsInAnyOrder("inputPC"));
        ((FnDataReceiver) Iterables.getOnlyElement(create.get((ArrayListMultimap) "inputPC"))).accept(WindowedValue.valueInGlobalWindow("TestValue"));
        Assert.assertThat(arrayList3, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow("TestValue")));
        arrayList3.clear();
        Assert.assertFalse(atomicBoolean.get());
        ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
        Assert.assertTrue(atomicBoolean.get());
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testReuseForMultipleBundles() throws Exception {
        RecordingReceiver recordingReceiver = new RecordingReceiver();
        RecordingReceiver recordingReceiver2 = new RecordingReceiver();
        Mockito.when(this.mockBeamFnDataClient.send((Endpoints.ApiServiceDescriptor) Matchers.any(), (LogicalEndpoint) Matchers.any(), (Coder) Matchers.any())).thenReturn(recordingReceiver).thenReturn(recordingReceiver2);
        AtomicReference atomicReference = new AtomicReference("0");
        RunnerApi.PTransform pTransform = RemoteGrpcPortWrite.writeToPort("myWrite", PORT_SPEC).toPTransform();
        Objects.requireNonNull(atomicReference);
        BeamFnDataWriteRunner beamFnDataWriteRunner = new BeamFnDataWriteRunner(pTransform, atomicReference::get, OUTPUT_TARGET, WIRE_CODER_SPEC, COMPONENTS.getCodersMap(), this.mockBeamFnDataClient);
        beamFnDataWriteRunner.registerForOutput();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).send((Endpoints.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) Matchers.eq(LogicalEndpoint.of((String) atomicReference.get(), OUTPUT_TARGET)), (Coder) Matchers.eq(WIRE_CODER));
        beamFnDataWriteRunner.consume(WindowedValue.valueInGlobalWindow("ABC"));
        beamFnDataWriteRunner.consume(WindowedValue.valueInGlobalWindow("DEF"));
        beamFnDataWriteRunner.close();
        Assert.assertTrue(recordingReceiver.closed);
        Assert.assertThat(recordingReceiver, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")));
        atomicReference.set("1");
        recordingReceiver.clear();
        recordingReceiver2.clear();
        beamFnDataWriteRunner.registerForOutput();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).send((Endpoints.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) Matchers.eq(LogicalEndpoint.of((String) atomicReference.get(), OUTPUT_TARGET)), (Coder) Matchers.eq(WIRE_CODER));
        beamFnDataWriteRunner.consume(WindowedValue.valueInGlobalWindow("GHI"));
        beamFnDataWriteRunner.consume(WindowedValue.valueInGlobalWindow("JKL"));
        beamFnDataWriteRunner.close();
        Assert.assertTrue(recordingReceiver2.closed);
        Assert.assertThat(recordingReceiver2, org.hamcrest.Matchers.contains(WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")));
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testRegistration() {
        Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
        while (it.hasNext()) {
            PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
            if (registrar instanceof BeamFnDataWriteRunner.Registrar) {
                Assert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(RemoteGrpcPortWrite.URN));
                return;
            }
        }
        Assert.fail("Expected registrar not found.");
    }

    static {
        try {
            RunnerApi.MessageWithComponents proto = CoderTranslation.toProto(WIRE_CODER);
            WIRE_CODER_SPEC = proto.getCoder();
            COMPONENTS = proto.getComponents().toBuilder().putCoders(WIRE_CODER_ID, WIRE_CODER_SPEC).putCoders(ELEM_CODER_ID, CoderTranslation.toProto(ELEM_CODER).getCoder()).build();
            OUTPUT_TARGET = BeamFnApi.Target.newBuilder().setPrimitiveTransformReference("1").setName("out").build();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
