package org.apache.beam.runners.fnexecution.control;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.class */
public class FnApiControlClientPoolServiceTest {
    private static final String WORKER_ID = "test_worker_id";
    private GrpcFnServer<FnApiControlClientPoolService> server;
    private BeamFnControlGrpc.BeamFnControlStub stub;

    @Rule
    public transient Timeout globalTimeout = Timeout.seconds(600);
    private final ControlClientPool pool = MapControlClientPool.create();
    private final FnApiControlClientPoolService controlService = FnApiControlClientPoolService.offeringClientsToPool(this.pool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor());

    @Before
    public void setup() throws IOException {
        this.server = GrpcFnServer.allocatePortAndCreateFor(this.controlService, InProcessServerFactory.create());
        this.stub = BeamFnControlGrpc.newStub(InProcessChannelBuilder.forName(this.server.getApiServiceDescriptor().getUrl()).intercept(new ClientInterceptor[]{AddHarnessIdInterceptor.create(WORKER_ID)}).build());
    }

    @After
    public void teardown() throws Exception {
        this.server.close();
    }

    @Test
    public void testIncomingConnection() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(this.stub.control(TestStreams.withOnNext(instructionRequest -> {
            try {
                ((StreamObserver) completableFuture.get()).onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionRequest.getInstructionId()).build());
            } catch (Exception e) {
                Assert.fail("Unexpected failure");
                throw new RuntimeException(e);
            }
        }).build()));
        Assert.assertEquals("fakeInstruction", ((BeamFnApi.InstructionResponse) MoreFutures.get(this.pool.getSource().take(WORKER_ID, Duration.ofSeconds(2L)).handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("fakeInstruction").build()))).getInstructionId());
    }

    @Test
    public void testCloseCompletesClients() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.stub.control(new StreamObserver<BeamFnApi.InstructionRequest>() { // from class: org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolServiceTest.1
            public void onNext(BeamFnApi.InstructionRequest instructionRequest) {
                Assert.fail("Should never see a request");
            }

            public void onError(Throwable th) {
                countDownLatch.countDown();
            }

            public void onCompleted() {
                atomicBoolean.set(true);
                countDownLatch.countDown();
            }
        });
        this.pool.getSource().take(WORKER_ID, Duration.ofSeconds(2L));
        this.server.close();
        countDownLatch.await();
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
    }

    @Test
    public void testUnknownBundle() throws Exception {
        BeamFnApi.GetProcessBundleDescriptorRequest build = BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder().setProcessBundleDescriptorId("missing").build();
        StreamObserver streamObserver = (StreamObserver) Mockito.mock(StreamObserver.class);
        this.controlService.getProcessBundleDescriptor(build, streamObserver);
        ((StreamObserver) Mockito.verify(streamObserver)).onError((Throwable) org.mockito.Matchers.argThat(th -> {
            return (th instanceof StatusException) && ((StatusException) th).getStatus().getCode() == Status.Code.NOT_FOUND;
        }));
    }
}
