/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Epoll;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

public class ServerFactoryTest {
    private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")).build();
    private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1")).build();

    @Test
    public void defaultServerWorks() throws Exception {
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault());
        HostAndPort hostAndPort = HostAndPort.fromString((String)apiServiceDescriptor.getUrl());
        Assert.assertThat((Object)hostAndPort.getHost(), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostName()), (Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostAddress())));
        Assert.assertThat((Object)hostAndPort.getPort(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher)Matchers.lessThan((Comparable)Integer.valueOf(65536))));
    }

    @Test
    public void usesUrlFactory() throws Exception {
        ServerFactory serverFactory = ServerFactory.createWithUrlFactory((host, port) -> "foo");
        CallStreamObserver observer = TestStreams.withOnNext(unused -> {}).withOnCompleted(() -> {}).build();
        TestDataService service = new TestDataService((StreamObserver)observer);
        Endpoints.ApiServiceDescriptor.Builder descriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        Server server = serverFactory.allocateAddressAndCreate((List)ImmutableList.of((Object)((Object)service)), descriptorBuilder);
        server.shutdown();
        Assert.assertThat((Object)descriptorBuilder.getUrl(), (Matcher)Matchers.is((Object)"foo"));
    }

    @Test
    public void defaultServerWithPortSupplier() throws Exception {
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.runTestUsing(ServerFactory.createWithPortSupplier(() -> 65535), ManagedChannelFactory.createDefault());
        HostAndPort hostAndPort = HostAndPort.fromString((String)apiServiceDescriptor.getUrl());
        Assert.assertThat((Object)hostAndPort.getHost(), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostName()), (Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostAddress())));
        Assert.assertThat((Object)hostAndPort.getPort(), (Matcher)Matchers.is((Object)65535));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void urlFactoryWithPortSupplier() throws Exception {
        ServerFactory serverFactory = ServerFactory.createWithUrlFactoryAndPortSupplier((host, port) -> "foo:" + port, () -> 65535);
        CallStreamObserver observer = TestStreams.withOnNext(unused -> {}).withOnCompleted(() -> {}).build();
        TestDataService service = new TestDataService((StreamObserver)observer);
        Endpoints.ApiServiceDescriptor.Builder descriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        Server server = null;
        try {
            server = serverFactory.allocateAddressAndCreate((List)ImmutableList.of((Object)((Object)service)), descriptorBuilder);
            Assert.assertThat((Object)descriptorBuilder.getUrl(), (Matcher)Matchers.is((Object)"foo:65535"));
        }
        finally {
            if (server != null) {
                server.shutdown();
            }
        }
    }

    @Test
    public void testCreatingEpollServer() throws Exception {
        Assume.assumeTrue((boolean)Epoll.isAvailable());
        Assume.assumeTrue((boolean)(InetAddress.getLoopbackAddress() instanceof Inet4Address));
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.runTestUsing(ServerFactory.createEpollSocket(), ManagedChannelFactory.createEpoll());
        HostAndPort hostAndPort = HostAndPort.fromString((String)apiServiceDescriptor.getUrl());
        Assert.assertThat((Object)hostAndPort.getHost(), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostName()), (Matcher)Matchers.equalTo((Object)InetAddress.getLoopbackAddress().getHostAddress())));
        Assert.assertThat((Object)hostAndPort.getPort(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)), (Matcher)Matchers.lessThan((Comparable)Integer.valueOf(65536))));
    }

    @Test
    public void testCreatingUnixDomainSocketServer() throws Exception {
        Assume.assumeTrue((boolean)Epoll.isAvailable());
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.runTestUsing(ServerFactory.createEpollDomainSocket(), ManagedChannelFactory.createEpoll());
        Assert.assertThat((Object)apiServiceDescriptor.getUrl(), (Matcher)Matchers.startsWith((String)("unix://" + System.getProperty("java.io.tmpdir"))));
    }

    private Endpoints.ApiServiceDescriptor runTestUsing(ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        ArrayList serverElements = new ArrayList();
        CountDownLatch clientHangedUp = new CountDownLatch(1);
        CallStreamObserver serverInboundObserver = TestStreams.withOnNext(serverElements::add).withOnCompleted(clientHangedUp::countDown).build();
        TestDataService service = new TestDataService((StreamObserver)serverInboundObserver);
        Server server = serverFactory.allocateAddressAndCreate((List)ImmutableList.of((Object)((Object)service)), apiServiceDescriptorBuilder);
        Assert.assertFalse((boolean)server.isShutdown());
        ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
        BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub((Channel)channel);
        ArrayList clientElements = new ArrayList();
        CountDownLatch serverHangedUp = new CountDownLatch(1);
        CallStreamObserver clientInboundObserver = TestStreams.withOnNext(clientElements::add).withOnCompleted(serverHangedUp::countDown).build();
        StreamObserver clientOutboundObserver = stub.data((StreamObserver)clientInboundObserver);
        StreamObserver serverOutboundObserver = (StreamObserver)service.outboundObservers.take();
        clientOutboundObserver.onNext((Object)CLIENT_DATA);
        serverOutboundObserver.onNext((Object)SERVER_DATA);
        clientOutboundObserver.onCompleted();
        clientHangedUp.await();
        serverOutboundObserver.onCompleted();
        serverHangedUp.await();
        Assert.assertThat(clientElements, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{SERVER_DATA}));
        Assert.assertThat(serverElements, (Matcher)Matchers.contains((Object[])new BeamFnApi.Elements[]{CLIENT_DATA}));
        return apiServiceDescriptorBuilder.build();
    }

    private static class TestDataService
    extends BeamFnDataGrpc.BeamFnDataImplBase {
        private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
        private final StreamObserver<BeamFnApi.Elements> inboundObserver;

        private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
            this.inboundObserver = inboundObserver;
            this.outboundObservers = new LinkedBlockingQueue();
        }

        public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> outboundObserver) {
            Uninterruptibles.putUninterruptibly(this.outboundObservers, outboundObserver);
            return this.inboundObserver;
        }
    }
}

