package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.class */
public class BeamFnDataReadRunnerTest {
    private static final String ELEMENT_CODER_SPEC_ID = "string-coder-id";
    private static final RunnerApi.Coder CODER_SPEC;
    private static final RunnerApi.Components COMPONENTS;
    private static final String INPUT_TRANSFORM_ID = "1";

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

    @Mock
    private BeamFnDataClient mockBeamFnDataClient;

    @Captor
    private ArgumentCaptor<FnDataReceiver<WindowedValue<String>>> consumerCaptor;
    private static final Coder<String> ELEMENT_CODER = StringUtf8Coder.of();
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(ELEMENT_CODER, GlobalWindow.Coder.INSTANCE);
    private static final String CODER_SPEC_ID = "windowed-string-coder-id";
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(CODER_SPEC_ID).build();

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
        ArrayList arrayList = new ArrayList();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        Objects.requireNonNull(arrayList);
        pCollectionConsumerRegistry.register("outputPC", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, (v1) -> {
            r3.add(v1);
        });
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
        ArrayList arrayList2 = new ArrayList();
        RunnerApi.PTransform pTransform = RemoteGrpcPortRead.readFromPort(PORT_SPEC, "outputPC").toPTransform();
        BeamFnDataReadRunner.Factory factory = new BeamFnDataReadRunner.Factory();
        PipelineOptions create = PipelineOptionsFactory.create();
        BeamFnDataClient beamFnDataClient = this.mockBeamFnDataClient;
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier ofInstance = Suppliers.ofInstance("57");
        Objects.requireNonNull(ofInstance);
        Supplier supplier = ofInstance::get;
        ImmutableMap of = ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build());
        Map<String, RunnerApi.Coder> codersMap = COMPONENTS.getCodersMap();
        Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = COMPONENTS.getWindowingStrategiesMap();
        Objects.requireNonNull(arrayList2);
        factory.createRunnerForPTransform(create, beamFnDataClient, (BeamFnStateClient) null, FnApiDoFnRunnerTest.TEST_TRANSFORM_ID, pTransform, supplier, of, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (v1) -> {
            r13.add(v1);
        }, (BundleSplitListener) null);
        Assert.assertThat(arrayList2, Matchers.empty());
        Mockito.verifyZeroInteractions(this.mockBeamFnDataClient);
        InboundDataClient create2 = CompletableFutureInboundDataClient.create();
        Mockito.when(this.mockBeamFnDataClient.receive((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.any(), (LogicalEndpoint) org.mockito.Matchers.any(), (Coder) org.mockito.Matchers.any(), (FnDataReceiver) org.mockito.Matchers.any())).thenReturn(create2);
        ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).receive((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) org.mockito.Matchers.eq(LogicalEndpoint.of("57", FnApiDoFnRunnerTest.TEST_TRANSFORM_ID)), (Coder) org.mockito.Matchers.eq(CODER), this.consumerCaptor.capture());
        this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("TestValue"));
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow("TestValue")));
        arrayList.clear();
        Assert.assertThat(pCollectionConsumerRegistry.keySet(), Matchers.containsInAnyOrder("outputPC"));
        create2.complete();
        ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testReuseForMultipleBundles() throws Exception {
        InboundDataClient create = CompletableFutureInboundDataClient.create();
        InboundDataClient create2 = CompletableFutureInboundDataClient.create();
        Mockito.when(this.mockBeamFnDataClient.receive((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.any(), (LogicalEndpoint) org.mockito.Matchers.any(), (Coder) org.mockito.Matchers.any(), (FnDataReceiver) org.mockito.Matchers.any())).thenReturn(create).thenReturn(create2);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Objects.requireNonNull(arrayList);
        FnDataReceiver fnDataReceiver = (v1) -> {
            r0.add(v1);
        };
        Objects.requireNonNull(arrayList2);
        FnDataReceiver forConsumers = MultiplexingFnDataReceiver.forConsumers(ImmutableList.of(fnDataReceiver, (v1) -> {
            r1.add(v1);
        }));
        AtomicReference atomicReference = new AtomicReference("0");
        RunnerApi.PTransform pTransform = RemoteGrpcPortRead.readFromPort(PORT_SPEC, "localOutput").toPTransform();
        Objects.requireNonNull(atomicReference);
        BeamFnDataReadRunner beamFnDataReadRunner = new BeamFnDataReadRunner(INPUT_TRANSFORM_ID, pTransform, atomicReference::get, CODER_SPEC, COMPONENTS.getCodersMap(), this.mockBeamFnDataClient, forConsumers);
        beamFnDataReadRunner.registerInputLocation();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).receive((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) org.mockito.Matchers.eq(LogicalEndpoint.of((String) atomicReference.get(), INPUT_TRANSFORM_ID)), (Coder) org.mockito.Matchers.eq(CODER), this.consumerCaptor.capture());
        Future<?> submit = this.executor.submit(() -> {
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            try {
                this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("ABC"));
                this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("DEF"));
            } catch (Exception e) {
                create.fail(e);
            } finally {
                create.complete();
            }
        });
        beamFnDataReadRunner.blockTillReadFinishes();
        submit.get();
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")));
        Assert.assertThat(arrayList2, Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")));
        atomicReference.set(INPUT_TRANSFORM_ID);
        arrayList.clear();
        arrayList2.clear();
        beamFnDataReadRunner.registerInputLocation();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).receive((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (LogicalEndpoint) org.mockito.Matchers.eq(LogicalEndpoint.of((String) atomicReference.get(), INPUT_TRANSFORM_ID)), (Coder) org.mockito.Matchers.eq(CODER), this.consumerCaptor.capture());
        Future<?> submit2 = this.executor.submit(() -> {
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            try {
                this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("GHI"));
                this.consumerCaptor.getValue().accept(WindowedValue.valueInGlobalWindow("JKL"));
            } catch (Exception e) {
                create2.fail(e);
            } finally {
                create2.complete();
            }
        });
        beamFnDataReadRunner.blockTillReadFinishes();
        submit2.get();
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")));
        Assert.assertThat(arrayList2, Matchers.contains(WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")));
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testRegistration() {
        Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
        while (it.hasNext()) {
            PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
            if (registrar instanceof BeamFnDataReadRunner.Registrar) {
                Assert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(RemoteGrpcPortRead.URN));
                return;
            }
        }
        Assert.fail("Expected registrar not found.");
    }

    static {
        try {
            RunnerApi.MessageWithComponents proto = CoderTranslation.toProto(CODER);
            CODER_SPEC = proto.getCoder();
            COMPONENTS = proto.getComponents().toBuilder().putCoders(CODER_SPEC_ID, CODER_SPEC).putCoders(ELEMENT_CODER_SPEC_ID, CoderTranslation.toProto(ELEMENT_CODER).getCoder()).build();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
