/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution;

import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.net.HostAndPort;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class ServerFactoryTest {
    private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")).build();
    private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")).build();

    @Test
    public void testCreatingDefaultServer() throws Exception {
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault());
        HostAndPort hostAndPort = HostAndPort.fromString((String)apiServiceDescriptor.getUrl());
        Assert.assertThat((Object)hostAndPort.getHost(), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostName()), (Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostAddress())));
        Assert.assertThat((Object)hostAndPort.getPort(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher)Matchers.lessThan((Comparable)Integer.valueOf(65536))));
    }

    private Endpoints.ApiServiceDescriptor runTestUsing(ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        ArrayList serverElements = new ArrayList();
        CountDownLatch clientHangedUp = new CountDownLatch(1);
        CallStreamObserver serverInboundObserver = TestStreams.withOnNext(serverElements::add).withOnCompleted(clientHangedUp::countDown).build();
        TestDataService service = new TestDataService((StreamObserver)serverInboundObserver);
        Server server = serverFactory.allocatePortAndCreate((BindableService)service, apiServiceDescriptorBuilder);
        Assert.assertFalse((boolean)server.isShutdown());
        ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
        BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub((Channel)channel);
        ArrayList clientElements = new ArrayList();
        CountDownLatch serverHangedUp = new CountDownLatch(1);
        CallStreamObserver clientInboundObserver = TestStreams.withOnNext(clientElements::add).withOnCompleted(serverHangedUp::countDown).build();
        StreamObserver clientOutboundObserver = stub.data((StreamObserver)clientInboundObserver);
        StreamObserver serverOutboundObserver = (StreamObserver)service.outboundObservers.take();
        clientOutboundObserver.onNext((Object)CLIENT_DATA);
        serverOutboundObserver.onNext((Object)SERVER_DATA);
        clientOutboundObserver.onCompleted();
        clientHangedUp.await();
        serverOutboundObserver.onCompleted();
        serverHangedUp.await();
        Assert.assertThat(clientElements, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{SERVER_DATA}));
        Assert.assertThat(serverElements, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{CLIENT_DATA}));
        return apiServiceDescriptorBuilder.build();
    }

    private static class TestDataService
    extends BeamFnDataGrpc.BeamFnDataImplBase {
        private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
        private final StreamObserver<BeamFnApi.Elements> inboundObserver;

        private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
            this.inboundObserver = inboundObserver;
            this.outboundObservers = new LinkedBlockingQueue();
        }

        public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> outboundObserver) {
            Uninterruptibles.putUninterruptibly(this.outboundObservers, outboundObserver);
            return this.inboundObserver;
        }
    }
}

