package org.apache.beam.fn.harness;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;
import org.apache.beam.fn.harness.BoundedSourceRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/BoundedSourceRunnerTest.class */
public class BoundedSourceRunnerTest {
    public static final String URN = "beam:source:java:0.1";

    @Test
    public void testRunReadLoopWithMultipleSources() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Objects.requireNonNull(arrayList);
        FnDataReceiver fnDataReceiver = (v1) -> {
            r0.add(v1);
        };
        Objects.requireNonNull(arrayList2);
        BoundedSourceRunner boundedSourceRunner = new BoundedSourceRunner(PipelineOptionsFactory.create(), RunnerApi.FunctionSpec.getDefaultInstance(), ImmutableList.of(fnDataReceiver, (v1) -> {
            r1.add(v1);
        }));
        boundedSourceRunner.runReadLoop(WindowedValue.valueInGlobalWindow(CountingSource.upTo(2L)));
        boundedSourceRunner.runReadLoop(WindowedValue.valueInGlobalWindow(CountingSource.upTo(1L)));
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow(0L), WindowedValue.valueInGlobalWindow(1L), WindowedValue.valueInGlobalWindow(0L)));
        Assert.assertThat(arrayList2, Matchers.contains(WindowedValue.valueInGlobalWindow(0L), WindowedValue.valueInGlobalWindow(1L), WindowedValue.valueInGlobalWindow(0L)));
    }

    @Test
    public void testRunReadLoopWithEmptySource() throws Exception {
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        new BoundedSourceRunner(PipelineOptionsFactory.create(), RunnerApi.FunctionSpec.getDefaultInstance(), ImmutableList.of((v1) -> {
            r0.add(v1);
        })).runReadLoop(WindowedValue.valueInGlobalWindow(CountingSource.upTo(0L)));
        Assert.assertThat(arrayList, IsEmptyCollection.empty());
    }

    @Test
    public void testStart() throws Exception {
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        new BoundedSourceRunner(PipelineOptionsFactory.create(), RunnerApi.FunctionSpec.newBuilder().setUrn(URN).setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3L)))).build(), ImmutableList.of((v1) -> {
            r0.add(v1);
        })).start();
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow(0L), WindowedValue.valueInGlobalWindow(1L), WindowedValue.valueInGlobalWindow(2L)));
    }

    @Test
    public void testCreatingAndProcessingSourceFromFactory() throws Exception {
        ArrayList arrayList = new ArrayList();
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
        Objects.requireNonNull(arrayList);
        pCollectionConsumerRegistry.register("outputPC", FnApiDoFnRunnerTest.TEST_PTRANSFORM_ID, (v1) -> {
            r3.add(v1);
        });
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
        RunnerApi.PTransform build = RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(URN).setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3L)))).build()).putInputs("input", "inputPC").putOutputs("output", "outputPC").build();
        BoundedSourceRunner.Factory factory = new BoundedSourceRunner.Factory();
        PipelineOptions create = PipelineOptionsFactory.create();
        Supplier ofInstance = Suppliers.ofInstance("57L");
        Objects.requireNonNull(ofInstance);
        factory.createRunnerForPTransform(create, (BeamFnDataClient) null, (BeamFnStateClient) null, FnApiDoFnRunnerTest.TEST_PTRANSFORM_ID, build, ofInstance::get, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (BundleSplitListener) null);
        ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow(0L), WindowedValue.valueInGlobalWindow(1L), WindowedValue.valueInGlobalWindow(2L)));
        arrayList.clear();
        Assert.assertThat(pCollectionConsumerRegistry.keySet(), Matchers.containsInAnyOrder("inputPC", "outputPC"));
        pCollectionConsumerRegistry.getMultiplexingConsumer("inputPC").accept(WindowedValue.valueInGlobalWindow(CountingSource.upTo(2L)));
        Assert.assertThat(arrayList, Matchers.contains(WindowedValue.valueInGlobalWindow(0L), WindowedValue.valueInGlobalWindow(1L)));
        Assert.assertThat(pTransformFunctionRegistry2.getFunctions(), Matchers.empty());
    }

    @Test
    public void testRegistration() {
        Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
        while (it.hasNext()) {
            PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
            if (registrar instanceof BoundedSourceRunner.Registrar) {
                Assert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
                return;
            }
        }
        Assert.fail("Expected registrar not found.");
    }
}
