package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.class */
/**
 * Tests for {@link BeamFnDataGrpcMultiplexer}.
 *
 * <p>The tests push {@link BeamFnApi.Elements} messages through the multiplexer's outbound and
 * inbound observers and check what arrives at the other side.
 */
public class BeamFnDataGrpcMultiplexerTest {
    /** Service descriptor handed to every multiplexer under test; the URL is a placeholder. */
    private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
            Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();

    /** Logical endpoint used for the data portion of the test messages. */
    private static final LogicalEndpoint DATA_LOCATION = LogicalEndpoint.data("777L", "888L");

    /** Logical endpoint used for the timers portion of the test messages. */
    private static final LogicalEndpoint TIMER_LOCATION =
            LogicalEndpoint.timer("999L", "555L", "333L");

    /** A message carrying one data payload (1 byte) and one timers payload (2 bytes). */
    private static final BeamFnApi.Elements ELEMENTS =
            BeamFnApi.Elements.newBuilder()
                    .addData(
                            BeamFnApi.Elements.Data.newBuilder()
                                    .setInstructionId(DATA_LOCATION.getInstructionId())
                                    .setTransformId(DATA_LOCATION.getTransformId())
                                    .setData(ByteString.copyFrom(new byte[1])))
                    .addTimers(
                            BeamFnApi.Elements.Timers.newBuilder()
                                    .setInstructionId(TIMER_LOCATION.getInstructionId())
                                    .setTransformId(TIMER_LOCATION.getTransformId())
                                    .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
                                    .setTimers(ByteString.copyFrom(new byte[2])))
                    .build();

    /** A message with no payload whose data and timers entries are both flagged as last. */
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS =
            BeamFnApi.Elements.newBuilder()
                    .addData(
                            BeamFnApi.Elements.Data.newBuilder()
                                    .setInstructionId(DATA_LOCATION.getInstructionId())
                                    .setTransformId(DATA_LOCATION.getTransformId())
                                    .setIsLast(true))
                    .addTimers(
                            BeamFnApi.Elements.Timers.newBuilder()
                                    .setInstructionId(TIMER_LOCATION.getInstructionId())
                                    .setTransformId(TIMER_LOCATION.getTransformId())
                                    .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
                                    .setIsLast(true))
                    .build();

    /**
     * Verifies that a message sent through the multiplexer's outbound observer is delivered to
     * the stream observer produced by the supplied factory function.
     */
    @Test
    public void testOutboundObserver() {
        List<BeamFnApi.Elements> outboundValues = new ArrayList<>();
        BeamFnDataGrpcMultiplexer multiplexer =
                new BeamFnDataGrpcMultiplexer(
                        DESCRIPTOR,
                        OutboundObserverFactory.clientDirect(),
                        inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());

        multiplexer.getOutboundObserver().onNext(ELEMENTS);

        MatcherAssert.assertThat(outboundValues, Matchers.contains(ELEMENTS));
    }

    /**
     * Registers the data and timer consumers from a separate thread after a short delay, waits
     * for that registration to finish, and then checks that inbound messages are routed to the
     * matching consumer and that both consumers are gone once the terminal message was seen.
     *
     * @throws Exception if the registration task fails or the wait for it is interrupted
     */
    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        List<BeamFnApi.Elements> outboundValues = new ArrayList<>();
        List<KV<ByteString, Boolean>> dataInboundValues = new ArrayList<>();
        List<KV<ByteString, Boolean>> timerInboundValues = new ArrayList<>();
        BeamFnDataGrpcMultiplexer multiplexer =
                new BeamFnDataGrpcMultiplexer(
                        DESCRIPTOR,
                        OutboundObserverFactory.clientDirect(),
                        inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor
                    .submit(
                            () -> {
                                // Delay the registration so the consumers connect late.
                                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                                multiplexer.registerConsumer(
                                        DATA_LOCATION,
                                        (payload, isLast) ->
                                                dataInboundValues.add(KV.of(payload, isLast)));
                                multiplexer.registerConsumer(
                                        TIMER_LOCATION,
                                        (payload, isLast) ->
                                                timerInboundValues.add(KV.of(payload, isLast)));
                            })
                    .get();
        } finally {
            // Release the pool's threads regardless of how the registration task ended.
            executor.shutdown();
        }

        multiplexer.getInboundObserver().onNext(ELEMENTS);
        Assert.assertTrue(multiplexer.hasConsumer(DATA_LOCATION));
        Assert.assertTrue(multiplexer.hasConsumer(TIMER_LOCATION));

        // The terminal message marks both streams as finished.
        multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        Assert.assertFalse(multiplexer.hasConsumer(DATA_LOCATION));
        Assert.assertFalse(multiplexer.hasConsumer(TIMER_LOCATION));

        MatcherAssert.assertThat(
                dataInboundValues,
                Matchers.contains(
                        KV.of(ELEMENTS.getData(0).getData(), false),
                        KV.of(ByteString.EMPTY, true)));
        MatcherAssert.assertThat(
                timerInboundValues,
                Matchers.contains(
                        KV.of(ELEMENTS.getTimers(0).getTimers(), false),
                        KV.of(ByteString.EMPTY, true)));
    }
}
