package org.apache.beam.sdk.transforms;

import java.util.List;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/DoFnTesterTest.class */
public class DoFnTesterTest {

    /* loaded from: input_file:org/apache/beam/sdk/transforms/DoFnTesterTest$CounterDoFn.class */
    private static class CounterDoFn extends DoFn<Long, String> {
        Aggregator<Long, Long> agg;
        private final long startBundleVal;
        private final long finishBundleVal;
        private boolean startBundleCalled;
        private boolean finishBundleCalled;

        public CounterDoFn() {
            this(0L, 0L);
        }

        public CounterDoFn(long j, long j2) {
            this.agg = createAggregator("ctr", new Sum.SumLongFn());
            this.startBundleVal = j;
            this.finishBundleVal = j2;
        }

        public void startBundle(DoFn<Long, String>.Context context) {
            this.agg.addValue(Long.valueOf(this.startBundleVal));
            this.startBundleCalled = true;
        }

        public void processElement(DoFn<Long, String>.ProcessContext processContext) throws Exception {
            this.agg.addValue(processContext.element());
            processContext.outputWithTimestamp(((Long) processContext.element()).toString(), new Instant(1000 * ((Long) processContext.element()).longValue()));
        }

        public void finishBundle(DoFn<Long, String>.Context context) {
            this.agg.addValue(Long.valueOf(this.finishBundleVal));
            this.finishBundleCalled = true;
        }

        boolean wasStartBundleCalled() {
            return this.startBundleCalled;
        }

        boolean wasFinishBundleCalled() {
            return this.finishBundleCalled;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/DoFnTesterTest$SideInputDoFn.class */
    private static class SideInputDoFn extends DoFn<Integer, Integer> {
        private final PCollectionView<Integer> value;

        private SideInputDoFn(PCollectionView<Integer> pCollectionView) {
            this.value = pCollectionView;
        }

        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) throws Exception {
            processContext.output(processContext.sideInput(this.value));
        }
    }

    @Test
    public void processElement() throws Exception {
        DoFnTester of = DoFnTester.of(new CounterDoFn());
        of.processElement(1L);
        Assert.assertThat(of.takeOutputElements(), Matchers.hasItems(new String[]{"1"}));
        Assert.assertTrue(of.takeOutputElements().isEmpty());
        Assert.assertTrue(of.peekOutputElements().isEmpty());
        CounterDoFn counterDoFn = (CounterDoFn) of.fn;
        Assert.assertTrue(counterDoFn.wasStartBundleCalled());
        Assert.assertFalse(counterDoFn.wasFinishBundleCalled());
    }

    @Test
    public void processElementsWithPeeks() throws Exception {
        DoFnTester of = DoFnTester.of(new CounterDoFn());
        of.startBundle();
        CounterDoFn counterDoFn = (CounterDoFn) of.fn;
        Assert.assertTrue(counterDoFn.wasStartBundleCalled());
        Assert.assertFalse(counterDoFn.wasFinishBundleCalled());
        of.processElement(1L);
        of.processElement(2L);
        Assert.assertThat(of.peekOutputElements(), Matchers.hasItems(new String[]{"1", "2"}));
        of.processElement(3L);
        of.processElement(4L);
        Assert.assertThat(of.peekOutputElements(), Matchers.hasItems(new String[]{"1", "2", "3", "4"}));
        Assert.assertThat(of.takeOutputElements(), Matchers.hasItems(new String[]{"1", "2", "3", "4"}));
        Assert.assertTrue(of.peekOutputElements().isEmpty());
        Assert.assertTrue(of.takeOutputElements().isEmpty());
        Assert.assertTrue(counterDoFn.wasStartBundleCalled());
        Assert.assertFalse(counterDoFn.wasFinishBundleCalled());
        of.processElement(5L);
        of.processElement(6L);
        Assert.assertThat(of.peekOutputElements(), Matchers.hasItems(new String[]{"5", "6"}));
        Assert.assertThat(of.takeOutputElements(), Matchers.hasItems(new String[]{"5", "6"}));
        of.finishBundle();
        Assert.assertTrue(counterDoFn.wasStartBundleCalled());
        Assert.assertTrue(counterDoFn.wasFinishBundleCalled());
    }

