package org.apache.beam.runners.prism;

import com.google.common.truth.Truth;
import java.io.IOException;
import java.util.Optional;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/prism/PrismJobManagerTest.class */
public class PrismJobManagerTest {

    @Rule
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/beam/runners/prism/PrismJobManagerTest$TestJobService.class */
    private static class TestJobService extends JobServiceGrpc.JobServiceImplBase {
        private Optional<JobApi.PrepareJobResponse> prepareJobResponse;
        private Optional<JobApi.RunJobResponse> runJobResponse;
        private Optional<RuntimeException> error;

        private TestJobService() {
            this.prepareJobResponse = Optional.empty();
            this.runJobResponse = Optional.empty();
            this.error = Optional.empty();
        }

        TestJobService withPrepareJobResponse(JobApi.PrepareJobResponse prepareJobResponse) {
            this.prepareJobResponse = Optional.of(prepareJobResponse);
            return this;
        }

        TestJobService withRunJobResponse(JobApi.RunJobResponse runJobResponse) {
            this.runJobResponse = Optional.of(runJobResponse);
            return this;
        }

        TestJobService withErrorResponse(RuntimeException runtimeException) {
            this.error = Optional.of(runtimeException);
            return this;
        }

        public void prepare(JobApi.PrepareJobRequest prepareJobRequest, StreamObserver<JobApi.PrepareJobResponse> streamObserver) {
            if (this.prepareJobResponse.isPresent()) {
                streamObserver.onNext(this.prepareJobResponse.get());
                streamObserver.onCompleted();
            }
            if (this.error.isPresent()) {
                streamObserver.onError(this.error.get());
            }
        }

        public void run(JobApi.RunJobRequest runJobRequest, StreamObserver<JobApi.RunJobResponse> streamObserver) {
            if (this.runJobResponse.isPresent()) {
                streamObserver.onNext(this.runJobResponse.get());
                streamObserver.onCompleted();
            }
            if (this.error.isPresent()) {
                streamObserver.onError(this.error.get());
            }
        }
    }

    @Test
    public void givenPrepareError_forwardsException_canGracefulShutdown() {
        PrismJobManager prismJobManager = prismJobManager(new TestJobService().withErrorResponse(new RuntimeException(this.testName.getMethodName())));
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        Assert.assertThrows(RuntimeException.class, () -> {
            prismJobManager.prepare(JobApi.PrepareJobRequest.newBuilder().setPipeline(pipelineOf()).build());
        });
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        prismJobManager.close();
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isTrue();
    }

    @Test
    public void givenPrepareSuccess_forwardsResponse_canGracefulShutdown() {
        PrismJobManager prismJobManager = prismJobManager(new TestJobService().withPrepareJobResponse(JobApi.PrepareJobResponse.newBuilder().setStagingSessionToken("token").setPreparationId("preparationId").setArtifactStagingEndpoint(Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:1234").build()).build()));
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        JobApi.PrepareJobResponse prepare = prismJobManager.prepare(JobApi.PrepareJobRequest.newBuilder().setPipeline(pipelineOf()).build());
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        Truth.assertThat(prepare.getStagingSessionToken()).isEqualTo("token");
        Truth.assertThat(prepare.getPreparationId()).isEqualTo("preparationId");
        prismJobManager.close();
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isTrue();
    }

    @Test
    public void givenRunError_forwardsException_canGracefulShutdown() {
        PrismJobManager prismJobManager = prismJobManager(new TestJobService().withErrorResponse(new RuntimeException(this.testName.getMethodName())));
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        Assert.assertThrows(RuntimeException.class, () -> {
            prismJobManager.run(JobApi.RunJobRequest.newBuilder().setPreparationId("prepareId").build());
        });
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        prismJobManager.close();
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isTrue();
    }

    @Test
    public void givenRunSuccess_forwardsResponse_canGracefulShutdown() {
        PrismJobManager prismJobManager = prismJobManager(new TestJobService().withRunJobResponse(JobApi.RunJobResponse.newBuilder().setJobId("jobId").build()));
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        JobApi.RunJobResponse run = prismJobManager.run(JobApi.RunJobRequest.newBuilder().setPreparationId("preparationId").build());
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        Truth.assertThat(run.getJobId()).isEqualTo("jobId");
        prismJobManager.close();
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isTrue();
    }

    @Test
    public void givenTerminalState_closes() {
        PrismJobManager prismJobManager = prismJobManager(new TestJobService());
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        prismJobManager.onStateChanged(PipelineResult.State.RUNNING);
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        prismJobManager.onStateChanged(PipelineResult.State.RUNNING);
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isFalse();
        prismJobManager.onStateChanged(PipelineResult.State.CANCELLED);
        Truth.assertThat(Boolean.valueOf(prismJobManager.isShutdown())).isTrue();
        prismJobManager.close();
    }

    private PrismJobManager prismJobManager(TestJobService testJobService) {
        String generateName = InProcessServerBuilder.generateName();
        try {
            this.grpcCleanup.register(InProcessServerBuilder.forName(generateName).directExecutor().addService(testJobService).build().start());
            return PrismJobManager.builder().setTimeout(Duration.millis(3000L)).setEndpoint("ignore").setManagedChannel(this.grpcCleanup.register(InProcessChannelBuilder.forName(generateName).build())).build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static RunnerApi.Pipeline pipelineOf() {
        Pipeline create = Pipeline.create();
        create.apply(Impulse.create());
        return PipelineTranslation.toProto(create);
    }
}
