package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/TransformExecutorTest.class */
public class TransformExecutorTest {

    // JUnit rule; the enforcement tests use it to declare the expected cause of a failure.
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    // Root collection built in setup() from the strings "foo", "spam" and "third".
    private PCollection<String> created;
    // Collection produced in setup() by applying WithKeys.of(3) to `created`.
    private PCollection<KV<Integer, String>> downstream;
    // Latch (count 1) shared with completionCallback; tests await it to learn a callback fired.
    private CountDownLatch evaluatorCompleted;
    // Callback handed to every TransformExecutor; records the result or throwable it receives.
    private RegisteringCompletionCallback completionCallback;
    // Executor service handed to every TransformExecutor under test; created in setup().
    private TransformExecutorService transformEvaluationState;
    // Factory used by the tests to build committed input bundles.
    private BundleFactory bundleFactory;

    // Mock populated by MockitoAnnotations.initMocks(this) in setup().
    @Mock
    private InProcessEvaluationContext evaluationContext;

    // Mock populated by MockitoAnnotations.initMocks(this) in setup(); each test stubs forApplication on it.
    @Mock
    private TransformEvaluatorRegistry registry;

    /* loaded from: input_file:org/apache/beam/runners/direct/TransformExecutorTest$RegisteringCompletionCallback.class */
    private static class RegisteringCompletionCallback implements CompletionCallback {
        /** Counted down once per callback invocation so a waiting test can proceed. */
        private final CountDownLatch onMethod;
        /** Result passed to the latest {@code handleResult} call; {@code null} until then. */
        private InProcessTransformResult handledResult = null;
        /** Throwable passed to the latest {@code handleThrowable} call; {@code null} until then. */
        private Throwable handledThrowable = null;

        private RegisteringCompletionCallback(CountDownLatch countDownLatch) {
            onMethod = countDownLatch;
        }

        /**
         * Records the result, releases the latch, and then builds a {@link CommittedResult}.
         *
         * <p>The committed result carries the input bundle restricted to the result's unprocessed
         * elements (an empty list when the result reports {@code null}), or {@code null} when there
         * was no input bundle, together with an empty list of outputs.
         */
        public CommittedResult handleResult(InProcessPipelineRunner.CommittedBundle<?> committedBundle, InProcessTransformResult inProcessTransformResult) {
            handledResult = inProcessTransformResult;
            onMethod.countDown();
            return CommittedResult.create(
                    inProcessTransformResult,
                    committedBundle == null
                            ? null
                            : committedBundle.withElements(
                                    inProcessTransformResult.getUnprocessedElements() == null
                                            ? Collections.emptyList()
                                            : inProcessTransformResult.getUnprocessedElements()),
                    Collections.emptyList());
        }