    @Test
    public void processBatch() throws Exception {
        DoFnTester of = DoFnTester.of(new CounterDoFn());
        Assert.assertThat(of.processBundle(new Long[]{1L, 2L, 3L, 4L}), Matchers.hasItems(new String[]{"1", "2", "3", "4"}));
        Assert.assertTrue(of.peekOutputElements().isEmpty());
        CounterDoFn counterDoFn = (CounterDoFn) of.fn;
        Assert.assertTrue(counterDoFn.wasStartBundleCalled());
        Assert.assertTrue(counterDoFn.wasFinishBundleCalled());
    }

    @Test
    public void processElementWithTimestamp() throws Exception {
        DoFnTester of = DoFnTester.of(new CounterDoFn());
        of.processElement(1L);
        of.processElement(2L);
        List peekOutputElementsWithTimestamp = of.peekOutputElementsWithTimestamp();
        TimestampedValue of2 = TimestampedValue.of("1", new Instant(1000L));
        TimestampedValue of3 = TimestampedValue.of("2", new Instant(2000L));
        Assert.assertThat(peekOutputElementsWithTimestamp, Matchers.hasItems(new TimestampedValue[]{of2, of3}));
        of.processElement(3L);
        of.processElement(4L);
        TimestampedValue of4 = TimestampedValue.of("3", new Instant(3000L));
        TimestampedValue of5 = TimestampedValue.of("4", new Instant(4000L));
        Assert.assertThat(of.peekOutputElementsWithTimestamp(), Matchers.hasItems(new TimestampedValue[]{of2, of3, of4, of5}));
        Assert.assertThat(of.takeOutputElementsWithTimestamp(), Matchers.hasItems(new TimestampedValue[]{of2, of3, of4, of5}));
        Assert.assertTrue(of.takeOutputElementsWithTimestamp().isEmpty());
        Assert.assertTrue(of.peekOutputElementsWithTimestamp().isEmpty());
        Assert.assertTrue(of.peekOutputElements().isEmpty());
        Assert.assertTrue(of.takeOutputElements().isEmpty());
    }

    @Test
    public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
        CounterDoFn counterDoFn = new CounterDoFn();
        DoFnTester of = DoFnTester.of(counterDoFn);
        of.processBundle(new Long[]{1L, 2L, 4L, 8L});
        Assert.assertThat((Long) of.getAggregatorValue(counterDoFn.agg), Matchers.equalTo(15L));
    }

    @Test
    public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception {
        CounterDoFn counterDoFn = new CounterDoFn();
        DoFnTester of = DoFnTester.of(counterDoFn);
        of.processBundle(new Long[0]);
        Assert.assertThat((Long) of.getAggregatorValue(counterDoFn.agg), Matchers.equalTo(0L));
    }

    @Test
    public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception {
        CounterDoFn counterDoFn = new CounterDoFn(1L, 2L);
        DoFnTester of = DoFnTester.of(counterDoFn);
        of.processBundle(new Long[]{0L, 0L});
        Assert.assertThat((Long) of.getAggregatorValue(counterDoFn.agg), Matchers.equalTo(3L));
    }

    @Test
    public void peekValuesInWindow() throws Exception {
        DoFnTester of = DoFnTester.of(new CounterDoFn(1L, 2L));
        of.startBundle();
        of.processElement(1L);
        of.processElement(2L);
        of.finishBundle();
        Assert.assertThat(of.peekOutputElementsInWindow(GlobalWindow.INSTANCE), Matchers.containsInAnyOrder(new TimestampedValue[]{TimestampedValue.of("1", new Instant(1000L)), TimestampedValue.of("2", new Instant(2000L))}));
        Assert.assertThat(of.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))), Matchers.emptyIterable());
    }

    @Test
    public void fnWithSideInputDefault() throws Exception {
        DoFnTester of = DoFnTester.of(new SideInputDoFn(PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of())));
        of.processElement(1);
        of.processElement(2);
        of.processElement(4);
        of.processElement(8);
        Assert.assertThat(of.peekOutputElements(), Matchers.containsInAnyOrder(new Integer[]{0, 0, 0, 0}));
    }

    @Test
    public void fnWithSideInputExplicit() throws Exception {
        PCollectionView singletonView = PCollectionViews.singletonView(TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
        DoFnTester of = DoFnTester.of(new SideInputDoFn(singletonView));
        of.setSideInput(singletonView, GlobalWindow.INSTANCE, -2);
        of.processElement(16);
        of.processElement(32);
        of.processElement(64);
        of.processElement(128);
        of.finishBundle();
        Assert.assertThat(of.peekOutputElements(), Matchers.containsInAnyOrder(new Integer[]{-2, -2, -2, -2}));
    }
}
