package org.apache.beam.runners.direct.portable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.direct.portable.CommittedResult;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/DirectTransformExecutorTest.class */
public class DirectTransformExecutorTest {
    private CountDownLatch evaluatorCompleted;
    private RegisteringCompletionCallback completionCallback;
    private TransformExecutorService transformEvaluationState;
    private BundleFactory bundleFactory;

    @Mock
    private DirectMetrics metrics;

    @Mock
    private EvaluationContext evaluationContext;

    @Mock
    private TransformEvaluatorRegistry registry;

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private final PipelineNode.PCollectionNode created = PipelineNode.pCollection("created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
    private final PipelineNode.PTransformNode createdProducer = PipelineNode.pTransform("create", RunnerApi.PTransform.newBuilder().putOutputs("created", "created").setUniqueName("create").build());
    private final PipelineNode.PTransformNode downstreamProducer = PipelineNode.pTransform("downstream", RunnerApi.PTransform.newBuilder().putInputs("input", "created").setUniqueName("create").build());

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/DirectTransformExecutorTest$RegisteringCompletionCallback.class */
    private static class RegisteringCompletionCallback implements CompletionCallback {
        private TransformResult<?> handledResult;
        private boolean handledEmpty;
        private Exception handledException;
        private final CountDownLatch onMethod;

        private RegisteringCompletionCallback(CountDownLatch countDownLatch) {
            this.handledResult = null;
            this.handledEmpty = false;
            this.handledException = null;
            this.onMethod = countDownLatch;
        }

        public CommittedResult handleResult(CommittedBundle<?> committedBundle, TransformResult<?> transformResult) {
            this.handledResult = transformResult;
            this.onMethod.countDown();
            Iterable emptyList = transformResult.getUnprocessedElements() == null ? Collections.emptyList() : transformResult.getUnprocessedElements();
            return CommittedResult.create(transformResult, (committedBundle == null || Iterables.isEmpty(emptyList)) ? Optional.absent() : Optional.of(committedBundle.withElements(emptyList)), Collections.emptyList(), EnumSet.noneOf(CommittedResult.OutputType.class));
        }

        public void handleEmpty(PipelineNode.PTransformNode pTransformNode) {
            this.handledEmpty = true;
            this.onMethod.countDown();
        }

        public void handleException(CommittedBundle<?> committedBundle, Exception exc) {
            this.handledException = exc;
            this.onMethod.countDown();
        }

        public void handleError(Error error) {
            throw error;
        }
    }

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.bundleFactory = ImmutableListBundleFactory.create();
        this.transformEvaluationState = TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());
        this.evaluatorCompleted = new CountDownLatch(1);
        this.completionCallback = new RegisteringCompletionCallback(this.evaluatorCompleted);
        PipelineNode.pCollection("created", RunnerApi.PCollection.newBuilder().setUniqueName("created").build());
        Mockito.when(this.evaluationContext.getMetrics()).thenReturn(this.metrics);
    }

    @Test
    public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
        final StepTransformResult build = StepTransformResult.withoutHold(this.createdProducer).build();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mockito.when(this.registry.forApplication(this.createdProducer, (CommittedBundle) null)).thenReturn(new TransformEvaluator<Object>() { // from class: org.apache.beam.runners.direct.portable.DirectTransformExecutorTest.1
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
                throw new IllegalArgumentException("Shouldn't be called");
            }

            public TransformResult<Object> finishBundle() throws Exception {
                atomicBoolean.set(true);
                return build;
            }
        });
        new DirectTransformExecutor(this.evaluationContext, this.registry, (CommittedBundle) null, this.createdProducer, this.completionCallback, this.transformEvaluationState).run();
        Assert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        Assert.assertThat(this.completionCallback.handledResult, Matchers.equalTo(build));
        Assert.assertThat(this.completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void nullTransformEvaluatorTerminates() throws Exception {
        Mockito.when(this.registry.forApplication(this.createdProducer, (CommittedBundle) null)).thenReturn((Object) null);
        new DirectTransformExecutor(this.evaluationContext, this.registry, (CommittedBundle) null, this.createdProducer, this.completionCallback, this.transformEvaluationState).run();
        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(Boolean.valueOf(this.completionCallback.handledEmpty), Matchers.equalTo(true));
        Assert.assertThat(this.completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
        final StepTransformResult build = StepTransformResult.withoutHold(this.downstreamProducer).build();
        final ArrayList arrayList = new ArrayList();
        TransformEvaluator<String> transformEvaluator = new TransformEvaluator<String>() { // from class: org.apache.beam.runners.direct.portable.DirectTransformExecutorTest.2
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
                arrayList.add(windowedValue);
            }

            public TransformResult<String> finishBundle() throws Exception {
                return build;
            }
        };
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow("foo");
        WindowedValue valueInGlobalWindow2 = WindowedValue.valueInGlobalWindow("spam");
        WindowedValue valueInGlobalWindow3 = WindowedValue.valueInGlobalWindow("third");
        CommittedBundle commit = this.bundleFactory.createBundle(this.created).add(valueInGlobalWindow).add(valueInGlobalWindow2).add(valueInGlobalWindow3).commit(Instant.now());
        Mockito.when(this.registry.forApplication(this.downstreamProducer, commit)).thenReturn(transformEvaluator);
        Future<?> submit = Executors.newSingleThreadExecutor().submit(new DirectTransformExecutor(this.evaluationContext, this.registry, commit, this.downstreamProducer, this.completionCallback, this.transformEvaluationState));
        this.evaluatorCompleted.await();
        submit.get();
        Assert.assertThat(arrayList, Matchers.containsInAnyOrder(new WindowedValue[]{valueInGlobalWindow2, valueInGlobalWindow3, valueInGlobalWindow}));
        Assert.assertThat(this.completionCallback.handledResult, Matchers.equalTo(build));
        Assert.assertThat(this.completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void processElementThrowsExceptionCallsback() throws Exception {
        final StepTransformResult build = StepTransformResult.withoutHold(this.downstreamProducer).build();
        final Exception exc = new Exception();
        TransformEvaluator<String> transformEvaluator = new TransformEvaluator<String>() { // from class: org.apache.beam.runners.direct.portable.DirectTransformExecutorTest.3
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
                throw exc;
            }

            public TransformResult<String> finishBundle() throws Exception {
                return build;
            }
        };
        CommittedBundle commit = this.bundleFactory.createBundle(this.created).add(WindowedValue.valueInGlobalWindow("foo")).commit(Instant.now());
        Mockito.when(this.registry.forApplication(this.downstreamProducer, commit)).thenReturn(transformEvaluator);
        Executors.newSingleThreadExecutor().submit(new DirectTransformExecutor(this.evaluationContext, this.registry, commit, this.downstreamProducer, this.completionCallback, this.transformEvaluationState));
        this.evaluatorCompleted.await();
        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(this.completionCallback.handledException, Matchers.equalTo(exc));
    }

    @Test
    public void finishBundleThrowsExceptionCallsback() throws Exception {
        final Exception exc = new Exception();
        TransformEvaluator<String> transformEvaluator = new TransformEvaluator<String>() { // from class: org.apache.beam.runners.direct.portable.DirectTransformExecutorTest.4
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
            }

            public TransformResult<String> finishBundle() throws Exception {
                throw exc;
            }
        };
        CommittedBundle commit = this.bundleFactory.createBundle(this.created).commit(Instant.now());
        Mockito.when(this.registry.forApplication(this.downstreamProducer, commit)).thenReturn(transformEvaluator);
        Executors.newSingleThreadExecutor().submit(new DirectTransformExecutor(this.evaluationContext, this.registry, commit, this.downstreamProducer, this.completionCallback, this.transformEvaluationState));
        this.evaluatorCompleted.await();
        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(this.completionCallback.handledException, Matchers.equalTo(exc));
    }
}
