package org.apache.beam.runners.spark.translation;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.class */
public class SparkInputDataProcessorTest {

    /**
     * JUnit rule exposing an executor backed by a cached thread pool. The blocking test in this
     * class submits its background polling task to it.
     */
    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest$TestDoFnRunner.class */
    public static class TestDoFnRunner implements DoFnRunner<String, String> {
        private final DoFnRunners.OutputManager output;
        private final AtomicInteger producedCount;
        private final int desiredCount;
        private final TestDoFn fn = new TestDoFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest$TestDoFnRunner$TestDoFn.class */
        public class TestDoFn extends DoFn<String, String> {
            TestDoFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element String str) {
                for (int i = 0; i < TestDoFnRunner.this.desiredCount; i++) {
                    TestDoFnRunner.this.output.output(new TupleTag("key"), WindowedValue.valueInGlobalWindow(str + "_" + i));
                    TestDoFnRunner.this.producedCount.incrementAndGet();
                }
            }
        }

        TestDoFnRunner(DoFnRunners.OutputManager outputManager, int i, AtomicInteger atomicInteger) {
            this.output = outputManager;
            this.producedCount = atomicInteger;
            this.desiredCount = i;
        }

        public void startBundle() {
        }

        public void processElement(WindowedValue<String> windowedValue) {
            this.fn.processElement((String) windowedValue.getValue());
        }

        public <KeyT> void onTimer(String str, String str2, KeyT keyt, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
        }

        public void finishBundle() {
        }

        public <KeyT> void onWindowExpiration(BoundedWindow boundedWindow, Instant instant, KeyT keyt) {
        }

