/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class BeamFnDataGrpcMultiplexerTest {
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor DESCRIPTOR = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint DATA_LOCATION = LogicalEndpoint.data((String)"777L", (String)"888L");
    private static final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint TIMER_LOCATION = LogicalEndpoint.timer((String)"999L", (String)"555L", (String)"333L");
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_LOCATION.getInstructionId()).setTransformId(DATA_LOCATION.getTransformId()).setData(ByteString.copyFrom((byte[])new byte[1]))).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_LOCATION.getInstructionId()).setTransformId(TIMER_LOCATION.getTransformId()).setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId()).setTimers(ByteString.copyFrom((byte[])new byte[2]))).build();
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_LOCATION.getInstructionId()).setTransformId(DATA_LOCATION.getTransformId()).setIsLast(true)).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_LOCATION.getInstructionId()).setTransformId(TIMER_LOCATION.getTransformId()).setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId()).setIsLast(true)).build();

    @Test
    public void testOutboundObserver() {
        ArrayList values = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(values::add).build());
        multiplexer.getOutboundObserver().onNext((Object)ELEMENTS);
        Assert.assertThat(values, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS}));
    }

    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList outboundValues = new ArrayList();
        ArrayList dataInboundValues = new ArrayList();
        ArrayList timerInboundValues = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(() -> {
            Uninterruptibles.sleepUninterruptibly((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
            multiplexer.registerConsumer(DATA_LOCATION, (payload, isLast) -> dataInboundValues.add(KV.of((Object)payload, (Object)isLast)));
            multiplexer.registerConsumer(TIMER_LOCATION, (payload, isLast) -> timerInboundValues.add(KV.of((Object)payload, (Object)isLast)));
        }).get();
        multiplexer.getInboundObserver().onNext((Object)ELEMENTS);
        Assert.assertTrue((boolean)multiplexer.hasConsumer(DATA_LOCATION));
        Assert.assertTrue((boolean)multiplexer.hasConsumer(TIMER_LOCATION));
        multiplexer.getInboundObserver().onNext((Object)TERMINAL_ELEMENTS);
        Assert.assertFalse((boolean)multiplexer.hasConsumer(DATA_LOCATION));
        Assert.assertFalse((boolean)multiplexer.hasConsumer(TIMER_LOCATION));
        Assert.assertThat(dataInboundValues, (Matcher)Matchers.contains((Object[])new KV[]{KV.of((Object)ELEMENTS.getData(0).getData(), (Object)false), KV.of((Object)ByteString.EMPTY, (Object)true)}));
        Assert.assertThat(timerInboundValues, (Matcher)Matchers.contains((Object[])new KV[]{KV.of((Object)ELEMENTS.getTimers(0).getTimers(), (Object)false), KV.of((Object)ByteString.EMPTY, (Object)true)}));
    }
}

