package org.apache.beam.runners.core.fn;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.java.repackaged.com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.class */
public class FnApiControlClientPoolServiceTest {
    private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue();
    private FnApiControlClientPoolService controlService = FnApiControlClientPoolService.offeringClientsToPool(this.pool);

    @Test
    public void testIncomingConnection() throws Exception {
        StreamObserver streamObserver = (StreamObserver) Mockito.mock(StreamObserver.class);
        StreamObserver control = this.controlService.control(streamObserver);
        ListenableFuture handle = this.pool.take().handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("fakeInstruction").build());
        ((StreamObserver) Mockito.verify(streamObserver)).onNext(Matchers.any(BeamFnApi.InstructionRequest.class));
        Assert.assertThat(Boolean.valueOf(handle.isDone()), org.hamcrest.Matchers.is(false));
        control.onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId("fakeInstruction").build());
        handle.get();
    }
}
