package org.apache.beam.sdk.transforms;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest.class */
public class ParDoTest implements Serializable {

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$Checker.class */
    private static class Checker implements SerializableFunction<Iterable<String>, Void> {
        private Checker() {
        }

        public Void apply(Iterable<String> iterable) {
            boolean z = false;
            boolean z2 = false;
            boolean z3 = false;
            for (String str : iterable) {
                if (str.equals("elem:1:1")) {
                    if (z2) {
                        throw new AssertionError("Received duplicate element");
                    }
                    z2 = true;
                } else if (str.equals("start:2:2")) {
                    z = true;
                } else {
                    if (!str.equals("finish:3:3")) {
                        throw new AssertionError("Got unexpected value: " + str);
                    }
                    z3 = true;
                }
            }
            if (!z) {
                throw new AssertionError("Missing \"start:2:2\"");
            }
            if (!z2) {
                throw new AssertionError("Missing \"elem:1:1\"");
            }
            if (z3) {
                return null;
            }
            throw new AssertionError("Missing \"finish:3:3\"");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$HasExpectedOutput.class */
    static class HasExpectedOutput implements SerializableFunction<Iterable<String>, Void>, Serializable {
        private final List<Integer> inputs;
        private final List<Integer> sideInputs;
        private final String sideOutput;
        private final boolean ordered;

        public static HasExpectedOutput forInput(List<Integer> list) {
            return new HasExpectedOutput(new ArrayList(list), new ArrayList(), "", false);
        }

        private HasExpectedOutput(List<Integer> list, List<Integer> list2, String str, boolean z) {
            this.inputs = list;
            this.sideInputs = list2;
            this.sideOutput = str;
            this.ordered = z;
        }

        public HasExpectedOutput andSideInputs(Integer... numArr) {
            ArrayList arrayList = new ArrayList();
            for (Integer num : numArr) {
                arrayList.add(num);
            }
            return new HasExpectedOutput(this.inputs, arrayList, this.sideOutput, this.ordered);
        }

        public HasExpectedOutput fromSideOutput(TupleTag<String> tupleTag) {
            return fromSideOutput(tupleTag.getId());
        }

        public HasExpectedOutput fromSideOutput(String str) {
            return new HasExpectedOutput(this.inputs, this.sideInputs, str, this.ordered);
        }

        public HasExpectedOutput inOrder() {
            return new HasExpectedOutput(this.inputs, this.sideInputs, this.sideOutput, true);
        }

        public Void apply(Iterable<String> iterable) {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            for (String str : iterable) {
                if (str.contains("started")) {
                    arrayList.add(str);
                } else if (str.contains("finished")) {
                    arrayList3.add(str);
                } else {
                    arrayList2.add(str);
                }
            }
            String str2 = this.sideInputs.isEmpty() ? "" : ": " + this.sideInputs;
            String str3 = this.sideOutput.isEmpty() ? "" : this.sideOutput + ": ";
            ArrayList arrayList4 = new ArrayList();
            Iterator<Integer> it = this.inputs.iterator();
            while (it.hasNext()) {
                arrayList4.add(str3 + "processing: " + it.next() + str2);
            }
            String[] strArr = (String[]) arrayList4.toArray(new String[arrayList4.size()]);
            if (!this.ordered || arrayList4.isEmpty()) {
                Assert.assertThat(arrayList2, IsIterableContainingInAnyOrder.containsInAnyOrder(strArr));
            } else {
                Assert.assertThat(arrayList2, IsIterableContainingInOrder.contains(strArr));
            }
            Assert.assertEquals(arrayList.size(), arrayList3.size());
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                Assert.assertEquals(str3 + "started", (String) it2.next());
            }
            Iterator it3 = arrayList3.iterator();
            while (it3.hasNext()) {
                Assert.assertEquals(str3 + "finished", (String) it3.next());
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$MainOutputDummyFn.class */
    private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
        private TupleTag<Integer> sideTag;

        public MainOutputDummyFn(TupleTag<Integer> tupleTag) {
            this.sideTag = tupleTag;
        }

        public void processElement(DoFn<Integer, TestDummy>.ProcessContext processContext) {
            processContext.output(new TestDummy());
            processContext.sideOutput(this.sideTag, 1);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$MultiFilter.class */
    static class MultiFilter extends PTransform<PCollection<Integer>, PCollectionTuple> {
        private static final TupleTag<Integer> BY2 = new TupleTag<Integer>("by2") { // from class: org.apache.beam.sdk.transforms.ParDoTest.MultiFilter.1
        };
        private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3") { // from class: org.apache.beam.sdk.transforms.ParDoTest.MultiFilter.2
        };

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$MultiFilter$FilterFn.class */
        public static class FilterFn extends DoFn<Integer, Integer> {
            private final int divisor;

            FilterFn(int i) {
                this.divisor = i;
            }

            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) throws Exception {
                if (((Integer) processContext.element()).intValue() % this.divisor == 0) {
                    processContext.output(processContext.element());
                }
            }
        }

        MultiFilter() {
        }

        public PCollectionTuple apply(PCollection<Integer> pCollection) {
            PCollection apply = pCollection.apply("Filter2s", ParDo.of(new FilterFn(2)));
            return PCollectionTuple.of(BY2, apply).and(BY3, pCollection.apply("Filter3s", ParDo.of(new FilterFn(3))));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$PrintingDoFn.class */
    private static class PrintingDoFn extends DoFn<String, String> implements DoFn.RequiresWindowAccess {
        private PrintingDoFn() {
        }

        public void processElement(DoFn<String, String>.ProcessContext processContext) {
            processContext.output(((String) processContext.element()) + ":" + processContext.timestamp().getMillis() + ":" + processContext.window().maxTimestamp().getMillis());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$SideOutputDummyFn.class */
    private static class SideOutputDummyFn extends DoFn<Integer, Integer> {
        private TupleTag<TestDummy> sideTag;

        public SideOutputDummyFn(TupleTag<TestDummy> tupleTag) {
            this.sideTag = tupleTag;
        }

        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
            processContext.output(1);
            processContext.sideOutput(this.sideTag, new TestDummy());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$StrangelyNamedDoer.class */
    private static class StrangelyNamedDoer extends DoFn<Integer, String> {
        private StrangelyNamedDoer() {
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDoFn.class */
    public static class TestDoFn extends DoFn<Integer, String> {
        State state = State.UNSTARTED;
        final List<PCollectionView<Integer>> sideInputViews = new ArrayList();
        final List<TupleTag<String>> sideOutputTupleTags = new ArrayList();

        /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDoFn$State.class */
        enum State {
            UNSTARTED,
            STARTED,
            PROCESSING,
            FINISHED
        }

        public TestDoFn() {
        }

        public TestDoFn(List<PCollectionView<Integer>> list, List<TupleTag<String>> list2) {
            this.sideInputViews.addAll(list);
            this.sideOutputTupleTags.addAll(list2);
        }

        public void startBundle(DoFn<Integer, String>.Context context) {
            Assert.assertThat(this.state, Matchers.anyOf(Matchers.equalTo(State.UNSTARTED), Matchers.equalTo(State.FINISHED)));
            this.state = State.STARTED;
            outputToAll(context, "started");
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
            Assert.assertThat(this.state, Matchers.anyOf(Matchers.equalTo(State.STARTED), Matchers.equalTo(State.PROCESSING)));
            this.state = State.PROCESSING;
            outputToAllWithSideInputs(processContext, "processing: " + processContext.element());
        }

        public void finishBundle(DoFn<Integer, String>.Context context) {
            Assert.assertThat(this.state, Matchers.anyOf(Matchers.equalTo(State.STARTED), Matchers.equalTo(State.PROCESSING)));
            this.state = State.FINISHED;
            outputToAll(context, "finished");
        }

        private void outputToAll(DoFn<Integer, String>.Context context, String str) {
            context.output(str);
            for (TupleTag<String> tupleTag : this.sideOutputTupleTags) {
                context.sideOutput(tupleTag, tupleTag.getId() + ": " + str);
            }
        }

        private void outputToAllWithSideInputs(DoFn<Integer, String>.ProcessContext processContext, String str) {
            if (!this.sideInputViews.isEmpty()) {
                ArrayList arrayList = new ArrayList();
                Iterator<PCollectionView<Integer>> it = this.sideInputViews.iterator();
                while (it.hasNext()) {
                    arrayList.add(processContext.sideInput(it.next()));
                }
                str = str + ": " + arrayList;
            }
            processContext.output(str);
            for (TupleTag<String> tupleTag : this.sideOutputTupleTags) {
                processContext.sideOutput(tupleTag, tupleTag.getId() + ": " + str);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDoFnWithContext.class */
    static class TestDoFnWithContext extends DoFnWithContext<Integer, String> {
        State state = State.UNSTARTED;
        final List<PCollectionView<Integer>> sideInputViews = new ArrayList();
        final List<TupleTag<String>> sideOutputTupleTags = new ArrayList();

        /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDoFnWithContext$State.class */
        enum State {
            UNSTARTED,
            STARTED,
            PROCESSING,
            FINISHED
        }

        public TestDoFnWithContext() {
        }

        public TestDoFnWithContext(List<PCollectionView<Integer>> list, List<TupleTag<String>> list2) {
            this.sideInputViews.addAll(list);
            this.sideOutputTupleTags.addAll(list2);
        }

        @DoFnWithContext.StartBundle
        public void startBundle(DoFnWithContext<Integer, String>.Context context) {
            Assert.assertEquals(State.UNSTARTED, this.state);
            this.state = State.STARTED;
            outputToAll(context, "started");
        }

        @DoFnWithContext.ProcessElement
        public void processElement(DoFnWithContext<Integer, String>.ProcessContext processContext) {
            Assert.assertThat(this.state, Matchers.anyOf(Matchers.equalTo(State.STARTED), Matchers.equalTo(State.PROCESSING)));
            this.state = State.PROCESSING;
            outputToAllWithSideInputs(processContext, "processing: " + processContext.element());
        }

        @DoFnWithContext.FinishBundle
        public void finishBundle(DoFnWithContext<Integer, String>.Context context) {
            Assert.assertThat(this.state, Matchers.anyOf(Matchers.equalTo(State.STARTED), Matchers.equalTo(State.PROCESSING)));
            this.state = State.FINISHED;
            outputToAll(context, "finished");
        }

        private void outputToAll(DoFnWithContext<Integer, String>.Context context, String str) {
            context.output(str);
            for (TupleTag<String> tupleTag : this.sideOutputTupleTags) {
                context.sideOutput(tupleTag, tupleTag.getId() + ": " + str);
            }
        }

        private void outputToAllWithSideInputs(DoFnWithContext<Integer, String>.ProcessContext processContext, String str) {
            if (!this.sideInputViews.isEmpty()) {
                ArrayList arrayList = new ArrayList();
                Iterator<PCollectionView<Integer>> it = this.sideInputViews.iterator();
                while (it.hasNext()) {
                    arrayList.add(processContext.sideInput(it.next()));
                }
                str = str + ": " + arrayList;
            }
            processContext.output(str);
            for (TupleTag<String> tupleTag : this.sideOutputTupleTags) {
                processContext.sideOutput(tupleTag, tupleTag.getId() + ": " + str);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDummy.class */
    public static class TestDummy {
        private TestDummy() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestDummyCoder.class */
    private static class TestDummyCoder extends AtomicCoder<TestDummy> {
        private static final TestDummyCoder INSTANCE = new TestDummyCoder();

        private TestDummyCoder() {
        }

        @JsonCreator
        public static TestDummyCoder of() {
            return INSTANCE;
        }

        public static List<Object> getInstanceComponents(TestDummy testDummy) {
            return Collections.emptyList();
        }

        public void encode(TestDummy testDummy, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public TestDummy m122decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
            return new TestDummy();
        }

        public boolean isRegisterByteSizeObserverCheap(TestDummy testDummy, Coder.Context context) {
            return true;
        }

        public void registerByteSizeObserver(TestDummy testDummy, ElementByteSizeObserver elementByteSizeObserver, Coder.Context context) throws Exception {
            elementByteSizeObserver.update(0L);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestFinishBatchErrorDoFn.class */
    static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> {
        TestFinishBatchErrorDoFn() {
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
        }

        public void finishBundle(DoFn<Integer, String>.Context context) {
            throw new RuntimeException("test error in finalize");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestFormatTimestampDoFn.class */
    static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
        TestFormatTimestampDoFn() {
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
            Preconditions.checkNotNull(processContext.timestamp());
            processContext.output("processing: " + processContext.element() + ", timestamp: " + processContext.timestamp().getMillis());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestNoOutputDoFn.class */
    static class TestNoOutputDoFn extends DoFn<Integer, String> {
        TestNoOutputDoFn() {
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestOutputTimestampDoFn.class */
    static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
        TestOutputTimestampDoFn() {
        }

        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
            Integer num = (Integer) processContext.element();
            processContext.outputWithTimestamp(num, new Instant(num.longValue()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestProcessElementErrorDoFn.class */
    static class TestProcessElementErrorDoFn extends DoFn<Integer, String> {
        TestProcessElementErrorDoFn() {
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
            throw new RuntimeException("test error in process");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestShiftTimestampDoFn.class */
    static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
        private Duration allowedTimestampSkew;
        private Duration durationToShift;

        public TestShiftTimestampDoFn(Duration duration, Duration duration2) {
            this.allowedTimestampSkew = duration;
            this.durationToShift = duration2;
        }

        public Duration getAllowedTimestampSkew() {
            return this.allowedTimestampSkew;
        }

        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
            Instant timestamp = processContext.timestamp();
            Preconditions.checkNotNull(timestamp);
            processContext.outputWithTimestamp((Integer) processContext.element(), timestamp.plus(this.durationToShift));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ParDoTest$TestStartBatchErrorDoFn.class */
    static class TestStartBatchErrorDoFn extends DoFn<Integer, String> {
        TestStartBatchErrorDoFn() {
        }

        public void startBundle(DoFn<Integer, String>.Context context) {
            throw new RuntimeException("test error in initialize");
        }

        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
        }
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDo() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.of(new TestDoFn()))).satisfies(HasExpectedOutput.forInput(asList));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDo2() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.of(new TestDoFnWithContext()))).satisfies(HasExpectedOutput.forInput(asList));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoEmpty() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(new Integer[0]);
        PAssert.that(create.apply(Create.of(asList).withCoder(VarIntCoder.of())).apply("TestDoFn", ParDo.of(new TestDoFn()))).satisfies(HasExpectedOutput.forInput(asList));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoEmptyOutputs() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(Arrays.asList(new Integer[0])).withCoder(VarIntCoder.of())).apply("TestDoFn", ParDo.of(new TestNoOutputDoFn()))).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoWithSideOutputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.1
        };
        TupleTag<String> tupleTag2 = new TupleTag<String>("side1") { // from class: org.apache.beam.sdk.transforms.ParDoTest.2
        };
        TupleTag<String> tupleTag3 = new TupleTag<String>("side2") { // from class: org.apache.beam.sdk.transforms.ParDoTest.3
        };
        TupleTag<String> tupleTag4 = new TupleTag<String>("side3") { // from class: org.apache.beam.sdk.transforms.ParDoTest.4
        };
        TupleTag<String> tupleTag5 = new TupleTag<String>("sideUnwritten") { // from class: org.apache.beam.sdk.transforms.ParDoTest.5
        };
        PCollectionTuple apply = create.apply(Create.of(asList)).apply(ParDo.of(new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(tupleTag2, tupleTag3, tupleTag4))).withOutputTags(tupleTag, TupleTagList.of(tupleTag4).and(tupleTag2).and(tupleTag5).and(tupleTag3)));
        PAssert.that(apply.get(tupleTag)).satisfies(HasExpectedOutput.forInput(asList));
        PAssert.that(apply.get(tupleTag2)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag2));
        PAssert.that(apply.get(tupleTag3)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag3));
        PAssert.that(apply.get(tupleTag4)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag4));
        PAssert.that(apply.get(tupleTag5)).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoEmptyWithSideOutputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(new Integer[0]);
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.6
        };
        TupleTag<String> tupleTag2 = new TupleTag<String>("side1") { // from class: org.apache.beam.sdk.transforms.ParDoTest.7
        };
        TupleTag<String> tupleTag3 = new TupleTag<String>("side2") { // from class: org.apache.beam.sdk.transforms.ParDoTest.8
        };
        TupleTag<String> tupleTag4 = new TupleTag<String>("side3") { // from class: org.apache.beam.sdk.transforms.ParDoTest.9
        };
        TupleTag<String> tupleTag5 = new TupleTag<String>("sideUnwritten") { // from class: org.apache.beam.sdk.transforms.ParDoTest.10
        };
        PCollectionTuple apply = create.apply(Create.of(asList)).apply(ParDo.of(new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(tupleTag2, tupleTag3, tupleTag4))).withOutputTags(tupleTag, TupleTagList.of(tupleTag4).and(tupleTag2).and(tupleTag5).and(tupleTag3)));
        PAssert.that(apply.get(tupleTag)).satisfies(HasExpectedOutput.forInput(asList));
        PAssert.that(apply.get(tupleTag2)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag2));
        PAssert.that(apply.get(tupleTag3)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag3));
        PAssert.that(apply.get(tupleTag4)).satisfies(HasExpectedOutput.forInput(asList).fromSideOutput(tupleTag4));
        PAssert.that(apply.get(tupleTag5)).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoWithEmptySideOutputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(new Integer[0]);
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.11
        };
        TupleTag<String> tupleTag2 = new TupleTag<String>("side1") { // from class: org.apache.beam.sdk.transforms.ParDoTest.12
        };
        TupleTag<String> tupleTag3 = new TupleTag<String>("side2") { // from class: org.apache.beam.sdk.transforms.ParDoTest.13
        };
        PCollectionTuple apply = create.apply(Create.of(asList)).apply(ParDo.of(new TestNoOutputDoFn()).withOutputTags(tupleTag, TupleTagList.of(tupleTag2).and(tupleTag3)));
        PAssert.that(apply.get(tupleTag)).empty();
        PAssert.that(apply.get(tupleTag2)).empty();
        PAssert.that(apply.get(tupleTag3)).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoWithOnlySideOutputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        TupleTag<Void> tupleTag = new TupleTag<Void>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.14
        };
        final TupleTag<Integer> tupleTag2 = new TupleTag<Integer>("side") { // from class: org.apache.beam.sdk.transforms.ParDoTest.15
        };
        PCollectionTuple apply = create.apply(Create.of(asList)).apply(ParDo.withOutputTags(tupleTag, TupleTagList.of(tupleTag2)).of(new DoFn<Integer, Void>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.16
            public void processElement(DoFn<Integer, Void>.ProcessContext processContext) {
                processContext.sideOutput(tupleTag2, processContext.element());
            }
        }));
        PAssert.that(apply.get(tupleTag)).empty();
        PAssert.that(apply.get(tupleTag2)).containsInAnyOrder(asList);
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoWritingToUndeclaredSideOutput() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.of(new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(new TupleTag<String>("side") { // from class: org.apache.beam.sdk.transforms.ParDoTest.17
        }))))).satisfies(HasExpectedOutput.forInput(asList));
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoUndeclaredSideOutputLimit() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(Arrays.asList(3)));
        apply.apply("Success1000", ParDo.of(new DoFn<Integer, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.18
            public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
                TupleTag<String> tupleTag = new TupleTag<String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.18.1
                };
                processContext.sideOutput(tupleTag, "side");
                processContext.sideOutput(tupleTag, "side");
                processContext.sideOutput(tupleTag, "side");
                for (int i = 0; i < 998; i++) {
                    processContext.sideOutput(new TupleTag<String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.18.2
                    }, "side");
                }
            }
        }));
        create.run();
        apply.apply("Failure1001", ParDo.of(new DoFn<Integer, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.19
            public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
                for (int i = 0; i < 1000; i++) {
                    processContext.sideOutput(new TupleTag<String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.19.1
                    }, "side");
                }
            }
        }));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("the number of side outputs has exceeded a limit");
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoWithSideInputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PCollectionView apply = create.apply("CreateSideInput1", Create.of(new Integer[]{11})).apply("ViewSideInput1", View.asSingleton());
        PCollectionView apply2 = create.apply("CreateSideInputUnread", Create.of(new Integer[]{-3333})).apply("ViewSideInputUnread", View.asSingleton());
        PCollectionView apply3 = create.apply("CreateSideInput2", Create.of(new Integer[]{222})).apply("ViewSideInput2", View.asSingleton());
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.withSideInputs(new PCollectionView[]{apply, apply2, apply3}).of(new TestDoFn(Arrays.asList(apply, apply3), Arrays.asList(new TupleTag[0]))))).satisfies(HasExpectedOutput.forInput(asList).andSideInputs(11, 222));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoWithSideInputsIsCumulative() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PCollectionView apply = create.apply("CreateSideInput1", Create.of(new Integer[]{11})).apply("ViewSideInput1", View.asSingleton());
        PCollectionView apply2 = create.apply("CreateSideInputUnread", Create.of(new Integer[]{-3333})).apply("ViewSideInputUnread", View.asSingleton());
        PCollectionView apply3 = create.apply("CreateSideInput2", Create.of(new Integer[]{222})).apply("ViewSideInput2", View.asSingleton());
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.withSideInputs(new PCollectionView[]{apply}).withSideInputs(new PCollectionView[]{apply2}).withSideInputs(new PCollectionView[]{apply3}).of(new TestDoFn(Arrays.asList(apply, apply3), Arrays.asList(new TupleTag[0]))))).satisfies(HasExpectedOutput.forInput(asList).andSideInputs(11, 222));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testMultiOutputParDoWithSideInputs() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.20
        };
        TupleTag<Void> tupleTag2 = new TupleTag<Void>("sideOutput") { // from class: org.apache.beam.sdk.transforms.ParDoTest.21
        };
        PCollectionView apply = create.apply("CreateSideInput1", Create.of(new Integer[]{11})).apply("ViewSideInput1", View.asSingleton());
        PCollectionView apply2 = create.apply("CreateSideInputUnread", Create.of(new Integer[]{-3333})).apply("ViewSideInputUnread", View.asSingleton());
        PCollectionView apply3 = create.apply("CreateSideInput2", Create.of(new Integer[]{222})).apply("ViewSideInput2", View.asSingleton());
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.withSideInputs(new PCollectionView[]{apply}).withSideInputs(new PCollectionView[]{apply2}).withSideInputs(new PCollectionView[]{apply3}).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)).of(new TestDoFn(Arrays.asList(apply, apply3), Arrays.asList(new TupleTag[0])))).get(tupleTag)).satisfies(HasExpectedOutput.forInput(asList).andSideInputs(11, 222));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testMultiOutputParDoWithSideInputsIsCumulative() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.22
        };
        TupleTag<Void> tupleTag2 = new TupleTag<Void>("sideOutput") { // from class: org.apache.beam.sdk.transforms.ParDoTest.23
        };
        PCollectionView apply = create.apply("CreateSideInput1", Create.of(new Integer[]{11})).apply("ViewSideInput1", View.asSingleton());
        PCollectionView apply2 = create.apply("CreateSideInputUnread", Create.of(new Integer[]{-3333})).apply("ViewSideInputUnread", View.asSingleton());
        PCollectionView apply3 = create.apply("CreateSideInput2", Create.of(new Integer[]{222})).apply("ViewSideInput2", View.asSingleton());
        PAssert.that(create.apply(Create.of(asList)).apply(ParDo.withSideInputs(new PCollectionView[]{apply}).withSideInputs(new PCollectionView[]{apply2}).withSideInputs(new PCollectionView[]{apply3}).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)).of(new TestDoFn(Arrays.asList(apply, apply3), Arrays.asList(new TupleTag[0])))).get(tupleTag)).satisfies(HasExpectedOutput.forInput(asList).andSideInputs(11, 222));
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoReadingFromUnknownSideInput() {
        TestPipeline create = TestPipeline.create();
        create.apply("CreateMain", Create.of(Arrays.asList(3, -42, 666))).apply(ParDo.of(new TestDoFn(Arrays.asList(create.apply("Create3", Create.of(new Integer[]{3})).apply(View.asSingleton())), Arrays.asList(new TupleTag[0]))));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("calling sideInput() with unknown view");
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoWithErrorInStartBatch() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(Arrays.asList(3, -42, 666))).apply(ParDo.of(new TestStartBatchErrorDoFn()));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("test error in initialize");
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoWithErrorInProcessElement() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(Arrays.asList(3, -42, 666))).apply(ParDo.of(new TestProcessElementErrorDoFn()));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("test error in process");
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoWithErrorInFinishBatch() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(Arrays.asList(3, -42, 666))).apply(ParDo.of(new TestFinishBatchErrorDoFn()));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("test error in finalize");
        create.run();
    }

    @Test
    public void testParDoGetName() {
        PCollection name = TestPipeline.create().apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput");
        Assert.assertEquals("ParDo(Test).out", name.apply(ParDo.of(new TestDoFn())).getName());
        Assert.assertEquals("MyParDo.out", name.apply("MyParDo", ParDo.of(new TestDoFn())).getName());
        Assert.assertEquals("TestDoFn.out", name.apply("TestDoFn", ParDo.of(new TestDoFn())).getName());
        Assert.assertEquals("ParDo(StrangelyNamedDoer).out", name.apply(ParDo.of(new StrangelyNamedDoer())).getName());
        Assert.assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
        Assert.assertEquals("ParMultiDo(SideOutputDummy)", ParDo.of(new SideOutputDummyFn(null)).withOutputTags((TupleTag) null, (TupleTagList) null).getName());
    }

    @Test
    public void testParDoWithSideOutputsName() {
        TestPipeline create = TestPipeline.create();
        TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.24
        };
        TupleTag<String> tupleTag2 = new TupleTag<String>("side1") { // from class: org.apache.beam.sdk.transforms.ParDoTest.25
        };
        TupleTag<String> tupleTag3 = new TupleTag<String>("side2") { // from class: org.apache.beam.sdk.transforms.ParDoTest.26
        };
        TupleTag<String> tupleTag4 = new TupleTag<String>("side3") { // from class: org.apache.beam.sdk.transforms.ParDoTest.27
        };
        TupleTag<String> tupleTag5 = new TupleTag<String>("sideUnwritten") { // from class: org.apache.beam.sdk.transforms.ParDoTest.28
        };
        PCollectionTuple apply = create.apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput").apply("MyParDo", ParDo.of(new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(tupleTag2, tupleTag3, tupleTag4))).withOutputTags(tupleTag, TupleTagList.of(tupleTag4).and(tupleTag2).and(tupleTag5).and(tupleTag3)));
        Assert.assertEquals("MyParDo.main", apply.get(tupleTag).getName());
        Assert.assertEquals("MyParDo.side1", apply.get(tupleTag2).getName());
        Assert.assertEquals("MyParDo.side2", apply.get(tupleTag3).getName());
        Assert.assertEquals("MyParDo.side3", apply.get(tupleTag4).getName());
        Assert.assertEquals("MyParDo.sideUnwritten", apply.get(tupleTag5).getName());
    }

    @Test
    @Category({RunnableOnService.class})
    public void testParDoInCustomTransform() {
        TestPipeline create = TestPipeline.create();
        List asList = Arrays.asList(3, -42, 666);
        PAssert.that(create.apply(Create.of(asList)).apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.29
            public PCollection<String> apply(PCollection<Integer> pCollection) {
                return pCollection.apply(ParDo.of(new TestDoFn()));
            }
        })).satisfies(HasExpectedOutput.forInput(asList));
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testMultiOutputChaining() {
        TestPipeline create = TestPipeline.create();
        PCollectionTuple apply = create.apply(Create.of(Arrays.asList(3, 4, 5, 6))).apply(new MultiFilter());
        PCollection pCollection = apply.get(MultiFilter.BY2);
        PCollection pCollection2 = apply.get(MultiFilter.BY3);
        PCollection apply2 = pCollection.apply("Filter3sAgain", ParDo.of(new MultiFilter.FilterFn(3)));
        PCollection apply3 = pCollection2.apply("Filter2sAgain", ParDo.of(new MultiFilter.FilterFn(2)));
        PAssert.that(apply2).containsInAnyOrder(new Integer[]{6});
        PAssert.that(apply3).containsInAnyOrder(new Integer[]{6});
        create.run();
    }

    @Test
    public void testJsonEscaping() {
        byte[] serializeToByteArray = SerializableUtils.serializeToByteArray(new DoFn<Integer, Integer>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.30
            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
                processContext.output(Integer.valueOf(((Integer) processContext.element()).intValue() + 1));
            }
        });
        Assert.assertArrayEquals(serializeToByteArray, StringUtils.jsonStringToByteArray(StringUtils.byteArrayToJsonString(serializeToByteArray)));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testSideOutputUnknownCoder() throws Exception {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(Arrays.asList(1, 2, 3)));
        TupleTag tupleTag = new TupleTag("main");
        TupleTag tupleTag2 = new TupleTag("unknownSide");
        apply.apply(ParDo.of(new SideOutputDummyFn(tupleTag2)).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("Unable to return a default Coder");
        create.run();
    }

    @Test
    public void testSideOutputUnregisteredExplicitCoder() throws Exception {
        PCollection apply = TestPipeline.create().apply(Create.of(Arrays.asList(1, 2, 3)));
        TupleTag tupleTag = new TupleTag("main");
        TupleTag tupleTag2 = new TupleTag("unregisteredSide");
        PCollectionTuple apply2 = apply.apply(ParDo.of(new SideOutputDummyFn(tupleTag2)).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
        apply2.get(tupleTag2).setCoder(new TestDummyCoder());
        apply2.get(tupleTag2).apply(View.asSingleton());
        Assert.assertEquals(new TestDummyCoder(), apply2.get(tupleTag2).getCoder());
        apply2.get(tupleTag2).finishSpecifyingOutput();
        Assert.assertEquals(new TestDummyCoder(), apply2.get(tupleTag2).getCoder());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testMainOutputUnregisteredExplicitCoder() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(Arrays.asList(1, 2, 3)));
        TupleTag tupleTag = new TupleTag("unregisteredMain");
        TupleTag<Integer> tupleTag2 = new TupleTag<Integer>("side") { // from class: org.apache.beam.sdk.transforms.ParDoTest.31
        };
        apply.apply(ParDo.of(new MainOutputDummyFn(tupleTag2)).withOutputTags(tupleTag, TupleTagList.of(tupleTag2))).get(tupleTag).setCoder(new TestDummyCoder());
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testMainOutputApplySideOutputNoCoder() {
        TestPipeline create = TestPipeline.create();
        TupleTag tupleTag = new TupleTag("main");
        final TupleTag tupleTag2 = new TupleTag("side");
        PCollectionTuple apply = create.apply(Create.of(new TestDummy[]{new TestDummy()}).withCoder(TestDummyCoder.of())).apply(ParDo.withOutputTags(tupleTag, TupleTagList.of(tupleTag2)).of(new DoFn<TestDummy, TestDummy>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.32
            public void processElement(DoFn<TestDummy, TestDummy>.ProcessContext processContext) {
                TestDummy testDummy = (TestDummy) processContext.element();
                processContext.output(testDummy);
                processContext.sideOutput(tupleTag2, testDummy);
            }
        }));
        apply.get(tupleTag).setCoder(TestDummyCoder.of()).apply("Output1", ParDo.of(new DoFn<TestDummy, Integer>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.33
            public void processElement(DoFn<TestDummy, Integer>.ProcessContext processContext) {
                processContext.output(1);
            }
        }));
        apply.get(tupleTag2).setCoder(TestDummyCoder.of());
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoOutputWithTimestamp() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(Arrays.asList(3, 42, 6))).apply(ParDo.of(new TestOutputTimestampDoFn())).apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))).apply(ParDo.of(new TestFormatTimestampDoFn()))).containsInAnyOrder(new String[]{"processing: 3, timestamp: 3", "processing: 42, timestamp: 42", "processing: 6, timestamp: 6"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoSideOutputWithTimestamp() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(Create.of(Arrays.asList(3, 42, 6)));
        TupleTag<Integer> tupleTag = new TupleTag<Integer>("main") { // from class: org.apache.beam.sdk.transforms.ParDoTest.34
        };
        final TupleTag<Integer> tupleTag2 = new TupleTag<Integer>("side") { // from class: org.apache.beam.sdk.transforms.ParDoTest.35
        };
        PAssert.that(apply.apply(ParDo.withOutputTags(tupleTag, TupleTagList.of(tupleTag2)).of(new DoFn<Integer, Integer>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.36
            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
                processContext.sideOutputWithTimestamp(tupleTag2, processContext.element(), new Instant(((Integer) processContext.element()).longValue()));
            }
        })).get(tupleTag2).apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))).apply(ParDo.of(new TestFormatTimestampDoFn()))).containsInAnyOrder(new String[]{"processing: 3, timestamp: 3", "processing: 42, timestamp: 42", "processing: 6, timestamp: 6"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoShiftTimestamp() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(Arrays.asList(3, 42, 6))).apply(ParDo.of(new TestOutputTimestampDoFn())).apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000L), Duration.millis(-1000L)))).apply(ParDo.of(new TestFormatTimestampDoFn()))).containsInAnyOrder(new String[]{"processing: 3, timestamp: -997", "processing: 42, timestamp: -958", "processing: 6, timestamp: -994"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoShiftTimestampInvalid() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(Arrays.asList(3, 42, 6))).apply(ParDo.of(new TestOutputTimestampDoFn())).apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000L), Duration.millis(-1001L)))).apply(ParDo.of(new TestFormatTimestampDoFn()));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Cannot output with timestamp");
        this.thrown.expectMessage("Output timestamps must be no earlier than the timestamp of the current input");
        this.thrown.expectMessage("minus the allowed skew (1 second).");
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testParDoShiftTimestampInvalidZeroAllowed() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.of(Arrays.asList(3, 42, 6))).apply(ParDo.of(new TestOutputTimestampDoFn())).apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.millis(-1001L)))).apply(ParDo.of(new TestFormatTimestampDoFn()));
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Cannot output with timestamp");
        this.thrown.expectMessage("Output timestamps must be no earlier than the timestamp of the current input");
        this.thrown.expectMessage("minus the allowed skew (0 milliseconds).");
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testWindowingInStartAndFinishBundle() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.timestamped(new TimestampedValue[]{TimestampedValue.of("elem", new Instant(1L))})).apply(Window.into(FixedWindows.of(Duration.millis(1L)))).apply(ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.37
            public void startBundle(DoFn<String, String>.Context context) {
                context.outputWithTimestamp("start", new Instant(2L));
                System.out.println("Start: 2");
            }

            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output(processContext.element());
                System.out.println("Process: " + ((String) processContext.element()) + ":" + processContext.timestamp().getMillis());
            }

            public void finishBundle(DoFn<String, String>.Context context) {
                context.outputWithTimestamp("finish", new Instant(3L));
                System.out.println("Finish: 3");
            }
        })).apply(ParDo.of(new PrintingDoFn()))).satisfies(new Checker());
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWindowingInStartBundleException() {
        TestPipeline create = TestPipeline.create();
        create.apply(Create.timestamped(new TimestampedValue[]{TimestampedValue.of("elem", new Instant(1L))})).apply(Window.into(FixedWindows.of(Duration.millis(1L)))).apply(ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.38
            public void startBundle(DoFn<String, String>.Context context) {
                context.output("start");
            }

            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output(processContext.element());
            }
        }));
        this.thrown.expectMessage("WindowFn attempted to access input timestamp when none was available");
        create.run();
    }

    @Test
    public void testDoFnDisplayData() {
        DoFn<String, String> doFn = new DoFn<String, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.39
            public void processElement(DoFn<String, String>.ProcessContext processContext) {
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("doFnMetadata", "bar"));
            }
        };
        DisplayData from = DisplayData.from(ParDo.of(doFn));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem((Matcher<DisplayData.Item<?>>) Matchers.allOf(DisplayDataMatchers.hasKey("fn"), DisplayDataMatchers.hasType(DisplayData.Type.JAVA_CLASS), DisplayDataMatchers.hasValue(doFn.getClass().getName()))));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFrom(doFn));
    }

    @Test
    public void testDoFnWithContextDisplayData() {
        DoFnWithContext<String, String> doFnWithContext = new DoFnWithContext<String, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.40
            @DoFnWithContext.ProcessElement
            public void proccessElement(DoFnWithContext<String, String>.ProcessContext processContext) {
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("fnMetadata", "foobar"));
            }
        };
        DisplayData from = DisplayData.from(ParDo.of(doFnWithContext));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFrom(doFnWithContext));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("fn", doFnWithContext.getClass()));
    }

    @Test
    public void testWithOutputTagsDisplayData() {
        DoFnWithContext<String, String> doFnWithContext = new DoFnWithContext<String, String>() { // from class: org.apache.beam.sdk.transforms.ParDoTest.41
            @DoFnWithContext.ProcessElement
            public void proccessElement(DoFnWithContext<String, String>.ProcessContext processContext) {
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("fnMetadata", "foobar"));
            }
        };
        DisplayData from = DisplayData.from(ParDo.withOutputTags(new TupleTag(), TupleTagList.empty()).of(doFnWithContext));
        Assert.assertThat(from, DisplayDataMatchers.includesDisplayDataFrom(doFnWithContext));
        Assert.assertThat(from, DisplayDataMatchers.hasDisplayItem("fn", doFnWithContext.getClass()));
    }
}
