package org.apache.beam.fn.harness.data;

import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.fn.harness.test.TestStreams;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.class */
public class BeamFnDataGrpcMultiplexerTest {
    private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR = BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build();
    private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION = KV.of("777L", BeamFnApi.Target.newBuilder().setName("name").setPrimitiveTransformReference("888L").build());
    private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference((String) OUTPUT_LOCATION.getKey()).setTarget((BeamFnApi.Target) OUTPUT_LOCATION.getValue()).setData(ByteString.copyFrom(new byte[1]))).build();
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference((String) OUTPUT_LOCATION.getKey()).setTarget((BeamFnApi.Target) OUTPUT_LOCATION.getValue())).build();

    @Test
    public void testOutboundObserver() {
        ArrayList arrayList = new ArrayList();
        new BeamFnDataGrpcMultiplexer(DESCRIPTOR, streamObserver -> {
            arrayList.getClass();
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        }).getOutboundObserver().onNext(ELEMENTS);
        Assert.assertThat(arrayList, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS}));
    }

    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        final BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, streamObserver -> {
            arrayList.getClass();
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        Executors.newCachedThreadPool().submit(new Runnable() { // from class: org.apache.beam.fn.harness.data.BeamFnDataGrpcMultiplexerTest.1
            @Override // java.lang.Runnable
            public void run() {
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                CompletableFuture futureForKey = beamFnDataGrpcMultiplexer.futureForKey(BeamFnDataGrpcMultiplexerTest.OUTPUT_LOCATION);
                Collection collection = arrayList2;
                collection.getClass();
                futureForKey.complete((v1) -> {
                    r1.add(v1);
                });
            }
        });
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(ELEMENTS);
        Assert.assertTrue(beamFnDataGrpcMultiplexer.consumers.containsKey(OUTPUT_LOCATION));
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        Assert.assertFalse(beamFnDataGrpcMultiplexer.consumers.containsKey(OUTPUT_LOCATION));
        Assert.assertThat(arrayList2, Matchers.contains(new BeamFnApi.Elements.Data[]{ELEMENTS.getData(0), TERMINAL_ELEMENTS.getData(0)}));
    }
}