        /** Records the throwable and releases the latch; the bundle argument is not used. */
        public void handleThrowable(InProcessPipelineRunner.CommittedBundle<?> committedBundle, Throwable th) {
            handledThrowable = th;
            onMethod.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/TransformExecutorTest$TestEnforcement.class */
    public static class TestEnforcement<T> implements ModelEnforcement<T> {
        /** Elements seen by {@link #beforeElement}, in call order. */
        private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
        /** Elements seen by {@link #afterElement}, in call order. */
        private final List<WindowedValue<T>> afterElements = new ArrayList<>();
        /** Results seen by {@link #afterFinish}, in call order. */
        private final List<InProcessTransformResult> finishedBundles = new ArrayList<>();

        private TestEnforcement() {
        }

        /** Remembers the element handed over before processing. */
        public void beforeElement(WindowedValue<T> windowedValue) {
            beforeElements.add(windowedValue);
        }

        /** Remembers the element handed over after processing. */
        public void afterElement(WindowedValue<T> windowedValue) {
            afterElements.add(windowedValue);
        }

        /** Remembers the transform result; the input bundle and the outputs are ignored. */
        public void afterFinish(InProcessPipelineRunner.CommittedBundle<T> committedBundle, InProcessTransformResult inProcessTransformResult, Iterable<? extends InProcessPipelineRunner.CommittedBundle<?>> iterable) {
            finishedBundles.add(inProcessTransformResult);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/TransformExecutorTest$TestEnforcementFactory.class */
    private static class TestEnforcementFactory implements ModelEnforcementFactory {
        /** The enforcement created by the most recent {@link #forBundle} call; {@code null} before the first call. */
        private TestEnforcement<?> instance;

        private TestEnforcementFactory() {
        }

        /**
         * Creates a fresh {@link TestEnforcement}, remembers it in {@code instance} so the test
         * can inspect it afterwards, and returns it. Neither argument is used.
         *
         * <p>The return type is covariant with the interface method; the compiler generates the
         * required bridge method, so none is declared by hand.
         */
        public <T> TestEnforcement<T> forBundle(InProcessPipelineRunner.CommittedBundle<T> committedBundle, AppliedPTransform<?, ?, ?> appliedPTransform) {
            TestEnforcement<T> enforcement = new TestEnforcement<>();
            this.instance = enforcement;
            return enforcement;
        }
    }

    /**
     * Initialises the Mockito mocks and builds the fixtures shared by every test: the bundle
     * factory, the executor service, the completion latch and callback, and a two-step pipeline
     * ({@code created} followed by {@code downstream}).
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);

        bundleFactory = InProcessBundleFactory.create();
        transformEvaluationState = TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());

        // The callback counts this latch down, letting tests wait for asynchronous completion.
        evaluatorCompleted = new CountDownLatch(1);
        completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);

        TestPipeline pipeline = TestPipeline.create();
        created = pipeline.apply(Create.of("foo", "spam", "third"));
        downstream = created.apply(WithKeys.of(3));
    }

    /**
     * Running an executor without an input bundle must skip {@code processElement}, call
     * {@code finishBundle}, and report the result (and no throwable) to the completion callback.
     */
    @Test
    public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
        final StepTransformResult expectedResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
        final AtomicBoolean bundleFinished = new AtomicBoolean(false);
        TransformEvaluator<Object> rootEvaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
                throw new IllegalArgumentException("Shouldn't be called");
            }

            public InProcessTransformResult finishBundle() throws Exception {
                bundleFinished.set(true);
                return expectedResult;
            }
        };
        Mockito.when(registry.forApplication(created.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, evaluationContext))
                .thenReturn(rootEvaluator);

        // Executed synchronously on the test thread.
        TransformExecutor.create(
                registry,
                Collections.emptyList(),
                evaluationContext,
                (InProcessPipelineRunner.CommittedBundle) null,
                created.getProducingTransformInternal(),
                completionCallback,
                transformEvaluationState).run();

        Assert.assertThat(Boolean.valueOf(bundleFinished.get()), Matchers.is(true));
        Assert.assertThat(completionCallback.handledResult, Matchers.equalTo(expectedResult));
        Assert.assertThat(completionCallback.handledThrowable, Matchers.is(Matchers.nullValue()));
    }

    /**
     * An executor given an input bundle must pass every element to {@code processElement} and
     * report the {@code finishBundle} result (and no throwable) to the completion callback.
     */
    @Test
    public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
        final StepTransformResult expectedResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
        final ArrayList processedElements = new ArrayList();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
                processedElements.add(windowedValue);
            }

