package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataWriteRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.class */
public class BeamFnDataWriteRunnerTest {
    private static final String ELEM_CODER_ID = "string-coder-id";
    private static final RunnerApi.Coder WIRE_CODER_SPEC;
    private static final RunnerApi.Components COMPONENTS;
    private static final String TRANSFORM_ID = "1";

    @Mock
    private BeamFnDataClient mockBeamFnDataClient;
    private static final Coder<String> ELEM_CODER = StringUtf8Coder.of();
    private static final Coder<WindowedValue<String>> WIRE_CODER = WindowedValue.getFullCoder(ELEM_CODER, GlobalWindow.Coder.INSTANCE);
    private static final String WIRE_CODER_ID = "windowed-string-coder-id";
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(WIRE_CODER_ID).build();

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    private BeamFnDataOutboundAggregator createRecordingAggregator(final Map<String, List<WindowedValue<String>>> map, final Supplier<String> supplier) {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.as(ExperimentalOptions.class).setExperiments(Arrays.asList("data_buffer_size_limit=0"));
        return new BeamFnDataOutboundAggregator(create, supplier, new StreamObserver<BeamFnApi.Elements>() { // from class: org.apache.beam.fn.harness.BeamFnDataWriteRunnerTest.1
            public void onNext(BeamFnApi.Elements elements) {
                Iterator<BeamFnApi.Elements.Data> it = elements.getDataList().iterator();
                while (it.hasNext()) {
                    try {
                        ((List) map.get(supplier.get())).add((WindowedValue) BeamFnDataWriteRunnerTest.WIRE_CODER.decode(it.next().getData().newInput()));
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to decode output.");
                    }
                }
            }

            public void onError(Throwable th) {
            }

            public void onCompleted() {
            }
        }, false);
    }

    @Test
    public void testReuseForMultipleBundles() throws Exception {
        AtomicReference atomicReference = new AtomicReference("0");
        RunnerApi.PTransform pTransform = RemoteGrpcPortWrite.writeToPort("inputPC", PORT_SPEC).toPTransform();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        ImmutableMap of = ImmutableMap.of("0", arrayList, TRANSFORM_ID, arrayList2);
        Objects.requireNonNull(atomicReference);
        hashMap.put(PORT_SPEC.getApiServiceDescriptor(), createRecordingAggregator(of, atomicReference::get));
        PTransformRunnerFactoryTestContext.Builder beamFnDataClient = PTransformRunnerFactoryTestContext.builder(TRANSFORM_ID, pTransform).beamFnDataClient(this.mockBeamFnDataClient);
        Objects.requireNonNull(atomicReference);
        PTransformRunnerFactoryTestContext build = beamFnDataClient.processBundleInstructionIdSupplier(atomicReference::get).outboundAggregators(hashMap).pCollections(ImmutableMap.of("inputPC", RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build())).coders(COMPONENTS.getCodersMap()).windowingStrategies(COMPONENTS.getWindowingStrategiesMap()).build();
        new BeamFnDataWriteRunner.Factory().createRunnerForPTransform(build);
        MatcherAssert.assertThat(build.getPCollectionConsumers().keySet(), (Matcher<? super Set<String>>) Matchers.containsInAnyOrder("inputPC"));
        FnDataReceiver pCollectionConsumer = build.getPCollectionConsumer("inputPC");
        pCollectionConsumer.accept(WindowedValue.valueInGlobalWindow("ABC"));
        pCollectionConsumer.accept(WindowedValue.valueInGlobalWindow("DEF"));
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")));
        arrayList.clear();
        atomicReference.set(TRANSFORM_ID);
        pCollectionConsumer.accept(WindowedValue.valueInGlobalWindow("GHI"));
        pCollectionConsumer.accept(WindowedValue.valueInGlobalWindow("JKL"));
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")));
        Mockito.verifyNoMoreInteractions(this.mockBeamFnDataClient);
    }

    @Test
    public void testRegistration() {
        Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
        while (it.hasNext()) {
            PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
            if (registrar instanceof BeamFnDataWriteRunner.Registrar) {
                MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), (Matcher<? super Map>) IsMapContaining.hasKey(RemoteGrpcPortWrite.URN));
                return;
            }
        }
        Assert.fail("Expected registrar not found.");
    }

    static {
        try {
            RunnerApi.MessageWithComponents proto = CoderTranslation.toProto(WIRE_CODER);
            WIRE_CODER_SPEC = proto.getCoder();
            COMPONENTS = proto.getComponents().toBuilder().putCoders(WIRE_CODER_ID, WIRE_CODER_SPEC).putCoders(ELEM_CODER_ID, CoderTranslation.toProto(ELEM_CODER).getCoder()).build();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
