package org.apache.beam.runners.jobsubmission;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsSame;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/jobsubmission/JobInvocationTest.class */
/**
 * Tests for the job states and metrics reported by {@link JobInvocation}.
 *
 * <p>Every test builds a {@link JobInvocation} around a {@link ControllablePipelineRunner}, whose
 * {@code run} call blocks until the test supplies a result. This lets each test observe the
 * invocation while the runner is still "running" and then again after a result has been handed
 * over.
 */
public class JobInvocationTest {
    /**
     * Single-threaded pool handed to the {@link JobInvocation} under test. Kept per test instance
     * (JUnit 4 creates a fresh instance for each test method) so that no executor state is shared
     * through a static field.
     */
    private ExecutorService executorService;

    /** The invocation under test; rebuilt in {@link #setup()} for every test. */
    private JobInvocation jobInvocation;

    /** The blocking runner wired into {@link #jobInvocation}. */
    private ControllablePipelineRunner runner;

    /**
     * A {@link PortablePipelineRunner} whose {@code run} method blocks until the test calls
     * {@link #setResult(PortablePipelineResult)}.
     */
    private static class ControllablePipelineRunner implements PortablePipelineRunner {
        /** Released exactly once, by {@link #setResult(PortablePipelineResult)}. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Written by the test thread and read by whichever thread executes {@code run}. */
        private volatile PortablePipelineResult result;

        /**
         * Waits until a result has been supplied and then returns it.
         *
         * @param pipeline ignored by this stub
         * @param jobInfo ignored by this stub
         * @return the value last passed to {@code setResult}; may be {@code null}
         * @throws Exception if the waiting thread is interrupted
         */
        public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
            latch.await();
            return result;
        }

