/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link BeamFnDataGrpcMultiplexer}.
 *
 * <p>The multiplexer is constructed with a direct outbound observer factory and a test stream
 * that records every element it is handed, so each test can inspect what was forwarded.
 */
public class BeamFnDataGrpcMultiplexerTest {
    /** Service descriptor handed to the multiplexer under test; the URL is a placeholder. */
    private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
        Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();

    /** Logical endpoint (instruction id, transform id) that the test data is addressed to. */
    private static final LogicalEndpoint OUTPUT_LOCATION = LogicalEndpoint.of("777L", "888L");

    /** A single data element addressed to {@link #OUTPUT_LOCATION} carrying a one-byte payload. */
    private static final BeamFnApi.Elements ELEMENTS =
        BeamFnApi.Elements.newBuilder()
            .addData(
                BeamFnApi.Elements.Data.newBuilder()
                    .setInstructionId(OUTPUT_LOCATION.getInstructionId())
                    .setTransformId(OUTPUT_LOCATION.getTransformId())
                    .setData(ByteString.copyFrom(new byte[1])))
            .build();

    /**
     * A data element addressed to {@link #OUTPUT_LOCATION} with no payload set. The tests below
     * expect the multiplexer to drop the registered consumer after delivering it.
     */
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS =
        BeamFnApi.Elements.newBuilder()
            .addData(
                BeamFnApi.Elements.Data.newBuilder()
                    .setInstructionId(OUTPUT_LOCATION.getInstructionId())
                    .setTransformId(OUTPUT_LOCATION.getTransformId()))
            .build();

    /** Verifies that elements sent to the outbound observer reach the underlying stream. */
    @Test
    public void testOutboundObserver() {
        List<BeamFnApi.Elements> values = new ArrayList<>();
        BeamFnDataGrpcMultiplexer multiplexer =
            new BeamFnDataGrpcMultiplexer(
                DESCRIPTOR,
                OutboundObserverFactory.clientDirect(),
                inboundObserver -> TestStreams.withOnNext(values::add).build());

        multiplexer.getOutboundObserver().onNext(ELEMENTS);

        Assert.assertThat(values, Matchers.contains(ELEMENTS));
    }

    /**
     * Verifies that inbound data is routed to a consumer that registers after a delay, and that
     * the consumer is removed once the terminal element has been delivered.
     *
     * @throws Exception if waiting for the registration task fails
     */
    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        List<BeamFnApi.Elements> outboundValues = new ArrayList<>();
        List<BeamFnApi.Elements.Data> inboundValues = new ArrayList<>();
        BeamFnDataGrpcMultiplexer multiplexer =
            new BeamFnDataGrpcMultiplexer(
                DESCRIPTOR,
                OutboundObserverFactory.clientDirect(),
                inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());

        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // NOTE(review): the registration task is awaited before any inbound data is sent,
            // so the consumer is already registered when onNext runs below. The "blocks until
            // a consumer connects" path itself does not appear to be exercised; confirm intent.
            executorService
                .submit(
                    () -> {
                        // Sleep on purpose to simulate a consumer that is slow to connect.
                        Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                        multiplexer.registerConsumer(OUTPUT_LOCATION, inboundValues::add);
                    })
                .get();
        } finally {
            // Release the pool's worker thread so it does not outlive the test.
            executorService.shutdown();
        }

        multiplexer.getInboundObserver().onNext(ELEMENTS);
        Assert.assertTrue(multiplexer.hasConsumer(OUTPUT_LOCATION));

        // The terminal element is expected to unregister the consumer.
        multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        Assert.assertFalse(multiplexer.hasConsumer(OUTPUT_LOCATION));

        // Both the regular and the terminal element must have been passed to the consumer.
        Assert.assertThat(
            inboundValues, Matchers.contains(ELEMENTS.getData(0), TERMINAL_ELEMENTS.getData(0)));
    }
}

