/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.HeaderAccessor;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class FnApiControlClientPoolServiceTest {
    private final ControlClientPool pool = MapControlClientPool.create();
    private final FnApiControlClientPoolService controlService = FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)this.pool.getSink(), (HeaderAccessor)GrpcContextHeaderAccessorProvider.getHeaderAccessor());
    private GrpcFnServer<FnApiControlClientPoolService> server;
    private BeamFnControlGrpc.BeamFnControlStub stub;

    @Before
    public void setup() throws IOException {
        this.server = GrpcFnServer.allocatePortAndCreateFor((FnService)this.controlService, (ServerFactory)InProcessServerFactory.create());
        this.stub = BeamFnControlGrpc.newStub((Channel)InProcessChannelBuilder.forName((String)this.server.getApiServiceDescriptor().getUrl()).build());
    }

    @After
    public void teardown() throws Exception {
        this.server.close();
    }

    @Test
    public void testIncomingConnection() throws Exception {
        StreamObserver requestObserver = (StreamObserver)Mockito.mock(StreamObserver.class);
        StreamObserver responseObserver = this.controlService.control(requestObserver);
        InstructionRequestHandler client = this.pool.getSource().take("", Duration.ofSeconds(2L));
        String id = "fakeInstruction";
        CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
        ((StreamObserver)Mockito.verify((Object)requestObserver)).onNext((Object)((BeamFnApi.InstructionRequest)org.mockito.Matchers.any(BeamFnApi.InstructionRequest.class)));
        Assert.assertThat((Object)MoreFutures.isDone((CompletionStage)responseFuture), (Matcher)Matchers.is((Object)false));
        responseObserver.onNext((Object)BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
        MoreFutures.get((CompletionStage)responseFuture);
    }

    @Test
    public void testCloseCompletesClients() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean sawComplete = new AtomicBoolean();
        this.stub.control((StreamObserver)new StreamObserver<BeamFnApi.InstructionRequest>(){

            public void onNext(BeamFnApi.InstructionRequest value) {
                Assert.fail((String)"Should never see a request");
            }

            public void onError(Throwable t) {
                latch.countDown();
            }

            public void onCompleted() {
                sawComplete.set(true);
                latch.countDown();
            }
        });
        this.pool.getSource().take("", Duration.ofSeconds(2L));
        this.server.close();
        latch.await();
        Assert.assertThat((Object)sawComplete.get(), (Matcher)Matchers.is((Object)true));
    }
}

