package org.apache.beam.fn.harness.state;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link BeamFnStateGrpcClientCache}.
 *
 * <p>Each test talks to an in-process gRPC server whose {@code state} handler hands the
 * server-side response observer to the test through {@link #outboundServerObservers} and records
 * every inbound {@link BeamFnApi.StateRequest} in {@link #values}. Tests drive the server side by
 * taking from those queues.
 */
@RunWith(JUnit4.class)
public class BeamFnStateGrpcClientCacheTest {
    private static final String SUCCESS = "SUCCESS";
    private static final String FAIL = "FAIL";
    private static final String TEST_ERROR = "TEST ERROR";
    private static final String SERVER_ERROR = "SERVER ERROR";

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);
    private Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private Server testServer;
    private BeamFnStateGrpcClientCache clientCache;
    /** Response observers handed to the fake server, one per stream opened by a client. */
    private BlockingQueue<StreamObserver<BeamFnApi.StateResponse>> outboundServerObservers;
    /** Requests received by the fake server, in arrival order. */
    private BlockingQueue<BeamFnApi.StateRequest> values;

    /** Starts the in-process state server and creates the cache under test. */
    @Before
    public void setUp() throws Exception {
        this.values = new LinkedBlockingQueue<>();
        this.outboundServerObservers = new LinkedBlockingQueue<>();
        // Every request the server receives is appended to the values queue.
        final CallStreamObserver<BeamFnApi.StateRequest> inboundServerObserver =
                TestStreams.withOnNext(this.values::add).build();
        // A random suffix keeps the in-process server name unique per test instance.
        this.apiServiceDescriptor =
                Endpoints.ApiServiceDescriptor.newBuilder()
                        .setUrl(getClass().getName() + "-" + UUID.randomUUID())
                        .build();
        this.testServer =
                InProcessServerBuilder.forName(this.apiServiceDescriptor.getUrl())
                        .addService(
                                new BeamFnStateGrpc.BeamFnStateImplBase() {
                                    @Override
                                    public StreamObserver<BeamFnApi.StateRequest> state(
                                            StreamObserver<BeamFnApi.StateResponse> outboundObserver) {
                                        // Publish the response side of the stream so the test can answer on it.
                                        Uninterruptibles.putUninterruptibly(outboundServerObservers, outboundObserver);
                                        return inboundServerObserver;
                                    }
                                })
                        .build();
        this.testServer.start();
        this.clientCache =
                new BeamFnStateGrpcClientCache(
                        IdGenerators.decrementingLongs(),
                        ManagedChannelFactory.createInProcess(),
                        OutboundObserverFactory.trivial());
    }

    /** Stops the in-process state server started by {@link #setUp()}. */
    @After
    public void tearDown() throws Exception {
        this.testServer.shutdownNow();
    }

    /** The cache returns the same client for the same descriptor and a different one otherwise. */
    @Test
    public void testCachingOfClient() throws Exception {
        Endpoints.ApiServiceDescriptor otherDescriptor =
                Endpoints.ApiServiceDescriptor.newBuilder()
                        .setUrl(this.apiServiceDescriptor.getUrl() + "-other")
                        .build();
        Server otherServer =
                InProcessServerBuilder.forName(otherDescriptor.getUrl())
                        .addService(
                                new BeamFnStateGrpc.BeamFnStateImplBase() {
                                    @Override
                                    public StreamObserver<BeamFnApi.StateRequest> state(
                                            StreamObserver<BeamFnApi.StateResponse> outboundObserver) {
                                        throw new RuntimeException();
                                    }
                                })
                        .build();
        otherServer.start();
        try {
            Assert.assertSame(
                    this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor),
                    this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor));
            Assert.assertNotSame(
                    this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor),
                    this.clientCache.forApiServiceDescriptor(otherDescriptor));
        } finally {
            otherServer.shutdownNow();
        }
    }

    /**
     * A successful response completes its future, an error response fails its future, and a
     * response carrying an unknown id does not prevent either from completing.
     */
    @Test
    public void testRequestResponses() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        CompletableFuture<BeamFnApi.StateResponse> successfulResponse =
                client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS));
        CompletableFuture<BeamFnApi.StateResponse> unsuccessfulResponse =
                client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(FAIL));

        StreamObserver<BeamFnApi.StateResponse> outboundServerObserver = this.outboundServerObservers.take();
        // Send a response whose id matches no outstanding request before answering the real ones.
        outboundServerObserver.onNext(BeamFnApi.StateResponse.newBuilder().setId("UNKNOWN ID").build());
        handleServerRequest(outboundServerObserver, this.values.take());
        handleServerRequest(outboundServerObserver, this.values.take());

        Assert.assertNotNull(successfulResponse.get());
        try {
            unsuccessfulResponse.get();
            Assert.fail("Expected unsuccessful response");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString(TEST_ERROR));
        }
    }

    /** A server-side {@code onError} fails the in-flight request with the server's error. */
    @Test
    public void testServerErrorCausesPendingAndFutureCallsToFail() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        Future<CompletableFuture<BeamFnApi.StateResponse>> stateResponse =
                this.executor.submit(
                        () -> client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS)));
        Future<Void> serverResponse =
                this.executor.submit(
                        () -> {
                            // Blocks until the client has opened its stream, then fails it.
                            this.outboundServerObservers
                                    .take()
                                    .onError(new StatusRuntimeException(Status.INTERNAL.withDescription(SERVER_ERROR)));
                            return null;
                        });

        CompletableFuture<BeamFnApi.StateResponse> inflight = stateResponse.get();
        serverResponse.get();
        try {
            inflight.get();
            Assert.fail("Expected unsuccessful response due to server error");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString(SERVER_ERROR));
        }
    }

    /** A server-side {@code onCompleted} fails the in-flight request with a hang-up error. */
    @Test
    public void testServerCompletionCausesPendingAndFutureCallsToFail() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        Future<CompletableFuture<BeamFnApi.StateResponse>> stateResponse =
                this.executor.submit(
                        () -> client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS)));
        Future<Void> serverResponse =
                this.executor.submit(
                        () -> {
                            // Blocks until the client has opened its stream, then completes it.
                            this.outboundServerObservers.take().onCompleted();
                            return null;
                        });

        CompletableFuture<BeamFnApi.StateResponse> inflight = stateResponse.get();
        serverResponse.get();
        try {
            inflight.get();
            Assert.fail("Expected unsuccessful response due to server error");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString("Server hanged up"));
        }
    }

    /**
     * Plays the server's part for a single request by replying on {@code streamObserver}.
     *
     * <p>The reply always echoes the request id. Requests whose instruction id is {@link #FAIL}
     * additionally carry {@link #TEST_ERROR} as the error; every other instruction id gets a plain
     * response.
     *
     * @param streamObserver the server's response observer for the stream the request arrived on
     * @param stateRequest the request to answer
     */
    private void handleServerRequest(StreamObserver<BeamFnApi.StateResponse> streamObserver, BeamFnApi.StateRequest stateRequest) {
        switch (stateRequest.getInstructionId()) {
            case SUCCESS:
                streamObserver.onNext(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).build());
                return;
            case FAIL:
                streamObserver.onNext(
                        BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setError(TEST_ERROR).build());
                return;
            default:
                streamObserver.onNext(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).build());
                return;
        }
    }
}
