package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/FnHarnessTest.class */
public class FnHarnessTest {
    private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()).build();
    private static Runnable onStartupMock = (Runnable) Mockito.mock(Runnable.class);
    private static Consumer<PipelineOptions> beforeProcessingMock = (Consumer) Mockito.mock(Consumer.class);

    @Rule
    public Timeout timeout = Timeout.builder().withLookingForStuckThread(true).withTimeout(1, TimeUnit.MINUTES).build();

    @AutoService({JvmInitializer.class})
    /* loaded from: input_file:org/apache/beam/fn/harness/FnHarnessTest$FnHarnessTestInitializer.class */
    public static class FnHarnessTestInitializer implements JvmInitializer {
        public void onStartup() {
            FnHarnessTest.onStartupMock.run();
        }

        public void beforeProcessing(PipelineOptions pipelineOptions) {
            FnHarnessTest.beforeProcessingMock.accept(pipelineOptions);
        }
    }

    @Test
    public void testLaunchFnHarnessAndTeardownCleanly() throws Exception {
        Function function = (Function) Mockito.mock(Function.class);
        final PipelineOptions create = PipelineOptionsFactory.create();
        Mockito.when((String) function.apply("HARNESS_ID")).thenReturn("id");
        Mockito.when((String) function.apply("PIPELINE_OPTIONS")).thenReturn(PipelineOptionsTranslation.toJson(create));
        final ArrayList arrayList = new ArrayList();
        final List list = (List) Mockito.mock(List.class);
        BeamFnLoggingGrpc.BeamFnLoggingImplBase beamFnLoggingImplBase = new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { // from class: org.apache.beam.fn.harness.FnHarnessTest.1
            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> streamObserver) {
                List list2 = arrayList;
                TestStreams.Builder withOnNext = TestStreams.withOnNext(list3 -> {
                    list2.addAll(list3.getLogEntriesList());
                });
                Objects.requireNonNull(streamObserver);
                return withOnNext.withOnCompleted(streamObserver::onCompleted).build();
            }
        };
        BeamFnControlGrpc.BeamFnControlImplBase beamFnControlImplBase = new BeamFnControlGrpc.BeamFnControlImplBase() { // from class: org.apache.beam.fn.harness.FnHarnessTest.2
            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                create.as(ExecutorOptions.class).getScheduledExecutorService().submit(() -> {
                    streamObserver.onNext(FnHarnessTest.INSTRUCTION_REQUEST);
                    Uninterruptibles.awaitUninterruptibly(countDownLatch);
                    streamObserver.onCompleted();
                });
                List list2 = list;
                TestStreams.Builder withOnNext = TestStreams.withOnNext(instructionResponse -> {
                    list2.add(instructionResponse);
                    countDownLatch.countDown();
                });
                Objects.requireNonNull(countDownLatch);
                return withOnNext.withOnCompleted(countDownLatch::countDown).build();
            }
        };
        Server build = ServerBuilder.forPort(0).addService(beamFnLoggingImplBase).build();
        build.start();
        try {
            build = ServerBuilder.forPort(0).addService(beamFnControlImplBase).build();
            build.start();
            try {
                Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + build.getPort()).build();
                Endpoints.ApiServiceDescriptor build3 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + build.getPort()).build();
                Mockito.when((String) function.apply("LOGGING_API_SERVICE_DESCRIPTOR")).thenReturn(TextFormat.printer().printToString(build2));
                Mockito.when((String) function.apply("CONTROL_API_SERVICE_DESCRIPTOR")).thenReturn(TextFormat.printer().printToString(build3));
                FnHarness.main(function);
                build.shutdownNow();
                build.shutdownNow();
                InOrder inOrder = Mockito.inOrder(onStartupMock, beforeProcessingMock, function, list);
                ((Runnable) inOrder.verify(onStartupMock)).run();
                ((Function) inOrder.verify(function, Mockito.atLeastOnce())).apply((String) Mockito.any());
                ((Consumer) inOrder.verify(beforeProcessingMock)).accept((PipelineOptions) Mockito.any());
                ((List) inOrder.verify(list)).add(INSTRUCTION_RESPONSE);
            } finally {
                build.shutdownNow();
            }
        } catch (Throwable th) {
            throw th;
        }
    }
}