        public DoFn<String, String> getFn() {
            return this.fn;
        }
    }

    /**
     * A bounded processor fed an empty input iterator must yield an empty output iterator and
     * must not produce any element.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testBoundedProcessWorksWithEmptyInput() {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createBounded();
        AtomicInteger produced = new AtomicInteger();
        Iterator<?> outputs = processor.createOutputIterator(Collections.emptyIterator(), setUpCtx(processor.getOutputManager(), 1, produced));

        Assert.assertFalse(outputs.hasNext());
        Assert.assertEquals(0, Iterators.size(outputs));
        Assert.assertEquals(0, produced.get());
    }

    /**
     * An unbounded processor fed an empty input iterator must yield an empty output iterator and
     * must not produce any element.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testUnboundedProcessWorksWithEmptyInput() {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createUnbounded();
        AtomicInteger produced = new AtomicInteger();
        Iterator<?> outputs = processor.createOutputIterator(Collections.emptyIterator(), setUpCtx(processor.getOutputManager(), 1, produced));

        Assert.assertFalse(outputs.hasNext());
        Assert.assertEquals(0, Iterators.size(outputs));
        Assert.assertEquals(0, produced.get());
    }

    /**
     * With one input element fanned out to 1000 outputs, the bounded processor is expected to
     * stop producing once 501 writes have completed while nothing is consumed, and to finish the
     * remaining writes once the output iterator is drained.
     *
     * @throws Exception if waiting for the write cap is interrupted or times out
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testBoundedProcessBlocksOnMaxInputsUntilTheyAreConsumed() throws Exception {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createBounded();
        AtomicInteger produced = new AtomicInteger();
        Iterator<?> outputs = processor.createOutputIterator(
                Lists.newArrayList(WindowedValue.valueInGlobalWindow("tick")).iterator(),
                setUpCtx(processor.getOutputManager(), 1000, produced));

        // NOTE(review): 501 presumably equals the processor's internal buffer capacity plus the one
        // write that is blocked in flight - confirm against SparkInputDataProcessor.
        final int expectedMaxWrites = 501;
        CountDownLatch capReached = new CountDownLatch(1);
        this.executor.submit(() -> {
            while (produced.get() != expectedMaxWrites) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Stop polling when cancelled instead of clearing the flag and spinning forever;
                    // otherwise this task could never be shut down if the cap is not reached.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            capReached.countDown();
        });

        // The first hasNext() call is what the test relies on to get production going.
        Assert.assertTrue(outputs.hasNext());
        if (!capReached.await(10L, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Did not reach expected max writes cap while not consuming");
        }
        Assert.assertEquals(expectedMaxWrites, produced.get());

        // Draining the iterator unblocks the producer, so all 1000 outputs must show up.
        Assert.assertEquals(1000, Iterators.size(outputs));
        Assert.assertEquals(1000, produced.get());
    }

    /**
     * With one input element fanned out to 1000 outputs, the unbounded processor must have
     * written all 1000 outputs by the time the first {@code hasNext()} returns, i.e. before any
     * of them is consumed.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testUnboundedProcessWritesAllInputsAndNotBlock() {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createUnbounded();
        AtomicInteger produced = new AtomicInteger();
        Iterator<?> outputs = processor.createOutputIterator(
                Lists.newArrayList(WindowedValue.valueInGlobalWindow("tick")).iterator(),
                setUpCtx(processor.getOutputManager(), 1000, produced));

        Assert.assertTrue(outputs.hasNext());
        // Checked before draining: every write must already be done without any consumption.
        Assert.assertEquals(1000, produced.get());
        Assert.assertEquals(1000, Iterators.size(outputs));
    }

    /**
     * Draining a bounded processor's output must drive the runner through exactly one
     * {@code startBundle}, one {@code processElement} for the single input, and one
     * {@code finishBundle}, with no other calls on the runner.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testBoundedProcessLifecycle() {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createBounded();
        SparkProcessContext<String, String, String> ctx = setUpCtx(processor.getOutputManager(), 1000, new AtomicInteger());
        WindowedValue<String> input = WindowedValue.valueInGlobalWindow("tick");

        Iterator<?> outputs = processor.createOutputIterator(Lists.newArrayList(input).iterator(), ctx);
        Assert.assertEquals(1000, Iterators.size(outputs));

        DoFnRunner<String, String> runner = ctx.getDoFnRunner();
        Mockito.verify(runner).startBundle();
        Mockito.verify(runner).processElement(input);
        Mockito.verify(runner).finishBundle();
        Mockito.verifyNoMoreInteractions(runner);
    }

    /**
     * Draining an unbounded processor's output must drive the runner through exactly one
     * {@code startBundle}, one {@code processElement} for the single input, and one
     * {@code finishBundle}, with no other calls on the runner.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // processor's generic signature is not visible from this file
    public void testUnboundedProcessLifecycle() {
        SparkInputDataProcessor processor = SparkInputDataProcessor.createUnbounded();
        SparkProcessContext<String, String, String> ctx = setUpCtx(processor.getOutputManager(), 1000, new AtomicInteger());
        WindowedValue<String> input = WindowedValue.valueInGlobalWindow("tick");

        Iterator<?> outputs = processor.createOutputIterator(Lists.newArrayList(input).iterator(), ctx);
        Assert.assertEquals(1000, Iterators.size(outputs));

        DoFnRunner<String, String> runner = ctx.getDoFnRunner();
        Mockito.verify(runner).startBundle();
        Mockito.verify(runner).processElement(input);
        Mockito.verify(runner).finishBundle();
        Mockito.verifyNoMoreInteractions(runner);
    }

    /**
     * Creates a mocked {@link SparkProcessContext} wired for these tests.
     *
     * <p>The mock hands out a Mockito spy of a fresh {@link TestDoFnRunner}, a spy of that
     * runner's {@link DoFn}, and an empty timer-data iterator.
     *
     * @param outputManager sink the runner writes its outputs to
     * @param i number of outputs the runner emits per input element
     * @param atomicInteger counter the runner increments once per emitted output
     * @return the stubbed context mock
     */
    private SparkProcessContext<String, String, String> setUpCtx(DoFnRunners.OutputManager outputManager, int i, AtomicInteger atomicInteger) {
        TestDoFnRunner realRunner = new TestDoFnRunner(outputManager, i, atomicInteger);
        DoFnRunner<String, String> spiedRunner = Mockito.spy(realRunner);
        // getFn() is read from the real runner, not the spy, so it does not register as an
        // interaction on the spy that the lifecycle tests verify.
        DoFn<String, String> spiedFn = Mockito.spy(realRunner.getFn());

        @SuppressWarnings("unchecked") // mock(Class) can only return the raw context type
        SparkProcessContext<String, String, String> ctx = Mockito.mock(SparkProcessContext.class);
        Mockito.when(ctx.getDoFnRunner()).thenReturn(spiedRunner);
        Mockito.when(ctx.getDoFn()).thenReturn(spiedFn);
        Mockito.when(ctx.getTimerDataIterator()).thenReturn(Collections.emptyIterator());
        return ctx;
    }
}
