package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.class */
public class BeamFnDataGrpcMultiplexerTest {
    private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
    private static final String DATA_INSTRUCTION_ID = "dataInstructionId";
    private static final String TIMER_INSTRUCTION_ID = "timerInstructionId";
    private static final BeamFnApi.Elements ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID).setTransformId("dataTransformId").setData(ByteString.copyFrom(new byte[1]))).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_INSTRUCTION_ID).setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setTimers(ByteString.copyFrom(new byte[2]))).m701build();
    private static final BeamFnApi.Elements TERMINAL_ELEMENTS = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID).setTransformId("dataTransformId").setIsLast(true)).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId(TIMER_INSTRUCTION_ID).setTransformId("timerTransformId").setTimerFamilyId("timerFamilyId").setIsLast(true)).m701build();

    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

    @Test
    public void testOutboundObserver() {
        ArrayList arrayList = new ArrayList();
        new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        }).getOutboundObserver().onNext(ELEMENTS);
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(ELEMENTS));
    }

    @Test
    public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        Future<?> submit = this.executor.submit(() -> {
            beamFnDataGrpcMultiplexer.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.1
                @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
                public void flush() throws Exception {
                    Assert.fail("Unexpected call");
                }

                @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
                public void close() throws Exception {
                    Assert.fail("Unexpected call");
                }

                @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
                public void accept(BeamFnApi.Elements elements) throws Exception {
                    arrayList2.add(elements);
                }
            });
            beamFnDataGrpcMultiplexer.registerConsumer(TIMER_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.2
                @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
                public void flush() throws Exception {
                    Assert.fail("Unexpected call");
                }

                @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
                public void close() throws Exception {
                    Assert.fail("Unexpected call");
                }

                @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
                public void accept(BeamFnApi.Elements elements) throws Exception {
                    arrayList3.add(elements);
                }
            });
        });
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(ELEMENTS);
        Assert.assertTrue(beamFnDataGrpcMultiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue(beamFnDataGrpcMultiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        Assert.assertTrue(beamFnDataGrpcMultiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertTrue(beamFnDataGrpcMultiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        submit.get();
        beamFnDataGrpcMultiplexer.unregisterConsumer(DATA_INSTRUCTION_ID);
        beamFnDataGrpcMultiplexer.unregisterConsumer(TIMER_INSTRUCTION_ID);
        Assert.assertFalse(beamFnDataGrpcMultiplexer.hasConsumer(DATA_INSTRUCTION_ID));
        Assert.assertFalse(beamFnDataGrpcMultiplexer.hasConsumer(TIMER_INSTRUCTION_ID));
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(ELEMENTS.m665toBuilder().clearTimers().m701build(), TERMINAL_ELEMENTS.m665toBuilder().clearTimers().m701build()));
        MatcherAssert.assertThat(arrayList3, (Matcher<? super ArrayList>) Matchers.contains(ELEMENTS.m665toBuilder().clearData().m701build(), TERMINAL_ELEMENTS.m665toBuilder().clearData().m701build()));
    }

    @Test
    public void testElementsNeedsPartitioning() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        final ArrayList arrayList3 = new ArrayList();
        BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.3
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList2.add(elements);
            }
        });
        beamFnDataGrpcMultiplexer.registerConsumer(TIMER_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.4
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList3.add(elements);
            }
        });
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(ELEMENTS);
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(ELEMENTS.m665toBuilder().clearTimers().m701build(), TERMINAL_ELEMENTS.m665toBuilder().clearTimers().m701build()));
        MatcherAssert.assertThat(arrayList3, (Matcher<? super ArrayList>) Matchers.contains(ELEMENTS.m665toBuilder().clearData().m701build(), TERMINAL_ELEMENTS.m665toBuilder().clearData().m701build()));
    }

    @Test
    public void testElementsWithOnlySingleInstructionIdUsingHotPath() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.5
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(BeamFnApi.Elements elements) throws Exception {
                arrayList2.add(elements);
            }
        });
        BeamFnApi.Elements m701build = ELEMENTS.m665toBuilder().clearTimers().m701build();
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(m701build);
        Assert.assertSame(Iterables.getOnlyElement(arrayList2), m701build);
    }

    @Test
    public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws Exception {
        ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            return TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.6
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(BeamFnApi.Elements elements) throws Exception {
                if (arrayList2.size() == 1) {
                    throw new Exception("processing failed");
                }
                arrayList2.add(elements);
            }
        });
        BeamFnApi.Elements.Data.Builder instructionId = BeamFnApi.Elements.Data.newBuilder().setInstructionId(DATA_INSTRUCTION_ID);
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("A").m748build()).m701build());
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("B").m748build()).m701build());
        beamFnDataGrpcMultiplexer.getInboundObserver().onNext(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("C").m748build()).m701build());
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(BeamFnApi.Elements.newBuilder().addData(instructionId.setTransformId("A").m748build()).m701build()));
    }

    @Test
    public void testClose() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        BeamFnDataGrpcMultiplexer beamFnDataGrpcMultiplexer = new BeamFnDataGrpcMultiplexer(DESCRIPTOR, OutboundObserverFactory.clientDirect(), streamObserver -> {
            Objects.requireNonNull(arrayList);
            TestStreams.Builder withOnNext = TestStreams.withOnNext((v1) -> {
                r0.add(v1);
            });
            Objects.requireNonNull(arrayList2);
            return withOnNext.withOnError((v1) -> {
                r1.add(v1);
            }).build();
        });
        beamFnDataGrpcMultiplexer.registerConsumer(DATA_INSTRUCTION_ID, new CloseableFnDataReceiver<BeamFnApi.Elements>() { // from class: org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexerTest.7
            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
            public void flush() throws Exception {
                Assert.fail("Unexpected call");
            }

            @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
            public void close() throws Exception {
                atomicBoolean.set(true);
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(BeamFnApi.Elements elements) throws Exception {
                Assert.fail("Unexpected call");
            }
        });
        beamFnDataGrpcMultiplexer.close();
        Assert.assertTrue(atomicBoolean.get());
        MatcherAssert.assertThat(((Throwable) Iterables.getOnlyElement(arrayList2)).getMessage(), StringContains.containsString("Multiplexer hanging up"));
    }
}
