package org.apache.beam.runners.fnexecution;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.Epoll;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/ServerFactoryTest.class */
public class ServerFactoryTest {
    private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("1")).build();
    private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("1")).build();

    @Rule
    public GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule().setTimeout(10, TimeUnit.SECONDS);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/ServerFactoryTest$TestDataService.class */
    public static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
        private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
        private final StreamObserver<BeamFnApi.Elements> inboundObserver;

        private TestDataService(StreamObserver<BeamFnApi.Elements> streamObserver) {
            this.inboundObserver = streamObserver;
            this.outboundObservers = new LinkedBlockingQueue<>();
        }

        public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
            Uninterruptibles.putUninterruptibly(this.outboundObservers, streamObserver);
            return this.inboundObserver;
        }
    }

    @Test
    public void defaultServerWorks() throws Exception {
        HostAndPort fromString = HostAndPort.fromString(runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault()).getUrl());
        MatcherAssert.assertThat(fromString.getHost(), Matchers.anyOf(Matchers.equalTo(InetAddress.getLoopbackAddress().getHostName()), Matchers.equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
        MatcherAssert.assertThat(Integer.valueOf(fromString.getPort()), Matchers.allOf(Matchers.greaterThan(0), Matchers.lessThan(65536)));
    }

    @Test
    public void usesUrlFactory() throws Exception {
        ServerFactory createWithUrlFactory = ServerFactory.createWithUrlFactory((str, i) -> {
            return "foo";
        });
        TestDataService testDataService = new TestDataService(TestStreams.withOnNext(elements -> {
        }).withOnCompleted(() -> {
        }).build());
        Endpoints.ApiServiceDescriptor.Builder newBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        this.grpcCleanupRule.register(createWithUrlFactory.allocateAddressAndCreate(ImmutableList.of(testDataService), newBuilder));
        MatcherAssert.assertThat(newBuilder.getUrl(), Matchers.is("foo"));
    }

    @Test
    public void defaultServerWithPortSupplier() throws Exception {
        HostAndPort fromString = HostAndPort.fromString(runTestUsing(ServerFactory.createWithPortSupplier(() -> {
            return 65535;
        }), ManagedChannelFactory.createDefault()).getUrl());
        MatcherAssert.assertThat(fromString.getHost(), Matchers.anyOf(Matchers.equalTo(InetAddress.getLoopbackAddress().getHostName()), Matchers.equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
        MatcherAssert.assertThat(Integer.valueOf(fromString.getPort()), Matchers.is(65535));
    }

    @Test
    public void urlFactoryWithPortSupplier() throws Exception {
        ServerFactory createWithUrlFactoryAndPortSupplier = ServerFactory.createWithUrlFactoryAndPortSupplier((str, i) -> {
            return "foo:" + i;
        }, () -> {
            return 65535;
        });
        TestDataService testDataService = new TestDataService(TestStreams.withOnNext(elements -> {
        }).withOnCompleted(() -> {
        }).build());
        Endpoints.ApiServiceDescriptor.Builder newBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        this.grpcCleanupRule.register(createWithUrlFactoryAndPortSupplier.allocateAddressAndCreate(ImmutableList.of(testDataService), newBuilder));
        MatcherAssert.assertThat(newBuilder.getUrl(), Matchers.is("foo:65535"));
    }

    @Test
    public void testCreatingEpollServer() throws Exception {
        Assume.assumeTrue(Epoll.isAvailable());
        Assume.assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
        HostAndPort fromString = HostAndPort.fromString(runTestUsing(ServerFactory.createEpollSocket(), ManagedChannelFactory.createEpoll()).getUrl());
        MatcherAssert.assertThat(fromString.getHost(), Matchers.anyOf(Matchers.equalTo(InetAddress.getLoopbackAddress().getHostName()), Matchers.equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
        MatcherAssert.assertThat(Integer.valueOf(fromString.getPort()), Matchers.allOf(Matchers.greaterThan(0), Matchers.lessThan(65536)));
    }

    @Test
    public void testCreatingUnixDomainSocketServer() throws Exception {
        Assume.assumeTrue(Epoll.isAvailable());
        MatcherAssert.assertThat(runTestUsing(ServerFactory.createEpollDomainSocket(), ManagedChannelFactory.createEpoll()).getUrl(), Matchers.startsWith("unix://" + System.getProperty("java.io.tmpdir")));
    }

    private Endpoints.ApiServiceDescriptor runTestUsing(ServerFactory serverFactory, ManagedChannelFactory managedChannelFactory) throws Exception {
        Endpoints.ApiServiceDescriptor.Builder newBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Objects.requireNonNull(arrayList);
        TestStreams.Builder withOnNext = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        });
        Objects.requireNonNull(countDownLatch);
        TestDataService testDataService = new TestDataService(withOnNext.withOnCompleted(countDownLatch::countDown).build());
        Assert.assertFalse(serverFactory.allocateAddressAndCreate(ImmutableList.of(testDataService), newBuilder).isShutdown());
        BeamFnDataGrpc.BeamFnDataStub newStub = BeamFnDataGrpc.newStub(managedChannelFactory.forDescriptor(newBuilder.build()));
        ArrayList arrayList2 = new ArrayList();
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Objects.requireNonNull(arrayList2);
        TestStreams.Builder withOnNext2 = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        });
        Objects.requireNonNull(countDownLatch2);
        StreamObserver data = newStub.data(withOnNext2.withOnCompleted(countDownLatch2::countDown).build());
        StreamObserver streamObserver = (StreamObserver) testDataService.outboundObservers.take();
        data.onNext(CLIENT_DATA);
        streamObserver.onNext(SERVER_DATA);
        data.onCompleted();
        countDownLatch.await();
        streamObserver.onCompleted();
        countDownLatch2.await();
        MatcherAssert.assertThat(arrayList2, Matchers.contains(new BeamFnApi.Elements[]{SERVER_DATA}));
        MatcherAssert.assertThat(arrayList, Matchers.contains(new BeamFnApi.Elements[]{CLIENT_DATA}));
        return newBuilder.build();
    }
}
