package org.apache.beam.fn.harness;

import com.google.cloud.hadoop.gcsio.DirectoryListCache;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/FnHarnessTest.class */
public class FnHarnessTest {
    private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()).build();

    /* JADX WARN: Type inference failed for: r0v16, types: [org.apache.beam.vendor.grpc.v1.io.grpc.ServerBuilder] */
    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.beam.vendor.grpc.v1.io.grpc.ServerBuilder] */
    @Test(timeout = DirectoryListCache.Config.MAX_INFO_AGE_MILLIS_DEFAULT)
    public void testLaunchFnHarnessAndTeardownCleanly() throws Exception {
        final PipelineOptions create = PipelineOptionsFactory.create();
        final ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        BeamFnLoggingGrpc.BeamFnLoggingImplBase beamFnLoggingImplBase = new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { // from class: org.apache.beam.fn.harness.FnHarnessTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc.BeamFnLoggingImplBase
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                List list = arrayList;
                TestStreams.Builder withOnNext = TestStreams.withOnNext(list2 -> {
                    list.addAll(list2.getLogEntriesList());
                });
                Objects.requireNonNull(streamObserver);
                return withOnNext.withOnCompleted(streamObserver::onCompleted).build();
            }
        };
        BeamFnControlGrpc.BeamFnControlImplBase beamFnControlImplBase = new BeamFnControlGrpc.BeamFnControlImplBase() { // from class: org.apache.beam.fn.harness.FnHarnessTest.2
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc.BeamFnControlImplBase
            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                ((GcsOptions) create.as(GcsOptions.class)).getExecutorService().submit(() -> {
                    streamObserver.onNext(FnHarnessTest.INSTRUCTION_REQUEST);
                    Uninterruptibles.awaitUninterruptibly(countDownLatch);
                    streamObserver.onCompleted();
                });
                List list = arrayList2;
                TestStreams.Builder withOnNext = TestStreams.withOnNext(instructionResponse -> {
                    list.add(instructionResponse);
                    countDownLatch.countDown();
                });
                Objects.requireNonNull(countDownLatch);
                return withOnNext.withOnCompleted(countDownLatch::countDown).build();
            }
        };
        Server build = ServerBuilder.forPort(0).addService(beamFnLoggingImplBase).build();
        build.start();
        try {
            build = ServerBuilder.forPort(0).addService(beamFnControlImplBase).build();
            build.start();
            try {
                FnHarness.main("id", create, Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + build.getPort()).build(), Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + build.getPort()).build());
                Assert.assertThat(arrayList2, Matchers.contains(INSTRUCTION_RESPONSE));
                build.shutdownNow();
                build.shutdownNow();
            } finally {
                build.shutdownNow();
            }
        } catch (Throwable th) {
            throw th;
        }
    }
}
