package org.apache.beam.runners.jobsubmission;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsCollectionWithSize;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsNull;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/jobsubmission/InMemoryJobServiceTest.class */
public class InMemoryJobServiceTest {
    private static final String TEST_RETRIEVAL_TOKEN = "test-staging-token";
    private final int maxInvocationHistory = 3;
    Endpoints.ApiServiceDescriptor stagingServiceDescriptor;

    @Mock
    JobInvoker invoker;

    @Mock
    JobInvocation invocation;

    @Mock
    ArtifactStagingService stagingService;

    @Mock
    GrpcFnServer<ArtifactStagingService> stagingServer;
    InMemoryJobService service;
    private static final RunnerApi.Pipeline TEST_PIPELINE = RunnerApi.Pipeline.newBuilder().setComponents(RunnerApi.Components.getDefaultInstance()).build();
    private static final Struct TEST_OPTIONS = Struct.getDefaultInstance();
    private static final String TEST_JOB_ID = "test-job-id";
    private static final String TEST_JOB_NAME = "test-job";
    private static final JobApi.JobInfo TEST_JOB_INFO = JobApi.JobInfo.newBuilder().setJobId(TEST_JOB_ID).setJobName(TEST_JOB_NAME).setPipelineOptions(TEST_OPTIONS).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/jobsubmission/InMemoryJobServiceTest$Job.class */
    public static class Job {
        private Consumer<JobApi.JobStateEvent> stateListener;

        private Job(Consumer<JobApi.JobStateEvent> consumer) {
            this.stateListener = consumer;
        }

        void finish() {
            this.stateListener.accept(JobApi.JobStateEvent.newBuilder().setState(JobApi.JobState.Enum.DONE).build());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/jobsubmission/InMemoryJobServiceTest$RecordingObserver.class */
    public static class RecordingObserver<T> implements StreamObserver<T> {
        ArrayList<T> values;
        Throwable error;
        boolean isCompleted;
        static final /* synthetic */ boolean $assertionsDisabled;

        private RecordingObserver() {
            this.values = new ArrayList<>();
            this.error = null;
            this.isCompleted = false;
        }

        public void onNext(T t) {
            this.values.add(t);
        }

        public void onError(Throwable th) {
            this.error = th;
        }

        public void onCompleted() {
            this.isCompleted = true;
        }

        T getValue() {
            if ($assertionsDisabled || this.values.size() == 1) {
                return this.values.get(0);
            }
            throw new AssertionError();
        }

        boolean isSuccessful() {
            return this.isCompleted && this.error == null;
        }

        static {
            $assertionsDisabled = !InMemoryJobServiceTest.class.desiredAssertionStatus();
        }
    }

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.stagingServiceDescriptor = Endpoints.ApiServiceDescriptor.getDefaultInstance();
        Mockito.when(this.stagingServer.getApiServiceDescriptor()).thenReturn(this.stagingServiceDescriptor);
        Mockito.when(this.stagingServer.getService()).thenReturn(this.stagingService);
        Mockito.when(this.stagingService.getStagedArtifacts((String) Matchers.any())).thenReturn(ImmutableMap.of());
        this.service = InMemoryJobService.create(this.stagingServer, str -> {
            return "token";
        }, (ThrowingConsumer) null, this.invoker);
        Mockito.when(this.invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, TEST_RETRIEVAL_TOKEN)).thenReturn(this.invocation);
        Mockito.when(this.invocation.getId()).thenReturn(TEST_JOB_ID);
        Mockito.when(this.invocation.getPipeline()).thenReturn(TEST_PIPELINE);
        Mockito.when(this.invocation.toProto()).thenReturn(TEST_JOB_INFO);
    }

    private JobApi.PrepareJobResponse prepareJob() {
        JobApi.PrepareJobRequest build = JobApi.PrepareJobRequest.newBuilder().setJobName(TEST_JOB_NAME).setPipeline(RunnerApi.Pipeline.getDefaultInstance()).setPipelineOptions(Struct.getDefaultInstance()).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.prepare(build, recordingObserver);
        return (JobApi.PrepareJobResponse) recordingObserver.values.get(0);
    }

    private JobApi.RunJobResponse runJob(String str) {
        JobApi.RunJobRequest build = JobApi.RunJobRequest.newBuilder().setPreparationId(str).setRetrievalToken(TEST_RETRIEVAL_TOKEN).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.run(build, recordingObserver);
        return (JobApi.RunJobResponse) recordingObserver.values.get(0);
    }

    private JobApi.RunJobResponse prepareAndRunJob() {
        return runJob(prepareJob().getPreparationId());
    }