            public InProcessTransformResult finishBundle() throws Exception {
                return expectedResult;
            }
        };
        WindowedValue foo = WindowedValue.valueInGlobalWindow("foo");
        WindowedValue spam = WindowedValue.valueInGlobalWindow("spam");
        WindowedValue third = WindowedValue.valueInGlobalWindow("third");
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
        Mockito.when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle, evaluationContext)).thenReturn(evaluator);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(TransformExecutor.create(registry, Collections.emptyList(), evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, transformEvaluationState));
            // Wait for the completion callback before inspecting what it recorded.
            evaluatorCompleted.await();
        } finally {
            // Release the worker thread so it does not outlive the test.
            worker.shutdown();
        }

        Assert.assertThat(processedElements, Matchers.containsInAnyOrder(new WindowedValue[]{spam, third, foo}));
        Assert.assertThat(completionCallback.handledResult, Matchers.equalTo(expectedResult));
        Assert.assertThat(completionCallback.handledThrowable, Matchers.is(Matchers.nullValue()));
    }

    /**
     * When {@code processElement} throws, the completion callback must receive that exact
     * throwable and no result.
     */
    @Test
    public void processElementThrowsExceptionCallsback() throws Exception {
        final StepTransformResult unusedResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
        final Exception failure = new Exception();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
                throw failure;
            }

            public InProcessTransformResult finishBundle() throws Exception {
                return unusedResult;
            }
        };
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(created).add(WindowedValue.valueInGlobalWindow("foo")).commit(Instant.now());
        Mockito.when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle, evaluationContext)).thenReturn(evaluator);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(TransformExecutor.create(registry, Collections.emptyList(), evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, transformEvaluationState));
            // Wait for the completion callback before inspecting what it recorded.
            evaluatorCompleted.await();
        } finally {
            // Release the worker thread so it does not outlive the test.
            worker.shutdown();
        }

        Assert.assertThat(completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(completionCallback.handledThrowable, Matchers.equalTo(failure));
    }

    /**
     * When {@code finishBundle} throws, the completion callback must receive that exact
     * throwable and no result.
     */
    @Test
    public void finishBundleThrowsExceptionCallsback() throws Exception {
        final Exception failure = new Exception();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> windowedValue) throws Exception {
            }

            public InProcessTransformResult finishBundle() throws Exception {
                throw failure;
            }
        };
        // Empty bundle: only finishBundle is expected to do anything.
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(created).commit(Instant.now());
        Mockito.when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle, evaluationContext)).thenReturn(evaluator);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(TransformExecutor.create(registry, Collections.emptyList(), evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, transformEvaluationState));
            // Wait for the completion callback before inspecting what it recorded.
            evaluatorCompleted.await();
        } finally {
            // Release the worker thread so it does not outlive the test.
            worker.shutdown();
        }

        Assert.assertThat(completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(completionCallback.handledThrowable, Matchers.equalTo(failure));
    }

    /**
     * While the evaluator is still inside {@code finishBundle}, {@code getThread()} on the
     * executor must return a non-null thread.
     */
    @Test
    public void duringCallGetThreadIsNonNull() throws Exception {
        final StepTransformResult result = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
        // Signalled by the evaluator once it is running inside finishBundle.
        final CountDownLatch evaluatorRunning = new CountDownLatch(1);
        // Signalled by the test to let the evaluator return.
        final CountDownLatch testFinished = new CountDownLatch(1);
        Mockito.when(registry.forApplication(created.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, evaluationContext)).thenReturn(new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
                throw new IllegalArgumentException("Shouldn't be called");
            }

            public InProcessTransformResult finishBundle() throws Exception {
                evaluatorRunning.countDown();
                testFinished.await();
                return result;
            }
        });
        // Declared as TransformExecutor (not Runnable) because getThread() is queried below.
        TransformExecutor<?> executor = TransformExecutor.create(registry, Collections.emptyList(), evaluationContext, (InProcessPipelineRunner.CommittedBundle) null, created.getProducingTransformInternal(), completionCallback, transformEvaluationState);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(executor);
            evaluatorRunning.await();
            Assert.assertThat(executor.getThread(), Matchers.not(Matchers.nullValue()));
        } finally {
            // Always unblock the evaluator, even if the assertion failed, then release the worker.
            testFinished.countDown();
            worker.shutdown();
        }
    }

    /**
     * An executor configured with an enforcement factory must notify the created enforcement
     * before and after each element and once after the bundle is finished.
     */
    @Test
    public void callWithEnforcementAppliesEnforcement() throws Exception {
        final InProcessTransformResult expectedResult = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
            }

            public InProcessTransformResult finishBundle() throws Exception {
                return expectedResult;
            }
        };

        WindowedValue foo = WindowedValue.valueInGlobalWindow("foo");
        WindowedValue bar = WindowedValue.valueInGlobalWindow("bar");
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(created)
                .add(foo)
                .add(bar)
                .commit(Instant.now());
        Mockito.when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
                .thenReturn(evaluator);

        TestEnforcementFactory enforcementFactory = new TestEnforcementFactory();
        // Executed synchronously on the test thread.
        TransformExecutor.create(
                registry,
                Collections.singleton(enforcementFactory),
                evaluationContext,
                inputBundle,
                downstream.getProducingTransformInternal(),
                completionCallback,
                transformEvaluationState).run();

        TestEnforcement enforcement = enforcementFactory.instance;
        Assert.assertThat(enforcement.beforeElements, Matchers.containsInAnyOrder(new WindowedValue[]{bar, foo}));
        Assert.assertThat(enforcement.afterElements, Matchers.containsInAnyOrder(new WindowedValue[]{bar, foo}));
        Assert.assertThat(enforcement.finishedBundles, Matchers.contains(new InProcessTransformResult[]{expectedResult}));
    }

    /**
     * Mutating an input element while the evaluator is inside {@code finishBundle} must make the
     * submitted task fail with a cause of type {@link IllegalMutationException}.
     */
    @Test
    public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
        PCollection pcBytes = created.apply(new PTransform<PCollection<String>, PCollection<byte[]>>() {
            public PCollection<byte[]> apply(PCollection<String> pCollection) {
                return PCollection.createPrimitiveOutputInternal(pCollection.getPipeline(), pCollection.getWindowingStrategy(), pCollection.isBounded()).setCoder(ByteArrayCoder.of());
            }
        });
        final StepTransformResult result = StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
        // Signalled by the evaluator once it has reached finishBundle.
        final CountDownLatch evaluatorStarted = new CountDownLatch(1);
        // Signalled by the test after it has mutated the element.
        final CountDownLatch elementMutated = new CountDownLatch(1);
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
            }

            public InProcessTransformResult finishBundle() throws Exception {
                evaluatorStarted.countDown();
                elementMutated.await();
                return result;
            }
        };
        WindowedValue fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
        Mockito.when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)).thenReturn(evaluator);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = worker.submit(TransformExecutor.create(registry, Collections.singleton(ImmutabilityEnforcementFactory.create()), evaluationContext, inputBundle, pcBytes.getProducingTransformInternal(), completionCallback, transformEvaluationState));
            evaluatorStarted.await();
            // Illegally change the first byte of the input element from 'f' to 'b'.
            ((byte[]) fooBytes.getValue())[0] = (byte) 'b';
            elementMutated.countDown();

            thrown.expectCause(Matchers.isA(IllegalMutationException.class));
            task.get();
        } finally {
            // Release the worker thread so it does not outlive the test.
            worker.shutdown();
        }
    }

    /**
     * Mutating an input element while the evaluator is inside {@code processElement} must make
     * the submitted task fail with a cause of type {@link IllegalMutationException}.
     */
    @Test
    public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
        PCollection pcBytes = created.apply(new PTransform<PCollection<String>, PCollection<byte[]>>() {
            public PCollection<byte[]> apply(PCollection<String> pCollection) {
                return PCollection.createPrimitiveOutputInternal(pCollection.getPipeline(), pCollection.getWindowingStrategy(), pCollection.isBounded()).setCoder(ByteArrayCoder.of());
            }
        });
        final StepTransformResult result = StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
        // Signalled by the evaluator once it has reached processElement.
        final CountDownLatch evaluatorStarted = new CountDownLatch(1);
        // Signalled by the test after it has mutated the element.
        final CountDownLatch elementMutated = new CountDownLatch(1);
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> windowedValue) throws Exception {
                evaluatorStarted.countDown();
                elementMutated.await();
            }

            public InProcessTransformResult finishBundle() throws Exception {
                return result;
            }
        };
        WindowedValue fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
        InProcessPipelineRunner.CommittedBundle inputBundle = bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
        Mockito.when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)).thenReturn(evaluator);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = worker.submit(TransformExecutor.create(registry, Collections.singleton(ImmutabilityEnforcementFactory.create()), evaluationContext, inputBundle, pcBytes.getProducingTransformInternal(), completionCallback, transformEvaluationState));
            evaluatorStarted.await();
            // Illegally change the first byte of the input element from 'f' to 'b'.
            ((byte[]) fooBytes.getValue())[0] = (byte) 'b';
            elementMutated.countDown();

            thrown.expectCause(Matchers.isA(IllegalMutationException.class));
            task.get();
        } finally {
            // Release the worker thread so it does not outlive the test.
            worker.shutdown();
        }
    }
}
