/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.jobsubmission;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsSame;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class JobInvocationTest {
    private static ExecutorService executorService;
    private JobInvocation jobInvocation;
    private ControllablePipelineRunner runner;

    @Before
    public void setup() {
        executorService = Executors.newFixedThreadPool(1);
        JobInfo jobInfo = JobInfo.create((String)"jobid", (String)"jobName", (String)"retrievalToken", (Struct)Struct.getDefaultInstance());
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator((ExecutorService)executorService);
        Pipeline pipeline = Pipeline.create();
        this.runner = new ControllablePipelineRunner();
        this.jobInvocation = new JobInvocation(jobInfo, listeningExecutorService, PipelineTranslation.toProto((Pipeline)pipeline), (PortablePipelineRunner)this.runner);
    }

    @After
    public void tearDown() throws Exception {
        executorService.shutdownNow();
        executorService = null;
    }

    @Test(timeout=10000L)
    public void testStateAfterCompletion() throws Exception {
        this.jobInvocation.start();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.RUNNING));
        TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DONE);
        this.runner.setResult(pipelineResult);
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.DONE);
    }

    @Test(timeout=10000L)
    public void testStateAfterCompletionWithoutResult() throws Exception {
        this.jobInvocation.start();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.RUNNING));
        this.runner.setResult(null);
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.UNSPECIFIED);
    }

    @Test(timeout=10000L)
    public void testStateAfterCancellation() throws Exception {
        this.jobInvocation.start();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.RUNNING));
        this.jobInvocation.cancel();
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.CANCELLED);
    }

    @Test(timeout=10000L)
    public void testStateAfterCancellationWithPipelineResult() throws Exception {
        this.jobInvocation.start();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.RUNNING));
        TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.FAILED);
        this.runner.setResult(pipelineResult);
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.FAILED);
        this.jobInvocation.cancel();
        pipelineResult.cancelLatch.await();
    }

    @Test(timeout=10000L)
    public void testNoCancellationWhenDone() throws Exception {
        this.jobInvocation.start();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.RUNNING));
        TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DONE);
        this.runner.setResult(pipelineResult);
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.DONE);
        this.jobInvocation.cancel();
        MatcherAssert.assertThat((Object)this.jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.DONE));
        MatcherAssert.assertThat((Object)pipelineResult.cancelLatch.getCount(), (Matcher)Is.is((Object)1L));
    }

    @Test(timeout=10000L)
    public void testReturnsMetricsFromJobInvocationAfterSuccess() throws Exception {
        JobApi.MetricResults expectedMonitoringInfos = JobApi.MetricResults.newBuilder().build();
        TestPipelineResult result = new TestPipelineResult(PipelineResult.State.DONE, expectedMonitoringInfos);
        this.jobInvocation.start();
        this.runner.setResult(result);
        JobInvocationTest.awaitJobState(this.jobInvocation, JobApi.JobState.Enum.DONE);
        MatcherAssert.assertThat((Object)this.jobInvocation.getMetrics(), (Matcher)Matchers.allOf((Matcher)Is.is((Matcher)Matchers.notNullValue()), (Matcher)Is.is((Matcher)IsSame.sameInstance((Object)result.portableMetrics()))));
    }

    private static void awaitJobState(JobInvocation jobInvocation, JobApi.JobState.Enum jobState) throws Exception {
        while (jobInvocation.getState() != jobState) {
            Thread.sleep(50L);
        }
    }

    private static class TestPipelineResult
    implements PortablePipelineResult {
        private final PipelineResult.State state;
        private final CountDownLatch cancelLatch = new CountDownLatch(1);
        private JobApi.MetricResults monitoringInfos;

        private TestPipelineResult(PipelineResult.State state, JobApi.MetricResults monitoringInfos) {
            this.state = state;
            this.monitoringInfos = monitoringInfos;
        }

        private TestPipelineResult(PipelineResult.State state) {
            this(state, JobApi.MetricResults.newBuilder().build());
        }

        public PipelineResult.State getState() {
            return this.state;
        }

        public PipelineResult.State cancel() {
            this.cancelLatch.countDown();
            return PipelineResult.State.CANCELLED;
        }

        public PipelineResult.State waitUntilFinish(Duration duration) {
            return null;
        }

        public PipelineResult.State waitUntilFinish() {
            return null;
        }

        public MetricResults metrics() {
            return null;
        }

        public JobApi.MetricResults portableMetrics() {
            return this.monitoringInfos;
        }
    }

    private static class ControllablePipelineRunner
    implements PortablePipelineRunner {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile PortablePipelineResult result;

        private ControllablePipelineRunner() {
        }

        public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
            this.latch.await();
            return this.result;
        }

        void setResult(PortablePipelineResult pipelineResult) {
            this.result = pipelineResult;
            this.latch.countDown();
        }
    }
}

