/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class BeamFnDataGrpcMultiplexerTest {
    private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
    private static final String DATA_INSTRUCTION_ID = "dataInstructionId";
    private static final String TIMER_INSTRUCTION_ID = "timerInstructionId";
    private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("dataInstructionId").setTransformId("dataTransformId").setData(ByteString.copyFrom((byte[])new byte[1]))).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("timerInstructionId").setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setTimers(ByteString.copyFrom((byte[])new byte[2]))).build();
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("dataInstructionId").setTransformId("dataTransformId").setIsLast(true)).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("timerInstructionId").setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setIsLast(true)).build();
    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

    @Test
    public void testOutboundObserver() {
        ArrayList values = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(values::add).build());
        multiplexer.getOutboundObserver().onNext((Object)ELEMENTS);
        MatcherAssert.assertThat(values, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS}));
    }

    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        ArrayList outboundValues = new ArrayList();
        final ArrayList dataInboundValues = new ArrayList();
        final ArrayList timerInboundValues = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
        Future registerFuture = this.executor.submit(() -> {
            multiplexer.registerConsumer(DATA_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

                public void flush() throws Exception {
                    Assert.fail((String)"Unexpected call");
                }

                public void close() throws Exception {
                    Assert.fail((String)"Unexpected call");
                }

                public void accept(BeamFnApi.Elements input) throws Exception {
                    dataInboundValues.add(input);
                }
            });
            multiplexer.registerConsumer(TIMER_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

                public void flush() throws Exception {
                    Assert.fail((String)"Unexpected call");
                }

                public void close() throws Exception {
                    Assert.fail((String)"Unexpected call");
                }

                public void accept(BeamFnApi.Elements input) throws Exception {
                    timerInboundValues.add(input);
                }
            });
        });
        multiplexer.getInboundObserver().onNext((Object)ELEMENTS);
        Assert.assertTrue((boolean)multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue((boolean)multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        multiplexer.getInboundObserver().onNext((Object)TERMINAL_ELEMENTS);
        Assert.assertTrue((boolean)multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue((boolean)multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        registerFuture.get();
        multiplexer.unregisterConsumer(DATA_INSTRUCTION_ID);
        multiplexer.unregisterConsumer(TIMER_INSTRUCTION_ID);
        Assert.assertFalse((boolean)multiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertFalse((boolean)multiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        MatcherAssert.assertThat(dataInboundValues, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearTimers().build(), TERMINAL_ELEMENTS.toBuilder().clearTimers().build()}));
        MatcherAssert.assertThat(timerInboundValues, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearData().build(), TERMINAL_ELEMENTS.toBuilder().clearData().build()}));
    }

    @Test
    public void testElementsNeedsPartitioning() throws Exception {
        ArrayList outboundValues = new ArrayList();
        final ArrayList dataInboundValues = new ArrayList();
        final ArrayList timerInboundValues = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
        multiplexer.registerConsumer(DATA_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

            public void flush() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void accept(BeamFnApi.Elements input) throws Exception {
                dataInboundValues.add(input);
            }
        });
        multiplexer.registerConsumer(TIMER_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

            public void flush() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void accept(BeamFnApi.Elements input) throws Exception {
                timerInboundValues.add(input);
            }
        });
        multiplexer.getInboundObserver().onNext((Object)ELEMENTS);
        multiplexer.getInboundObserver().onNext((Object)TERMINAL_ELEMENTS);
        MatcherAssert.assertThat(dataInboundValues, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearTimers().build(), TERMINAL_ELEMENTS.toBuilder().clearTimers().build()}));
        MatcherAssert.assertThat(timerInboundValues, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearData().build(), TERMINAL_ELEMENTS.toBuilder().clearData().build()}));
    }

    @Test
    public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
        ArrayList outboundValues = new ArrayList();
        final ArrayList dataInboundValues = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
        multiplexer.registerConsumer(DATA_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

            public void flush() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void accept(BeamFnApi.Elements input) throws Exception {
                dataInboundValues.add(input);
            }
        });
        BeamFnApi.Elements value = ELEMENTS.toBuilder().clearTimers().build();
        multiplexer.getInboundObserver().onNext((Object)value);
        Assert.assertSame((Object)Iterables.getOnlyElement(dataInboundValues), (Object)value);
    }

    @Test
    public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
        ArrayList outboundValues = new ArrayList();
        final ArrayList dataInboundValues = new ArrayList();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
        multiplexer.registerConsumer(DATA_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

            public void flush() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void accept(BeamFnApi.Elements input) throws Exception {
                if (dataInboundValues.size() == 1) {
                    throw new Exception("processing failed");
                }
                dataInboundValues.add(input);
            }
        });
        BeamFnApi.Elements.Data.Builder data = BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID);
        multiplexer.getInboundObserver().onNext((Object)BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build());
        multiplexer.getInboundObserver().onNext((Object)BeamFnApi.Elements.newBuilder().addData(data.setTransformId("B").build()).build());
        multiplexer.getInboundObserver().onNext((Object)BeamFnApi.Elements.newBuilder().addData(data.setTransformId("C").build()).build());
        MatcherAssert.assertThat(dataInboundValues, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()}));
    }

    @Test
    public void testClose() throws Exception {
        ArrayList outboundValues = new ArrayList();
        ArrayList errorWasReturned = new ArrayList();
        final AtomicBoolean wasClosed = new AtomicBoolean();
        BeamFnDataGrpcMultiplexer multiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), inboundObserver -> TestStreams.withOnNext(outboundValues::add).withOnError(errorWasReturned::add).build());
        multiplexer.registerConsumer(DATA_INSTRUCTION_ID, (CloseableFnDataReceiver)new CloseableFnDataReceiver<BeamFnApi.Elements>(){

            public void flush() throws Exception {
                Assert.fail((String)"Unexpected call");
            }

            public void close() throws Exception {
                wasClosed.set(true);
            }

            public void accept(BeamFnApi.Elements input) throws Exception {
                Assert.fail((String)"Unexpected call");
            }
        });
        multiplexer.close();
        Assert.assertTrue((boolean)wasClosed.get());
        MatcherAssert.assertThat((Object)((Throwable)Iterables.getOnlyElement(errorWasReturned)).getMessage(), (Matcher)StringContains.containsString((String)"Multiplexer hanging up"));
    }
}