    @Test
    public void testPrepareIsSuccessful() {
        JobApi.PrepareJobRequest build = JobApi.PrepareJobRequest.newBuilder().setJobName(TEST_JOB_NAME).setPipeline(RunnerApi.Pipeline.getDefaultInstance()).setPipelineOptions(Struct.getDefaultInstance()).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.prepare(build, recordingObserver);
        MatcherAssert.assertThat(Boolean.valueOf(recordingObserver.isSuccessful()), Is.is(true));
        MatcherAssert.assertThat(recordingObserver.values, IsCollectionWithSize.hasSize(1));
        JobApi.PrepareJobResponse prepareJobResponse = (JobApi.PrepareJobResponse) recordingObserver.values.get(0);
        MatcherAssert.assertThat(prepareJobResponse.getArtifactStagingEndpoint(), IsNull.notNullValue());
        MatcherAssert.assertThat(prepareJobResponse.getPreparationId(), IsNull.notNullValue());
    }

    @Test
    public void testGetJobsIsSuccessful() throws Exception {
        prepareAndRunJob();
        JobApi.GetJobsRequest build = JobApi.GetJobsRequest.newBuilder().build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.getJobs(build, recordingObserver);
        MatcherAssert.assertThat(Boolean.valueOf(recordingObserver.isSuccessful()), Is.is(true));
        MatcherAssert.assertThat(recordingObserver.values, IsCollectionWithSize.hasSize(1));
        List jobInfoList = ((JobApi.GetJobsResponse) recordingObserver.values.get(0)).getJobInfoList();
        MatcherAssert.assertThat(jobInfoList, IsCollectionWithSize.hasSize(1));
        JobApi.JobInfo jobInfo = (JobApi.JobInfo) jobInfoList.get(0);
        MatcherAssert.assertThat(jobInfo.getJobId(), Is.is(TEST_JOB_ID));
        MatcherAssert.assertThat(jobInfo.getJobName(), Is.is(TEST_JOB_NAME));
    }

    @Test
    public void testGetPipelineFailure() {
        prepareJob();
        JobApi.GetJobPipelineRequest build = JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.getPipeline(build, recordingObserver);
        MatcherAssert.assertThat(Boolean.valueOf(recordingObserver.isSuccessful()), Is.is(false));
        MatcherAssert.assertThat(recordingObserver.error, Is.isA(StatusException.class));
    }

    @Test
    public void testGetPipelineIsSuccessful() throws Exception {
        prepareAndRunJob();
        JobApi.GetJobPipelineRequest build = JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.getPipeline(build, recordingObserver);
        MatcherAssert.assertThat(Boolean.valueOf(recordingObserver.isSuccessful()), Is.is(true));
        MatcherAssert.assertThat(recordingObserver.values, IsCollectionWithSize.hasSize(1));
        MatcherAssert.assertThat(((JobApi.GetJobPipelineResponse) recordingObserver.values.get(0)).getPipeline(), Is.is(TEST_PIPELINE));
    }

    @Test
    public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
        JobApi.RunJobRequest build = JobApi.RunJobRequest.newBuilder().setPreparationId(prepareJob().getPreparationId()).setRetrievalToken(TEST_RETRIEVAL_TOKEN).build();
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.run(build, recordingObserver);
        ((JobInvoker) Mockito.verify(this.invoker, Mockito.times(1))).invoke(TEST_PIPELINE, TEST_OPTIONS, TEST_RETRIEVAL_TOKEN);
        MatcherAssert.assertThat(Boolean.valueOf(recordingObserver.isSuccessful()), Is.is(true));
        MatcherAssert.assertThat(recordingObserver.values, IsCollectionWithSize.hasSize(1));
        MatcherAssert.assertThat(((JobApi.RunJobResponse) recordingObserver.values.get(0)).getJobId(), Is.is(TEST_JOB_ID));
        ((JobInvocation) Mockito.verify(this.invocation, Mockito.times(1))).addStateListener((Consumer) Matchers.any());
        ((JobInvocation) Mockito.verify(this.invocation, Mockito.times(1))).start();
    }

    @Test
    public void testInvocationCleanup() {
        this.service = InMemoryJobService.create(this.stagingServer, str -> {
            return "token";
        }, (ThrowingConsumer) null, this.invoker, 3);
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(0));
        Job runJob = runJob();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(1));
        Job runJob2 = runJob();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(2));
        Job runJob3 = runJob();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(3));
        Job runJob4 = runJob();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(4));
        runJob.finish();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(4));
        runJob2.finish();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(4));
        runJob3.finish();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(4));
        runJob4.finish();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(3));
        Job runJob5 = runJob();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(4));
        runJob5.finish();
        MatcherAssert.assertThat(Integer.valueOf(getNumberOfInvocations()), Is.is(3));
    }

    private Job runJob() {
        Mockito.when(this.invocation.getId()).thenReturn(UUID.randomUUID().toString());
        prepareAndRunJob();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Consumer.class);
        ((JobInvocation) Mockito.verify(this.invocation, Mockito.atLeastOnce())).addStateListener((Consumer) forClass.capture());
        return new Job((Consumer) forClass.getValue());
    }

    private int getNumberOfInvocations() {
        RecordingObserver recordingObserver = new RecordingObserver();
        this.service.getJobs(JobApi.GetJobsRequest.newBuilder().build(), recordingObserver);
        return ((JobApi.GetJobsResponse) recordingObserver.getValue()).getJobInfoCount();
    }
}
