package org.apache.beam.fn.harness.control;

import com.beust.jcommander.Parameters;
import java.util.EnumMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/BeamFnControlClientTest.class */
public class BeamFnControlClientTest {
    private static final BeamFnApi.InstructionRequest SUCCESSFUL_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("1L").setProcessBundle(BeamFnApi.ProcessBundleRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("1L").setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionRequest UNKNOWN_HANDLER_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("2L").build();
    private static final BeamFnApi.InstructionResponse UNKNOWN_HANDLER_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("2L").setError("Unknown InstructionRequest type " + BeamFnApi.InstructionRequest.RequestCase.REQUEST_NOT_SET).build();
    private static final RuntimeException FAILURE = new RuntimeException("TestFailure");
    private static final BeamFnApi.InstructionRequest FAILURE_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("3L").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse FAILURE_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("3L").setError(Throwables.getStackTraceAsString(FAILURE)).build();

    @Test
    public void testDelegation() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        Objects.requireNonNull(linkedBlockingQueue);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).withOnCompleted(() -> {
            atomicBoolean.set(true);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + Parameters.DEFAULT_OPTION_PREFIXES + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnControlGrpc.BeamFnControlImplBase() { // from class: org.apache.beam.fn.harness.control.BeamFnControlClientTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc.BeamFnControlImplBase
            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
                Uninterruptibles.putUninterruptibly(linkedBlockingQueue2, streamObserver);
                return build;
            }
        }).build();
        build3.start();
        try {
            EnumMap enumMap = new EnumMap(BeamFnApi.InstructionRequest.RequestCase.class);
            enumMap.put((EnumMap) BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, (BeamFnApi.InstructionRequest.RequestCase) instructionRequest -> {
                return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
            });
            enumMap.put((EnumMap) BeamFnApi.InstructionRequest.RequestCase.REGISTER, (BeamFnApi.InstructionRequest.RequestCase) instructionRequest2 -> {
                throw FAILURE;
            });
            BeamFnControlClient beamFnControlClient = new BeamFnControlClient("", build2, InProcessManagedChannelFactory.create(), OutboundObserverFactory.trivial(), enumMap);
            StreamObserver streamObserver = (StreamObserver) linkedBlockingQueue2.take();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            Future submit = newCachedThreadPool.submit(() -> {
                beamFnControlClient.processInstructionRequests(newCachedThreadPool);
                return null;
            });
            streamObserver.onNext(SUCCESSFUL_REQUEST);
            Assert.assertEquals(SUCCESSFUL_RESPONSE, linkedBlockingQueue.take());
            streamObserver.onNext(UNKNOWN_HANDLER_REQUEST);
            Assert.assertEquals(UNKNOWN_HANDLER_RESPONSE, linkedBlockingQueue.take());
            streamObserver.onNext(FAILURE_REQUEST);
            Assert.assertEquals(FAILURE_RESPONSE, linkedBlockingQueue.take());
            streamObserver.onCompleted();
            submit.get();
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testJavaErrorResponse() throws Exception {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        TestStreams.Builder withOnNext = TestStreams.withOnNext(instructionResponse -> {
            Assert.fail(String.format("Unexpected Response %s", instructionResponse));
        });
        Objects.requireNonNull(linkedBlockingQueue2);
        final CallStreamObserver build = withOnNext.withOnError((v1) -> {
            r1.add(v1);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + Parameters.DEFAULT_OPTION_PREFIXES + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnControlGrpc.BeamFnControlImplBase() { // from class: org.apache.beam.fn.harness.control.BeamFnControlClientTest.2
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc.BeamFnControlImplBase
            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
                Uninterruptibles.putUninterruptibly(linkedBlockingQueue, streamObserver);
                return build;
            }
        }).build();
        build3.start();
        try {
            EnumMap enumMap = new EnumMap(BeamFnApi.InstructionRequest.RequestCase.class);
            enumMap.put((EnumMap) BeamFnApi.InstructionRequest.RequestCase.REGISTER, (BeamFnApi.InstructionRequest.RequestCase) instructionRequest -> {
                throw new Error("Test Error");
            });
            BeamFnControlClient beamFnControlClient = new BeamFnControlClient("", build2, InProcessManagedChannelFactory.create(), OutboundObserverFactory.trivial(), enumMap);
            StreamObserver streamObserver = (StreamObserver) linkedBlockingQueue.take();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            Future submit = newCachedThreadPool.submit(() -> {
                beamFnControlClient.processInstructionRequests(newCachedThreadPool);
                return null;
            });
            streamObserver.onNext(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("0").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build());
            Assert.assertThat((Throwable) linkedBlockingQueue2.take(), Matchers.not((Matcher) Matchers.nullValue()));
            try {
                submit.get();
                throw new IllegalStateException("The future should have terminated with an error");
            } catch (ExecutionException e) {
                Assert.assertThat(e.getCause().getMessage(), Matchers.containsString("Test Error"));
                build3.shutdownNow();
            }
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }
}