        /**
         * Stores the result and unblocks a pending or future {@code run} call.
         *
         * @param portablePipelineResult the value {@code run} should return; {@code null} is allowed
         */
        void setResult(PortablePipelineResult portablePipelineResult) {
            // Publish the result before releasing the latch so that run() never observes a stale value.
            result = portablePipelineResult;
            latch.countDown();
        }
    }

    /**
     * A canned {@link PortablePipelineResult} that reports a fixed state and fixed portable
     * metrics, and records whether {@link #cancel()} was invoked.
     */
    private static class TestPipelineResult implements PortablePipelineResult {
        private final PipelineResult.State state;

        /** Counted down by {@link #cancel()}; a count of 1 means cancel was never called. */
        private final CountDownLatch cancelLatch = new CountDownLatch(1);

        private final JobApi.MetricResults monitoringInfos;

        private TestPipelineResult(PipelineResult.State state, JobApi.MetricResults metricResults) {
            this.state = state;
            this.monitoringInfos = metricResults;
        }

        /** Creates a result with the given state and an empty {@link JobApi.MetricResults}. */
        private TestPipelineResult(PipelineResult.State state) {
            this(state, JobApi.MetricResults.newBuilder().build());
        }

        /** Returns the state given at construction time; it never changes. */
        public PipelineResult.State getState() {
            return state;
        }

        /** Records the cancellation on {@link #cancelLatch} and reports {@code CANCELLED}. */
        public PipelineResult.State cancel() {
            cancelLatch.countDown();
            return PipelineResult.State.CANCELLED;
        }

        /** Stub: always returns {@code null}; no test in this class asserts on it. */
        public PipelineResult.State waitUntilFinish(Duration duration) {
            return null;
        }

        /** Stub: always returns {@code null}; no test in this class asserts on it. */
        public PipelineResult.State waitUntilFinish() {
            return null;
        }

        /** Stub: always returns {@code null}; no test in this class asserts on it. */
        public MetricResults metrics() {
            return null;
        }

        /** Returns the metrics instance given at construction time. */
        public JobApi.MetricResults portableMetrics() {
            return monitoringInfos;
        }
    }

    /** Creates a fresh executor, blocking runner and {@link JobInvocation} for each test. */
    @Before
    public void setup() {
        executorService = Executors.newFixedThreadPool(1);
        JobInfo jobInfo = JobInfo.create("jobid", "jobName", "retrievalToken", Struct.getDefaultInstance());
        ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executorService);
        Pipeline pipeline = Pipeline.create();
        runner = new ControllablePipelineRunner();
        jobInvocation = new JobInvocation(jobInfo, listeningExecutor, PipelineTranslation.toProto(pipeline), runner);
    }

    /** Stops the executor, interrupting a runner that is still blocked waiting for a result. */
    @After
    public void tearDown() throws Exception {
        // Guard against setup() having failed before the executor was created, so that the
        // original failure is not masked by a NullPointerException here.
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    /** A runner result in state {@code DONE} must move the job from RUNNING to DONE. */
    @Test(timeout = 10000)
    public void testStateAfterCompletion() throws Exception {
        jobInvocation.start();
        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.RUNNING));

        runner.setResult(new TestPipelineResult(PipelineResult.State.DONE));

        awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE);
    }

    /** A runner that returns {@code null} must leave the job in state UNSPECIFIED. */
    @Test(timeout = 10000)
    public void testStateAfterCompletionWithoutResult() throws Exception {
        jobInvocation.start();
        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.RUNNING));

        runner.setResult(null);

        awaitJobState(jobInvocation, JobApi.JobState.Enum.UNSPECIFIED);
    }

    /** Cancelling before the runner has been given any result must end in state CANCELLED. */
    @Test(timeout = 10000)
    public void testStateAfterCancellation() throws Exception {
        jobInvocation.start();
        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.RUNNING));

        jobInvocation.cancel();

        awaitJobState(jobInvocation, JobApi.JobState.Enum.CANCELLED);
    }

    /**
     * Cancelling after the runner produced a FAILED result must be forwarded to that result's
     * {@code cancel()} method.
     */
    @Test(timeout = 10000)
    public void testStateAfterCancellationWithPipelineResult() throws Exception {
        jobInvocation.start();
        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.RUNNING));

        TestPipelineResult failedResult = new TestPipelineResult(PipelineResult.State.FAILED);
        runner.setResult(failedResult);
        awaitJobState(jobInvocation, JobApi.JobState.Enum.FAILED);

        jobInvocation.cancel();

        // Blocks until TestPipelineResult.cancel() has run; the test timeout bounds the wait.
        failedResult.cancelLatch.await();
    }

    /**
     * Cancelling a job that already reached DONE must neither change its state nor call
     * {@code cancel()} on the pipeline result.
     */
    @Test(timeout = 10000)
    public void testNoCancellationWhenDone() throws Exception {
        jobInvocation.start();
        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.RUNNING));

        TestPipelineResult doneResult = new TestPipelineResult(PipelineResult.State.DONE);
        runner.setResult(doneResult);
        awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE);

        jobInvocation.cancel();

        MatcherAssert.assertThat(jobInvocation.getState(), Is.is(JobApi.JobState.Enum.DONE));
        // The latch still being at 1 proves that cancel() was never invoked on the result.
        MatcherAssert.assertThat(doneResult.cancelLatch.getCount(), Is.is(1L));
    }

    /**
     * After a successful run, {@code getMetrics()} must return the very same instance that the
     * pipeline result exposes through {@code portableMetrics()}.
     */
    @Test(timeout = 10000)
    public void testReturnsMetricsFromJobInvocationAfterSuccess() throws Exception {
        JobApi.MetricResults expectedMetrics = JobApi.MetricResults.newBuilder().build();
        TestPipelineResult doneResult = new TestPipelineResult(PipelineResult.State.DONE, expectedMetrics);

        jobInvocation.start();
        runner.setResult(doneResult);
        awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE);

        MatcherAssert.assertThat(
                jobInvocation.getMetrics(),
                Matchers.allOf(
                        Is.is(Matchers.notNullValue()),
                        Is.is(IsSame.sameInstance(doneResult.portableMetrics()))));
    }

    /**
     * Polls the invocation every 50 ms until it reports the expected state.
     *
     * <p>This helper has no deadline of its own; it relies on the {@code timeout} of the calling
     * {@code @Test} to abort if the state is never reached.
     *
     * @param jobInvocation the invocation to observe
     * @param expectedState the state to wait for
     * @throws Exception if the polling thread is interrupted while sleeping
     */
    private static void awaitJobState(JobInvocation jobInvocation, JobApi.JobState.Enum expectedState) throws Exception {
        while (jobInvocation.getState() != expectedState) {
            Thread.sleep(50L);
        }
    }
}
