package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2Test.class */
public class BeamFnDataGrpcMultiplexer2Test {
    private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
    private static final String DATA_INSTRUCTION_ID = "dataInstructionId";
    private static final String TIMER_INSTRUCTION_ID = "timerInstructionId";
    private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID).setTransformId("dataTransformId").setData(ByteString.copyFrom(new byte[1]))).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_INSTRUCTION_ID).setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setTimers(ByteString.copyFrom(new byte[2]))).build();
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID).setTransformId("dataTransformId").setIsLast(true)).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_INSTRUCTION_ID).setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setIsLast(true)).build();

    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

    @Test
    public void testOutboundObserver() {
        ArrayList arrayList = new ArrayList();
        new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        }).getOutboundObserver().onNext(ELEMENTS);
        MatcherAssert.assertThat(arrayList, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS}));
    }

    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        BeamFnDataGrpcMultiplexer2 beamFnDataGrpcMultiplexer2 = new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        Future submit = this.executor.submit(() -> {
            beamFnDataGrpcMultiplexer2.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.1
                public void flush() throws Exception {
                    Assert.fail("Unexpected call");
                }

                public void close() throws Exception {
                    Assert.fail("Unexpected call");
                }

                public void accept(BeamFnApi.Elements elements) throws Exception {
                    arrayList2.add(elements);
                }
            });
            beamFnDataGrpcMultiplexer2.registerConsumer(TIMER_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.2
                public void flush() throws Exception {
                    Assert.fail("Unexpected call");
                }

                public void close() throws Exception {
                    Assert.fail("Unexpected call");
                }

                public void accept(BeamFnApi.Elements elements) throws Exception {
                    arrayList3.add(elements);
                }
            });
        });
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(ELEMENTS);
        Assert.assertTrue(beamFnDataGrpcMultiplexer2.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue(beamFnDataGrpcMultiplexer2.hasConsumer(TIMER_INSTRUCTION_ID));
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        Assert.assertTrue(beamFnDataGrpcMultiplexer2.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue(beamFnDataGrpcMultiplexer2.hasConsumer(TIMER_INSTRUCTION_ID));
        submit.get();
        beamFnDataGrpcMultiplexer2.unregisterConsumer(DATA_INSTRUCTION_ID);
        beamFnDataGrpcMultiplexer2.unregisterConsumer(TIMER_INSTRUCTION_ID);
        Assert.assertFalse(beamFnDataGrpcMultiplexer2.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertFalse(beamFnDataGrpcMultiplexer2.hasConsumer(TIMER_INSTRUCTION_ID));
        MatcherAssert.assertThat(arrayList2, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearTimers().build(), TERMINAL_ELEMENTS.toBuilder().clearTimers().build()}));
        MatcherAssert.assertThat(arrayList3, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearData().build(), TERMINAL_ELEMENTS.toBuilder().clearData().build()}));
    }

    @Test
    public void testElementsNeedsPartitioning() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        final ArrayList arrayList3 = new ArrayList();
        BeamFnDataGrpcMultiplexer2 beamFnDataGrpcMultiplexer2 = new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer2.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.3
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList2.add(elements);
            }
        });
        beamFnDataGrpcMultiplexer2.registerConsumer(TIMER_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.4
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList3.add(elements);
            }
        });
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(ELEMENTS);
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        MatcherAssert.assertThat(arrayList2, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearTimers().build(), TERMINAL_ELEMENTS.toBuilder().clearTimers().build()}));
        MatcherAssert.assertThat(arrayList3, Matchers.contains(new BeamFnApi.Elements[]{ELEMENTS.toBuilder().clearData().build(), TERMINAL_ELEMENTS.toBuilder().clearData().build()}));
    }

    @Test
    public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        BeamFnDataGrpcMultiplexer2 beamFnDataGrpcMultiplexer2 = new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer2.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.5
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList2.add(elements);
            }
        });
        BeamFnApi.Elements build = ELEMENTS.toBuilder().clearTimers().build();
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(build);
        Assert.assertSame(Iterables.getOnlyElement(arrayList2), build);
    }

    @Test
    public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        BeamFnDataGrpcMultiplexer2 beamFnDataGrpcMultiplexer2 = new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer2.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.6
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void accept(BeamFnApi.Elements elements) throws Exception {
                if (arrayList2.size() == 1) {
                    throw new Exception("processing failed");
                }
                arrayList2.add(elements);
            }
        });
        BeamFnApi.Elements.Data.Builder instructionId = BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID);
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("A").build()).build());
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("B").build()).build());
        beamFnDataGrpcMultiplexer2.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("C").build()).build());
        MatcherAssert.assertThat(arrayList2, Matchers.contains(new BeamFnApi.Elements[]{BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("A").build()).build()}));
    }

    @Test
    public void testClose() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        BeamFnDataGrpcMultiplexer2 beamFnDataGrpcMultiplexer2 = new BeamFnDataGrpcMultiplexer2(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            TestStreams.Builder withOnNext = TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            });
            Objects.requireNonNull(arrayList2);
            return withOnNext.withOnError((v1) -> {
                r1.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer2.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2Test.7
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            public void close() throws Exception {
                atomicBoolean.set(true);
            }

            public void accept(BeamFnApi.Elements elements) throws Exception {
                Assert.fail("Unexpected call");
            }
        });
        beamFnDataGrpcMultiplexer2.close();
        Assert.assertTrue(atomicBoolean.get());
        MatcherAssert.assertThat(((Throwable) Iterables.getOnlyElement(arrayList2)).getMessage(), StringContains.containsString("Multiplexer hanging up"));
    }
}
