package org.apache.beam.runners.fnexecution.control;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.grpc.v1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for {@link FnApiControlClientPoolService}.
 *
 * <p>Each test gets a fresh in-process gRPC server hosting the service under test, plus an async
 * stub connected to it.
 */
@RunWith(JUnit4.class)
public class FnApiControlClientPoolServiceTest {
    /** Upper bound on how long a test waits for the client stream to terminate before failing. */
    private static final long STREAM_TERMINATION_TIMEOUT_SECONDS = 10L;

    private final ControlClientPool pool = MapControlClientPool.create();
    private final FnApiControlClientPoolService controlService =
            FnApiControlClientPoolService.offeringClientsToPool(
                    pool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor());
    private GrpcFnServer<FnApiControlClientPoolService> server;
    private BeamFnControlGrpc.BeamFnControlStub stub;

    /**
     * Starts an in-process server for {@code controlService} and connects {@code stub} to it.
     *
     * @throws IOException if the server cannot be created
     */
    @Before
    public void setup() throws IOException {
        server = GrpcFnServer.allocatePortAndCreateFor(controlService, InProcessServerFactory.create());
        // NOTE(review): the channel built here is not retained, so it is never shut down explicitly
        // in teardown — consider keeping a reference and shutting it down.
        stub =
                BeamFnControlGrpc.newStub(
                        InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build());
    }

    /** Closes the server started in {@link #setup()}. */
    @After
    public void teardown() throws Exception {
        server.close();
    }

    /**
     * Verifies that a client taken from the pool is wired to the stream handed to {@code
     * control(...)}: a request goes out on the request observer, and the pending result completes
     * once a response with the same instruction id is pushed on the returned response observer.
     */
    @Test
    public void testIncomingConnection() throws Exception {
        // Mockito.mock(Class) can only produce the raw type; the unchecked conversion is safe for a mock.
        @SuppressWarnings("unchecked")
        StreamObserver<BeamFnApi.InstructionRequest> requestObserver = Mockito.mock(StreamObserver.class);
        StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
                controlService.control(requestObserver);

        String instructionId = "fakeInstruction";
        CompletionStage<BeamFnApi.InstructionResponse> responseFuture =
                pool.getSource()
                        .take("", Duration.ofSeconds(2L))
                        .handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build());

        // The request must have been forwarded to the request observer, with no response yet.
        Mockito.verify(requestObserver).onNext(Matchers.any(BeamFnApi.InstructionRequest.class));
        Assert.assertFalse(
                "Response must not be complete before the client answers", MoreFutures.isDone(responseFuture));

        // Answering on the response observer should complete the pending result.
        responseObserver.onNext(
                BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build());
        MoreFutures.get(responseFuture);
    }

    /**
     * Verifies that closing the server terminates a connected client's stream via {@code
     * onCompleted} rather than {@code onError}.
     */
    @Test
    public void testCloseCompletesClients() throws Exception {
        final CountDownLatch streamTerminated = new CountDownLatch(1);
        final AtomicBoolean sawComplete = new AtomicBoolean();
        stub.control(
                new StreamObserver<BeamFnApi.InstructionRequest>() {
                    @Override
                    public void onNext(BeamFnApi.InstructionRequest instructionRequest) {
                        // This test never issues an instruction, so no request should arrive.
                        Assert.fail("Should never see a request");
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Release the waiting test thread; sawComplete stays false so the test fails.
                        streamTerminated.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        sawComplete.set(true);
                        streamTerminated.countDown();
                    }
                });

        // Presumably waits (up to 2s) until the connecting client has been offered to the pool,
        // so that the server is closed only after the stream is established — verify against
        // ControlClientPool.Source#take.
        pool.getSource().take("", Duration.ofSeconds(2L));
        server.close();

        // Bounded wait: an unbounded await() would hang the whole build if the stream is never
        // terminated, instead of reporting a test failure.
        Assert.assertTrue(
                "Client stream was not terminated within "
                        + STREAM_TERMINATION_TIMEOUT_SECONDS
                        + " seconds of closing the server",
                streamTerminated.await(STREAM_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        Assert.assertTrue(
                "Expected the client stream to be completed rather than failed", sawComplete.get());
    }
}
