/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Sets;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesMapState;
import org.apache.beam.sdk.testing.UsesSetState;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

public class ParDoTest
implements Serializable {

    private static class TwoTimerDoFn
    extends DoFn<KV<String, String>, String> {
        @DoFn.TimerId(value="timer")
        private final TimerSpec timer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private TwoTimerDoFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, @DoFn.TimerId(value="timer") Timer timer) {
            timer.offset(Duration.standardMinutes((long)10L)).setRelative();
            timer.offset(Duration.standardMinutes((long)30L)).setRelative();
        }

        @DoFn.OnTimer(value="timer")
        public void onTimer(DoFn.OutputReceiver<String> r, @DoFn.TimerId(value="timer") Timer timer) {
            r.output((Object)"It works");
        }
    }

    public static interface MyOptions
    extends PipelineOptions {
        @Default.String(value="fake option")
        public String getFakeOption();

        public void setFakeOption(String var1);
    }

    private static class Checker
    implements SerializableFunction<Iterable<String>, Void> {
        private Checker() {
        }

        public Void apply(Iterable<String> input) {
            boolean foundElement = false;
            boolean foundFinish = false;
            for (String str : input) {
                if ("elem:1:1".equals(str)) {
                    if (foundElement) {
                        throw new AssertionError((Object)"Received duplicate element");
                    }
                    foundElement = true;
                    continue;
                }
                if ("finish:3:3".equals(str)) {
                    foundFinish = true;
                    continue;
                }
                throw new AssertionError((Object)("Got unexpected value: " + str));
            }
            if (!foundElement) {
                throw new AssertionError((Object)"Missing \"elem:1:1\"");
            }
            if (!foundFinish) {
                throw new AssertionError((Object)"Missing \"finish:3:3\"");
            }
            return null;
        }
    }

    static class HasExpectedOutput
    implements SerializableFunction<Iterable<String>, Void>,
    Serializable {
        private final List<Integer> inputs;
        private final List<Integer> sideInputs;
        private final String additionalOutput;

        public static HasExpectedOutput forInput(List<Integer> inputs) {
            return new HasExpectedOutput(new ArrayList<Integer>(inputs), new ArrayList<Integer>(), "");
        }

        private HasExpectedOutput(List<Integer> inputs, List<Integer> sideInputs, String additionalOutput) {
            this.inputs = inputs;
            this.sideInputs = sideInputs;
            this.additionalOutput = additionalOutput;
        }

        public HasExpectedOutput andSideInputs(Integer ... sideInputValues) {
            return new HasExpectedOutput(this.inputs, Arrays.asList(sideInputValues), this.additionalOutput);
        }

        public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
            return this.fromOutput(outputTag.getId());
        }

        public HasExpectedOutput fromOutput(String outputId) {
            return new HasExpectedOutput(this.inputs, this.sideInputs, outputId);
        }

        public Void apply(Iterable<String> outputs) {
            ArrayList<String> processeds = new ArrayList<String>();
            ArrayList<String> finisheds = new ArrayList<String>();
            for (String output : outputs) {
                if (output.contains("finished")) {
                    finisheds.add(output);
                    continue;
                }
                processeds.add(output);
            }
            String sideInputsSuffix = this.sideInputs.isEmpty() ? "" : ": " + this.sideInputs;
            String additionalOutputPrefix = this.additionalOutput.isEmpty() ? "" : this.additionalOutput + ": ";
            ArrayList<String> expectedProcesseds = new ArrayList<String>();
            for (Integer input : this.inputs) {
                expectedProcesseds.add(additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
            }
            Object[] expectedProcessedsArray = expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
            Assert.assertThat(processeds, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])expectedProcessedsArray));
            for (String finished : finisheds) {
                Assert.assertEquals((Object)(additionalOutputPrefix + "finished"), (Object)finished);
            }
            return null;
        }
    }

    private static class MyIntegerCoder
    extends AtomicCoder<MyInteger> {
        private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
        private final VarIntCoder delegate = VarIntCoder.of();

        private MyIntegerCoder() {
        }

        public static MyIntegerCoder of() {
            return INSTANCE;
        }

        public void encode(MyInteger value, OutputStream outStream) throws CoderException, IOException {
            this.delegate.encode(Integer.valueOf(value.getValue()), outStream);
        }

        public MyInteger decode(InputStream inStream) throws CoderException, IOException {
            return new MyInteger(this.delegate.decode(inStream));
        }
    }

    private static class MyInteger
    implements Comparable<MyInteger> {
        private final int value;

        MyInteger(int value) {
            this.value = value;
        }

        public int getValue() {
            return this.value;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof MyInteger)) {
                return false;
            }
            MyInteger myInteger = (MyInteger)o;
            return this.value == myInteger.value;
        }

        public int hashCode() {
            return this.value;
        }

        @Override
        public int compareTo(MyInteger o) {
            return Integer.compare(this.getValue(), o.getValue());
        }

        public String toString() {
            return "MyInteger{value=" + this.value + '}';
        }
    }

    private static class MainOutputDummyFn
    extends DoFn<Integer, TestDummy> {
        private TupleTag<TestDummy> mainOutputTag;
        private TupleTag<Integer> intOutputTag;

        public MainOutputDummyFn(TupleTag<TestDummy> mainOutputTag, TupleTag<Integer> intOutputTag) {
            this.mainOutputTag = mainOutputTag;
            this.intOutputTag = intOutputTag;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.MultiOutputReceiver r) {
            r.get(this.mainOutputTag).output((Object)new TestDummy());
            r.get(this.intOutputTag).output((Object)1);
        }
    }

    private static class TaggedOutputDummyFn
    extends DoFn<Integer, Integer> {
        private TupleTag<Integer> mainOutputTag;
        private TupleTag<TestDummy> dummyOutputTag;

        public TaggedOutputDummyFn(TupleTag<Integer> mainOutputTag, TupleTag<TestDummy> dummyOutputTag) {
            this.mainOutputTag = mainOutputTag;
            this.dummyOutputTag = dummyOutputTag;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.MultiOutputReceiver r) {
            r.get(this.mainOutputTag).output((Object)1);
            r.get(this.dummyOutputTag).output((Object)new TestDummy());
        }
    }

    private static class TestDummyCoder
    extends AtomicCoder<TestDummy> {
        private static final TestDummyCoder INSTANCE = new TestDummyCoder();

        private TestDummyCoder() {
        }

        @JsonCreator
        public static TestDummyCoder of() {
            return INSTANCE;
        }

        public void encode(TestDummy value, OutputStream outStream) throws CoderException, IOException {
        }

        public TestDummy decode(InputStream inStream) throws CoderException, IOException {
            return new TestDummy();
        }

        public boolean isRegisterByteSizeObserverCheap(TestDummy value) {
            return true;
        }

        public void registerByteSizeObserver(TestDummy value, ElementByteSizeObserver observer) throws Exception {
            observer.update((Object)0L);
        }

        public void verifyDeterministic() {
        }
    }

    private static class TestDummy {
        private TestDummy() {
        }
    }

    private static class FnWithSideInputs
    extends DoFn<String, String> {
        private final PCollectionView<Integer> view;

        private FnWithSideInputs(PCollectionView<Integer> view) {
            this.view = view;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, @DoFn.Element String element) {
            c.output((Object)(element + ":" + c.sideInput(this.view)));
        }
    }

    @RunWith(value=JUnit4.class)
    public static class TimerCoderInferenceTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateCoderInference() {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)myIntegerCoder);
            DoFn<KV<String, Integer>, MyInteger> fn = new DoFn<KV<String, Integer>, MyInteger>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.StateId(value="foo") ValueState<MyInteger> state, DoFn.OutputReceiver<MyInteger> r) {
                    MyInteger currentValue = MoreObjects.firstNonNull((MyInteger)state.read(), new MyInteger(0));
                    r.output((Object)currentValue);
                    state.write((Object)new MyInteger(currentValue.getValue() + 1));
                }
            };
            PCollection output = ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)myIntegerCoder);
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new MyInteger[]{new MyInteger(0), new MyInteger(1), new MyInteger(2)});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateCoderInferenceFailure() throws Exception {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            DoFn<KV<String, Integer>, MyInteger> fn = new DoFn<KV<String, Integer>, MyInteger>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<MyInteger> state, DoFn.OutputReceiver<MyInteger> r) {
                    MyInteger currentValue = MoreObjects.firstNonNull((MyInteger)state.read(), new MyInteger(0));
                    r.output((Object)currentValue);
                    state.write((Object)new MyInteger(currentValue.getValue() + 1));
                }
            };
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Unable to infer a coder for ValueState and no Coder was specified.");
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)myIntegerCoder);
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateCoderInferenceFromInputCoder() {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            DoFn<KV<String, MyInteger>, MyInteger> fn = new DoFn<KV<String, MyInteger>, MyInteger>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<MyInteger>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<MyInteger> state, DoFn.OutputReceiver<MyInteger> r) {
                    MyInteger currentValue = MoreObjects.firstNonNull((MyInteger)state.read(), new MyInteger(0));
                    r.output((Object)currentValue);
                    state.write((Object)new MyInteger(currentValue.getValue() + 1));
                }
            };
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)new MyInteger(42)), (Object[])new KV[]{KV.of((Object)"hello", (Object)new MyInteger(97)), KV.of((Object)"hello", (Object)new MyInteger(84))}).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)myIntegerCoder)))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)myIntegerCoder);
            this.pipeline.run();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class TimerTests
    extends SharedTestBase
    implements Serializable {
        @Test
        public void testTimerNotKeyed() {
            String timerId = "foo";
            DoFn<String, Integer> fn = new DoFn<String, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec timer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.TimerId(value="foo") Timer timer) {
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer() {
                }
            };
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage("timer");
            this.thrown.expectMessage("KvCoder");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)"hello", (Object[])new String[]{"goodbye", "hello again"}))).apply((PTransform)ParDo.of((DoFn)fn));
        }

        @Test
        public void testTimerNotDeterministic() {
            String timerId = "foo";
            DoFn<KV<Double, String>, Integer> fn = new DoFn<KV<Double, String>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec timer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.TimerId(value="foo") Timer timer) {
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer() {
                }
            };
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage("timer");
            this.thrown.expectMessage("deterministic");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)1.0, (Object)"hello"), (Object[])new KV[]{KV.of((Object)5.4, (Object)"goodbye"), KV.of((Object)7.2, (Object)"hello again")}))).apply((PTransform)ParDo.of((DoFn)fn));
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testEventTimeTimerBounded() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, DoFn.OutputReceiver<Integer> r) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    r.output((Object)3);
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(TimeDomain timeDomain, DoFn.OutputReceiver<Integer> r) {
                    if (timeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                        r.output((Object)42);
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{3, 42});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testGbkFollowedByUserTimers() throws Exception {
            DoFn<KV<String, Iterable<Integer>>, Integer> fn = new DoFn<KV<String, Iterable<Integer>>, Integer>(){
                public static final String TIMER_ID = "foo";
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, DoFn.OutputReceiver<Integer> r) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    r.output((Object)3);
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(TimeDomain timeDomain, DoFn.OutputReceiver<Integer> r) {
                    if (timeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                        r.output((Object)42);
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)GroupByKey.create())).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{3, 42});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testEventTimeTimerAlignBounded() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, KV<Integer, Instant>> fn = new DoFn<KV<String, Integer>, KV<Integer, Instant>>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, @DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<KV<Integer, Instant>> r) {
                    timer.align(Duration.standardSeconds((long)1L)).offset(Duration.millis((long)1L)).setRelative();
                    r.output((Object)KV.of((Object)3, (Object)timestamp));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(@DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<KV<Integer, Instant>> r) {
                    r.output((Object)KV.of((Object)42, (Object)timestamp));
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new KV[]{KV.of((Object)3, (Object)BoundedWindow.TIMESTAMP_MIN_VALUE), KV.of((Object)42, (Object)BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774L))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testTimerReceivedInOriginalWindow() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, BoundedWindow> fn = new DoFn<KV<String, Integer>, BoundedWindow>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(BoundedWindow window, DoFn.OutputReceiver<BoundedWindow> r) {
                    r.output((Object)window);
                }

                public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
                    return TypeDescriptor.of(IntervalWindow.class);
                }
            };
            SlidingWindows windowing = SlidingWindows.of((Duration)Duration.standardMinutes((long)3L)).every(Duration.standardMinutes((long)1L));
            PCollection output = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)"hello", (Object)24), (Instant)new Instant(0L)), (TimestampedValue[])new TimestampedValue[0]))).apply((PTransform)Window.into((WindowFn)windowing))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new BoundedWindow[]{new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.standardMinutes((long)3L)), new IntervalWindow(new Instant(0L).minus((ReadableDuration)Duration.standardMinutes((long)1L)), (ReadableDuration)Duration.standardMinutes((long)3L)), new IntervalWindow(new Instant(0L).minus((ReadableDuration)Duration.standardMinutes((long)2L)), (ReadableDuration)Duration.standardMinutes((long)3L))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testEventTimeTimerAbsolute() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, BoundedWindow window, DoFn.OutputReceiver<Integer> r) {
                    timer.set(window.maxTimestamp());
                    r.output((Object)3);
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(DoFn.OutputReceiver<Integer> r) {
                    r.output((Object)42);
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{3, 42});
            this.pipeline.run();
        }

        @Ignore(value="https://issues.apache.org/jira/browse/BEAM-2791, https://issues.apache.org/jira/browse/BEAM-2535")
        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class})
        public void testEventTimeTimerLoop() {
            String stateId = "count";
            String timerId = "timer";
            int loopCount = 5;
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="timer")
                private final TimerSpec loopSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
                @DoFn.StateId(value="count")
                private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="count") ValueState<Integer> countState, @DoFn.TimerId(value="timer") Timer loopTimer) {
                    loopTimer.offset(Duration.millis((long)1L)).setRelative();
                }

                @DoFn.OnTimer(value="timer")
                public void onLoopTimer(@DoFn.StateId(value="count") ValueState<Integer> countState, @DoFn.TimerId(value="timer") Timer loopTimer, DoFn.OutputReceiver<Integer> r) {
                    int count = MoreObjects.firstNonNull((Integer)countState.read(), 0);
                    if (count < 5) {
                        r.output((Object)count);
                        countState.write((Object)(count + 1));
                        loopTimer.offset(Duration.millis((long)1L)).setRelative();
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{0, 1, 2, 3, 4});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testEventTimeTimerMultipleKeys() throws Exception {
            String timerId = "foo";
            String stateId = "sizzle";
            int offset = 5000;
            int timerOutput = 4093;
            DoFn<KV<String, Integer>, KV<String, Integer>> fn = new DoFn<KV<String, Integer>, KV<String, Integer>>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
                @DoFn.StateId(value="sizzle")
                private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="foo") Timer timer, @DoFn.StateId(value="sizzle") ValueState<String> state, BoundedWindow window) {
                    timer.set(window.maxTimestamp());
                    state.write((Object)((String)((KV)context.element()).getKey()));
                    context.output((Object)KV.of((Object)((String)((KV)context.element()).getKey()), (Object)((Integer)((KV)context.element()).getValue() + 5000)));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(@DoFn.StateId(value="sizzle") ValueState<String> state, DoFn.OutputReceiver<KV<String, Integer>> r) {
                    r.output((Object)KV.of((Object)((String)state.read()), (Object)4093));
                }
            };
            int numKeys = 50;
            ArrayList<KV> input = new ArrayList<KV>();
            ArrayList<KV> expectedOutput = new ArrayList<KV>();
            Integer key = 0;
            while (key < numKeys) {
                expectedOutput.add(KV.of((Object)key.toString(), (Object)4093));
                for (int i = 0; i < 15; ++i) {
                    input.add(KV.of((Object)key.toString(), (Object)i));
                    expectedOutput.add(KV.of((Object)key.toString(), (Object)(i + 5000)));
                }
                key = key + 1;
            }
            Collections.shuffle(input);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(input))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder(expectedOutput);
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer) {
                    timer.set(new Instant(0L));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer() {
                }
            };
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("relative timers");
            this.thrown.expectMessage("processing time");
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testOutOfBoundsEventTimeTimer() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, BoundedWindow window, @DoFn.TimerId(value="foo") Timer timer) {
                    timer.set(window.maxTimestamp().plus(1L));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer() {
                }
            };
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)fn));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("event time timer");
            this.thrown.expectMessage("expiration");
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testSimpleProcessingTimerTimer() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, DoFn.OutputReceiver<Integer> r) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    r.output((Object)3);
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(TimeDomain timeDomain, DoFn.OutputReceiver<Integer> r) {
                    if (timeDomain.equals((Object)TimeDomain.PROCESSING_TIME)) {
                        r.output((Object)42);
                    }
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())).addElements((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]).advanceProcessingTime(Duration.standardSeconds((long)2L)).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{3, 42});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testEventTimeTimerUnbounded() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, DoFn.OutputReceiver<Integer> r) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    r.output((Object)3);
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(DoFn.OutputReceiver<Integer> r) {
                    r.output((Object)42);
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())).advanceWatermarkTo(new Instant(0L)).addElements((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]).advanceWatermarkTo(new Instant(0L).plus((ReadableDuration)Duration.standardSeconds((long)1L))).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{3, 42});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testEventTimeTimerAlignUnbounded() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, KV<Integer, Instant>> fn = new DoFn<KV<String, Integer>, KV<Integer, Instant>>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(@DoFn.TimerId(value="foo") Timer timer, @DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<KV<Integer, Instant>> r) {
                    timer.align(Duration.standardSeconds((long)1L)).offset(Duration.millis((long)1L)).setRelative();
                    r.output((Object)KV.of((Object)3, (Object)timestamp));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(@DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<KV<Integer, Instant>> r) {
                    r.output((Object)KV.of((Object)42, (Object)timestamp));
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())).advanceWatermarkTo(new Instant(5L)).addElements((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]).advanceWatermarkTo(new Instant(0L).plus((ReadableDuration)Duration.standardSeconds((long)1L).plus(1L))).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new KV[]{KV.of((Object)3, (Object)new Instant(5L)), KV.of((Object)42, (Object)new Instant(Duration.standardSeconds((long)1L).minus(1L).getMillis()))});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, Integer>, KV<Integer, Instant>> fn = new DoFn<KV<String, Integer>, KV<Integer, Instant>>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="foo") Timer timer) {
                    timer.align(Duration.standardDays((long)1L)).setRelative();
                    context.output((Object)KV.of((Object)3, (Object)context.timestamp()));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(@DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<KV<Integer, Instant>> r) {
                    r.output((Object)KV.of((Object)42, (Object)timestamp));
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())).advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus((ReadableDuration)Duration.standardDays((long)1L))).addElements((Object)KV.of((Object)"hello", (Object)37), (Object[])new KV[0]).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new KV[]{KV.of((Object)3, (Object)BoundedWindow.TIMESTAMP_MAX_VALUE.minus((ReadableDuration)Duration.standardDays((long)1L))), KV.of((Object)42, (Object)BoundedWindow.TIMESTAMP_MAX_VALUE.minus((ReadableDuration)Duration.standardDays((long)1L)))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testProcessingTimeTimerCanBeReset() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, String>, String> fn = new DoFn<KV<String, String>, String>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="foo") Timer timer) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    context.output((Object)((String)((KV)context.element()).getValue()));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(DoFn.OutputReceiver<String> r) {
                    r.output((Object)"timer_output");
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).addElements((Object)KV.of((Object)"key", (Object)"input1"), (Object[])new KV[0]).addElements((Object)KV.of((Object)"key", (Object)"input2"), (Object[])new KV[0]).advanceProcessingTime(Duration.standardSeconds((long)2L)).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"input1", "input2", "timer_output"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
        public void testEventTimeTimerCanBeReset() throws Exception {
            String timerId = "foo";
            DoFn<KV<String, String>, String> fn = new DoFn<KV<String, String>, String>(){
                @DoFn.TimerId(value="foo")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="foo") Timer timer) {
                    timer.offset(Duration.standardSeconds((long)1L)).setRelative();
                    context.output((Object)((String)((KV)context.element()).getValue()));
                }

                @DoFn.OnTimer(value="foo")
                public void onTimer(DoFn.OutputReceiver<String> r) {
                    r.output((Object)"timer_output");
                }
            };
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).advanceWatermarkTo(new Instant(0L)).addElements((Object)KV.of((Object)"hello", (Object)"input1"), (Object[])new KV[0]).addElements((Object)KV.of((Object)"hello", (Object)"input2"), (Object[])new KV[0]).advanceWatermarkToInfinity();
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"input1", "input2", "timer_output"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTimersInParDo.class})
        public void testPipelineOptionsParameterOnTimer() {
            String timerId = "thisTimer";
            PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)0, (Object)0), (Object[])new KV[0]))).apply((PTransform)ParDo.of((DoFn)new DoFn<KV<Integer, Integer>, String>(){
                @DoFn.TimerId(value="thisTimer")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void process(DoFn.ProcessContext c, BoundedWindow w, @DoFn.TimerId(value="thisTimer") Timer timer) {
                    timer.set(w.maxTimestamp());
                }

                @DoFn.OnTimer(value="thisTimer")
                public void onTimer(DoFn.OutputReceiver<String> r, PipelineOptions options) {
                    r.output((Object)((MyOptions)options.as(MyOptions.class)).getFakeOption());
                }
            }));
            String testOptionValue = "not fake anymore";
            ((MyOptions)this.pipeline.getOptions().as(MyOptions.class)).setFakeOption(testOptionValue);
            PAssert.that((PCollection)results).containsInAnyOrder((Object[])new String[]{"not fake anymore"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesTestStream.class})
        public void duplicateTimerSetting() {
            TestStream stream = TestStream.create((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).addElements((Object)KV.of((Object)"key1", (Object)"v1"), (Object[])new KV[0]).advanceWatermarkToInfinity();
            PCollection result = (PCollection)((PCollection)this.pipeline.apply((PTransform)stream)).apply((PTransform)ParDo.of((DoFn)new TwoTimerDoFn()));
            PAssert.that((PCollection)result).containsInAnyOrder((Object[])new String[]{"It works"});
            this.pipeline.run().waitUntilFinish();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class StateCoderInferenceTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testBagStateCoderInference() {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)myIntegerCoder);
            DoFn<KV<String, Integer>, List<MyInteger>> fn = new DoFn<KV<String, Integer>, List<MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") BagState<MyInteger> state, DoFn.OutputReceiver<List<MyInteger>> r) {
                    state.add((Object)new MyInteger((Integer)element.getValue()));
                    Iterable currentValue = state.read();
                    if (Iterables.size(currentValue) >= 4) {
                        ArrayList sorted = Lists.newArrayList(currentValue);
                        Collections.sort(sorted);
                        r.output(sorted);
                    }
                }
            };
            PCollection output = ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)84), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)ListCoder.of((Coder)myIntegerCoder));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new List[]{Lists.newArrayList(new MyInteger(12), new MyInteger(42), new MyInteger(84), new MyInteger(97))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testBagStateCoderInferenceFailure() throws Exception {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            DoFn<KV<String, Integer>, List<MyInteger>> fn = new DoFn<KV<String, Integer>, List<MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") BagState<MyInteger> state, DoFn.OutputReceiver<List<MyInteger>> r) {
                    state.add((Object)new MyInteger((Integer)element.getValue()));
                    Iterable currentValue = state.read();
                    if (Iterables.size(currentValue) >= 4) {
                        ArrayList sorted = Lists.newArrayList(currentValue);
                        Collections.sort(sorted);
                        r.output(sorted);
                    }
                }
            };
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Unable to infer a coder for BagState and no Coder was specified.");
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)84), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)ListCoder.of((Coder)myIntegerCoder));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
        public void testSetStateCoderInference() {
            String stateId = "foo";
            String countStateId = "count";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)myIntegerCoder);
            DoFn<KV<String, Integer>, Set<MyInteger>> fn = new DoFn<KV<String, Integer>, Set<MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") SetState<MyInteger> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<Set<MyInteger>> r) {
                    state.add((Object)new MyInteger((Integer)element.getValue()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        HashSet set = Sets.newHashSet((Iterable)state.read());
                        r.output(set);
                    }
                }
            };
            PCollection output = ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)SetCoder.of((Coder)myIntegerCoder));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Set[]{Sets.newHashSet(new MyInteger(97), new MyInteger(42), new MyInteger(12))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
        public void testSetStateCoderInferenceFailure() throws Exception {
            String stateId = "foo";
            String countStateId = "count";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            DoFn<KV<String, Integer>, Set<MyInteger>> fn = new DoFn<KV<String, Integer>, Set<MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") SetState<MyInteger> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<Set<MyInteger>> r) {
                    state.add((Object)new MyInteger((Integer)element.getValue()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        HashSet set = Sets.newHashSet((Iterable)state.read());
                        r.output(set);
                    }
                }
            };
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Unable to infer a coder for SetState and no Coder was specified.");
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)SetCoder.of((Coder)myIntegerCoder));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
        public void testMapStateCoderInference() {
            String stateId = "foo";
            String countStateId = "count";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)myIntegerCoder);
            DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn = new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, KV<String, Integer>> element, @DoFn.StateId(value="foo") MapState<String, MyInteger> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<KV<String, MyInteger>> r) {
                    KV value = (KV)element.getValue();
                    state.put((Object)((String)value.getKey()), (Object)new MyInteger((Integer)value.getValue()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        Iterable iterate = (Iterable)state.entries().read();
                        for (Map.Entry entry : iterate) {
                            r.output((Object)KV.of((Object)((String)entry.getKey()), (Object)((MyInteger)entry.getValue())));
                        }
                    }
                }
            };
            PCollection output = ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)KV.of((Object)"a", (Object)97)), (Object[])new KV[]{KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"c", (Object)12))}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)myIntegerCoder));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"a", (Object)new MyInteger(97)), KV.of((Object)"b", (Object)new MyInteger(42)), KV.of((Object)"c", (Object)new MyInteger(12))});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
        public void testMapStateCoderInferenceFailure() throws Exception {
            String stateId = "foo";
            String countStateId = "count";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn = new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.Element KV<String, KV<String, Integer>> element, @DoFn.StateId(value="foo") MapState<String, MyInteger> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<KV<String, MyInteger>> r) {
                    KV value = (KV)element.getValue();
                    state.put((Object)((String)value.getKey()), (Object)new MyInteger((Integer)value.getValue()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        Iterable iterate = (Iterable)state.entries().read();
                        for (Map.Entry entry : iterate) {
                            r.output((Object)KV.of((Object)((String)entry.getKey()), (Object)((MyInteger)entry.getValue())));
                        }
                    }
                }
            };
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Unable to infer a coder for MapState and no Coder was specified.");
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)KV.of((Object)"a", (Object)97)), (Object[])new KV[]{KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"c", (Object)12))}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)myIntegerCoder));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testCombiningStateCoderInference() {
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)MyIntegerCoder.of());
            String stateId = "foo";
            DoFn<KV<String, Integer>, String> fn = new DoFn<KV<String, Integer>, String>(){
                private static final int EXPECTED_SUM = 16;
                @DoFn.StateId(value="foo")
                private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState = StateSpecs.combining((Combine.CombineFn)new Combine.CombineFn<Integer, MyInteger, Integer>(){

                    public MyInteger createAccumulator() {
                        return new MyInteger(0);
                    }

                    public MyInteger addInput(MyInteger accumulator, Integer input) {
                        return new MyInteger(accumulator.getValue() + input);
                    }

                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
                        int newValue = 0;
                        for (MyInteger myInteger : accumulators) {
                            newValue += myInteger.getValue();
                        }
                        return new MyInteger(newValue);
                    }

                    public Integer extractOutput(MyInteger accumulator) {
                        return accumulator.getValue();
                    }
                });

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") CombiningState<Integer, MyInteger, Integer> state, DoFn.OutputReceiver<String> r) {
                    state.add((Object)((Integer)element.getValue()));
                    Integer currentValue = (Integer)state.read();
                    if (currentValue == 16) {
                        r.output((Object)"right on");
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)3), (Object[])new KV[]{KV.of((Object)"hello", (Object)6), KV.of((Object)"hello", (Object)7)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"right on"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testCombiningStateCoderInferenceFailure() throws Exception {
            String stateId = "foo";
            DoFn<KV<String, Integer>, String> fn = new DoFn<KV<String, Integer>, String>(){
                private static final int EXPECTED_SUM = 16;
                @DoFn.StateId(value="foo")
                private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState = StateSpecs.combining((Combine.CombineFn)new Combine.CombineFn<Integer, MyInteger, Integer>(){

                    public MyInteger createAccumulator() {
                        return new MyInteger(0);
                    }

                    public MyInteger addInput(MyInteger accumulator, Integer input) {
                        return new MyInteger(accumulator.getValue() + input);
                    }

                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
                        int newValue = 0;
                        for (MyInteger myInteger : accumulators) {
                            newValue += myInteger.getValue();
                        }
                        return new MyInteger(newValue);
                    }

                    public Integer extractOutput(MyInteger accumulator) {
                        return accumulator.getValue();
                    }
                });

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") CombiningState<Integer, MyInteger, Integer> state, DoFn.OutputReceiver<String> r) {
                    state.add((Object)((Integer)element.getValue()));
                    Integer currentValue = (Integer)state.read();
                    if (currentValue == 16) {
                        r.output((Object)"right on");
                    }
                }
            };
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)3), (Object[])new KV[]{KV.of((Object)"hello", (Object)6), KV.of((Object)"hello", (Object)7)}))).apply((PTransform)ParDo.of((DoFn)fn));
            this.pipeline.run();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class StateTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateSimple() {
            String stateId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<Integer> state, DoFn.OutputReceiver<Integer> r) {
                    Integer currentValue = MoreObjects.firstNonNull((Integer)state.read(), 0);
                    r.output((Object)currentValue);
                    state.write((Object)(currentValue + 1));
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{0, 1, 2});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateDedup() {
            String stateId = "foo";
            DoFn<KV<Integer, Integer>, Integer> onePerKey = new DoFn<KV<Integer, Integer>, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> seenSpec = StateSpecs.value((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<Integer, Integer> element, @DoFn.StateId(value="foo") ValueState<Integer> seenState, DoFn.OutputReceiver<Integer> r) {
                    Integer seen = MoreObjects.firstNonNull((Integer)seenState.read(), 0);
                    if (seen == 0) {
                        seenState.write((Object)(seen + 1));
                        r.output((Object)((Integer)element.getValue()));
                    }
                }
            };
            int numKeys = 50;
            ArrayList<KV> input = new ArrayList<KV>();
            HashSet<Integer> expectedOutput = new HashSet<Integer>();
            for (int key = 0; key < numKeys; ++key) {
                int output = 1000 + key;
                expectedOutput.add(output);
                for (int i = 0; i < 15; ++i) {
                    input.add(KV.of((Object)key, (Object)output));
                }
            }
            Collections.shuffle(input);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(input))).apply((PTransform)ParDo.of((DoFn)onePerKey));
            PAssert.that((PCollection)output).containsInAnyOrder(expectedOutput);
            this.pipeline.run();
        }

        @Test
        public void testStateNotKeyed() {
            String stateId = "foo";
            DoFn<String, Integer> fn = new DoFn<String, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.StateId(value="foo") ValueState<Integer> state) {
                }
            };
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage("state");
            this.thrown.expectMessage("KvCoder");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)"hello", (Object[])new String[]{"goodbye", "hello again"}))).apply((PTransform)ParDo.of((DoFn)fn));
        }

        @Test
        public void testStateNotDeterministic() {
            String stateId = "foo";
            DoFn<KV<Double, String>, Integer> fn = new DoFn<KV<Double, String>, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.StateId(value="foo") ValueState<Integer> state) {
                }
            };
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage("state");
            this.thrown.expectMessage("deterministic");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)1.0, (Object)"hello"), (Object[])new KV[]{KV.of((Object)5.4, (Object)"goodbye"), KV.of((Object)7.2, (Object)"hello again")}))).apply((PTransform)ParDo.of((DoFn)fn));
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testCoderInferenceOfList() {
            String stateId = "foo";
            MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
            this.pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, (Coder)myIntegerCoder);
            DoFn<KV<String, Integer>, List<MyInteger>> fn = new DoFn<KV<String, Integer>, List<MyInteger>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<List<MyInteger>>> intState = StateSpecs.value();

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") ValueState<List<MyInteger>> state, DoFn.OutputReceiver<List<MyInteger>> r) {
                    MyInteger myInteger = new MyInteger((Integer)element.getValue());
                    List currentValue = (List)state.read();
                    List<MyInteger> newValue = currentValue != null ? ((ImmutableList.Builder)((ImmutableList.Builder)ImmutableList.builder().addAll((Iterable)currentValue)).add(myInteger)).build() : Collections.singletonList(myInteger);
                    r.output(newValue);
                    state.write(newValue);
                }
            };
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84)}))).apply((PTransform)ParDo.of((DoFn)fn))).setCoder((Coder)ListCoder.of((Coder)myIntegerCoder));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateFixedWindows() {
            String stateId = "foo";
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<Integer> state, DoFn.OutputReceiver<Integer> r) {
                    Integer currentValue = MoreObjects.firstNonNull((Integer)state.read(), 0);
                    r.output((Object)currentValue);
                    state.write((Object)(currentValue + 1));
                }
            };
            IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(10L));
            IntervalWindow secondWindow = new IntervalWindow(new Instant(10L), new Instant(20L));
            PCollection output = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)KV.of((Object)"hello", (Object)7), (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[]{TimestampedValue.of((Object)KV.of((Object)"hello", (Object)14), (Instant)new Instant(2L)), TimestampedValue.of((Object)KV.of((Object)"hello", (Object)21), (Instant)new Instant(3L)), TimestampedValue.of((Object)KV.of((Object)"hello", (Object)28), (Instant)new Instant(11L)), TimestampedValue.of((Object)KV.of((Object)"hello", (Object)35), (Instant)new Instant(13L))}))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)10L))))).apply("Stateful ParDo", (PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).inWindow((BoundedWindow)firstWindow).containsInAnyOrder((Object[])new Integer[]{0, 1, 2});
            PAssert.that((PCollection)output).inWindow((BoundedWindow)secondWindow).containsInAnyOrder((Object[])new Integer[]{0, 1});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateSameId() {
            String stateId = "foo";
            DoFn<KV<String, Integer>, KV<String, Integer>> fn = new DoFn<KV<String, Integer>, KV<String, Integer>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<Integer> state, DoFn.OutputReceiver<KV<String, Integer>> r) {
                    Integer currentValue = MoreObjects.firstNonNull((Integer)state.read(), 0);
                    r.output((Object)KV.of((Object)"sizzle", (Object)currentValue));
                    state.write((Object)(currentValue + 1));
                }
            };
            DoFn<KV<String, Integer>, Integer> fn2 = new DoFn<KV<String, Integer>, Integer>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<Integer> state, DoFn.OutputReceiver<Integer> r) {
                    Integer currentValue = MoreObjects.firstNonNull((Integer)state.read(), 13);
                    r.output((Object)currentValue);
                    state.write((Object)(currentValue + 13));
                }
            };
            PCollection intermediate = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84)}))).apply("First stateful ParDo", (PTransform)ParDo.of((DoFn)fn));
            PCollection output = (PCollection)intermediate.apply("Second stateful ParDo", (PTransform)ParDo.of((DoFn)fn2));
            PAssert.that((PCollection)intermediate).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"sizzle", (Object)0), KV.of((Object)"sizzle", (Object)1), KV.of((Object)"sizzle", (Object)2)});
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Integer[]{13, 26, 39});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testValueStateTaggedOutput() {
            String stateId = "foo";
            TupleTag<Integer> evenTag = new TupleTag<Integer>(){};
            TupleTag<Integer> oddTag = new TupleTag<Integer>(){};
            DoFn<KV<String, Integer>, Integer> fn = new DoFn<KV<String, Integer>, Integer>((TupleTag)evenTag, (TupleTag)oddTag){
                @DoFn.StateId(value="foo")
                private final StateSpec<ValueState<Integer>> intState = StateSpecs.value((Coder)VarIntCoder.of());
                final /* synthetic */ TupleTag val$evenTag;
                final /* synthetic */ TupleTag val$oddTag;
                {
                    this.val$evenTag = tupleTag;
                    this.val$oddTag = tupleTag2;
                }

                @DoFn.ProcessElement
                public void processElement(@DoFn.StateId(value="foo") ValueState<Integer> state, DoFn.MultiOutputReceiver r) {
                    Integer currentValue = MoreObjects.firstNonNull((Integer)state.read(), 0);
                    if (currentValue % 2 == 0) {
                        r.get(this.val$evenTag).output((Object)currentValue);
                    } else {
                        r.get(this.val$oddTag).output((Object)currentValue);
                    }
                    state.write((Object)(currentValue + 1));
                }
            };
            PCollectionTuple output = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)42), (Object[])new KV[]{KV.of((Object)"hello", (Object)97), KV.of((Object)"hello", (Object)84), KV.of((Object)"goodbye", (Object)33), KV.of((Object)"hello", (Object)859), KV.of((Object)"goodbye", (Object)83945)}))).apply((PTransform)ParDo.of((DoFn)fn).withOutputTags((TupleTag)evenTag, TupleTagList.of((TupleTag)oddTag)));
            PCollection evens = output.get((TupleTag)evenTag);
            PCollection odds = output.get((TupleTag)oddTag);
            PAssert.that((PCollection)evens).containsInAnyOrder((Object[])new Integer[]{0, 2, 0});
            PAssert.that((PCollection)odds).containsInAnyOrder((Object[])new Integer[]{1, 3, 1});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testBagState() {
            String stateId = "foo";
            DoFn<KV<String, Integer>, List<Integer>> fn = new DoFn<KV<String, Integer>, List<Integer>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<BagState<Integer>> bufferState = StateSpecs.bag((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") BagState<Integer> state, DoFn.OutputReceiver<List<Integer>> r) {
                    ReadableState isEmpty = state.isEmpty();
                    state.add((Object)((Integer)element.getValue()));
                    Assert.assertFalse((boolean)((Boolean)isEmpty.read()));
                    Iterable currentValue = state.read();
                    if (Iterables.size(currentValue) >= 4) {
                        state.add((Object)-1);
                        Assert.assertEquals((long)4L, (long)Iterables.size(currentValue));
                        Assert.assertEquals((long)5L, (long)Iterables.size(state.read()));
                        ArrayList sorted = Lists.newArrayList(currentValue);
                        Collections.sort(sorted);
                        r.output(sorted);
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)84), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new List[]{Lists.newArrayList(12, 42, 84, 97)});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
        public void testSetState() {
            String stateId = "foo";
            String countStateId = "count";
            DoFn<KV<String, Integer>, Set<Integer>> fn = new DoFn<KV<String, Integer>, Set<Integer>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<SetState<Integer>> setState = StateSpecs.set((Coder)VarIntCoder.of());
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") SetState<Integer> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<Set<Integer>> r) {
                    ReadableState isEmpty = state.isEmpty();
                    state.add((Object)((Integer)element.getValue()));
                    Assert.assertFalse((boolean)((Boolean)isEmpty.read()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        Iterable ints = (Iterable)state.read();
                        state.add((Object)-1);
                        Assert.assertEquals((long)3L, (long)Iterables.size(ints));
                        Assert.assertEquals((long)4L, (long)Iterables.size((Iterable)state.read()));
                        HashSet set = Sets.newHashSet(ints);
                        r.output(set);
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Set[]{Sets.newHashSet(97, 42, 12)});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
        public void testMapState() {
            String stateId = "foo";
            String countStateId = "count";
            DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn = new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<MapState<String, Integer>> mapState = StateSpecs.map((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of());
                @DoFn.StateId(value="count")
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningFromInputInternal((Coder)VarIntCoder.of(), (Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.Element KV<String, KV<String, Integer>> element, @DoFn.StateId(value="foo") MapState<String, Integer> state, @DoFn.StateId(value="count") CombiningState<Integer, int[], Integer> count, DoFn.OutputReceiver<KV<String, Integer>> r) {
                    KV value = (KV)element.getValue();
                    ReadableState entriesView = state.entries();
                    state.put((Object)((String)value.getKey()), (Object)((Integer)value.getValue()));
                    count.add((Object)1);
                    if ((Integer)count.read() >= 4) {
                        Iterable iterate = (Iterable)state.entries().read();
                        state.put((Object)"BadKey", (Object)-1);
                        Assert.assertEquals((long)3L, (long)Iterables.size(iterate));
                        Assert.assertEquals((long)4L, (long)Iterables.size((Iterable)entriesView.read()));
                        Assert.assertEquals((long)4L, (long)Iterables.size((Iterable)state.entries().read()));
                        for (Map.Entry entry : iterate) {
                            r.output((Object)KV.of((Object)((String)entry.getKey()), (Object)((Integer)entry.getValue())));
                        }
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)KV.of((Object)"a", (Object)97)), (Object[])new KV[]{KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"b", (Object)42)), KV.of((Object)"hello", (Object)KV.of((Object)"c", (Object)12))}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"a", (Object)97), KV.of((Object)"b", (Object)42), KV.of((Object)"c", (Object)12)});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testCombiningState() {
            String stateId = "foo";
            DoFn<KV<String, Double>, String> fn = new DoFn<KV<String, Double>, String>(){
                private static final double EPSILON = 1.0E-4;
                @DoFn.StateId(value="foo")
                private final StateSpec<CombiningState<Double, Mean.CountSum<Double>, Double>> combiningState = StateSpecs.combining((Coder)new Mean.CountSumCoder(), (Combine.CombineFn)Mean.of());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.Element KV<String, Double> element, @DoFn.StateId(value="foo") CombiningState<Double, Mean.CountSum<Double>, Double> state, DoFn.OutputReceiver<String> r) {
                    state.add((Object)((Double)element.getValue()));
                    Double currentValue = (Double)state.read();
                    if (Math.abs(currentValue - 0.5) < 1.0E-4) {
                        r.output((Object)"right on");
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"hello", (Object)0.3), (Object[])new KV[]{KV.of((Object)"hello", (Object)0.6), KV.of((Object)"hello", (Object)0.6)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"right on"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testCombiningStateParameterSuperclass() {
            String stateId = "foo";
            DoFn<KV<Integer, Integer>, String> fn = new DoFn<KV<Integer, Integer>, String>(){
                private static final int EXPECTED_SUM = 8;
                @DoFn.StateId(value="foo")
                private final StateSpec<CombiningState<Integer, int[], Integer>> state = StateSpecs.combining((Combine.CombineFn)Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<Integer, Integer> element, @DoFn.StateId(value="foo") GroupingState<Integer, Integer> state, DoFn.OutputReceiver<String> r) {
                    state.add((Object)((Integer)element.getValue()));
                    Integer currentValue = (Integer)state.read();
                    if (currentValue == 8) {
                        r.output((Object)"right on");
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)KV.of((Object)123, (Object)4), (Object[])new KV[]{KV.of((Object)123, (Object)7), KV.of((Object)123, (Object)-3)}))).apply((PTransform)ParDo.of((DoFn)fn));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"right on"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class, UsesStatefulParDo.class})
        public void testBagStateSideInput() {
            final PCollectionView listView = (PCollectionView)((PCollection)this.pipeline.apply("Create list for side input", (PTransform)Create.of((Object)2, (Object[])new Integer[]{1, 0}))).apply((PTransform)View.asList());
            String stateId = "foo";
            DoFn<KV<String, Integer>, List<Integer>> fn = new DoFn<KV<String, Integer>, List<Integer>>(){
                @DoFn.StateId(value="foo")
                private final StateSpec<BagState<Integer>> bufferState = StateSpecs.bag((Coder)VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.Element KV<String, Integer> element, @DoFn.StateId(value="foo") BagState<Integer> state, DoFn.OutputReceiver<List<Integer>> r) {
                    state.add((Object)((Integer)element.getValue()));
                    Iterable currentValue = state.read();
                    if (Iterables.size(currentValue) >= 4) {
                        ArrayList sorted = Lists.newArrayList(currentValue);
                        Collections.sort(sorted);
                        r.output(sorted);
                        ArrayList sideSorted = Lists.newArrayList((Iterable)c.sideInput(listView));
                        Collections.sort(sideSorted);
                        r.output(sideSorted);
                    }
                }
            };
            PCollection output = (PCollection)((PCollection)this.pipeline.apply("Create main input", (PTransform)Create.of((Object)KV.of((Object)"hello", (Object)97), (Object[])new KV[]{KV.of((Object)"hello", (Object)42), KV.of((Object)"hello", (Object)84), KV.of((Object)"hello", (Object)12)}))).apply((PTransform)ParDo.of((DoFn)fn).withSideInputs(new PCollectionView[]{listView}));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new List[]{Lists.newArrayList(12, 42, 84, 97), Lists.newArrayList(0, 1, 2)});
            this.pipeline.run();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class TimestampTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoOutputWithTimestamp() {
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 42, 6)));
            PCollection output = (PCollection)((PCollection)((PCollection)input.apply((PTransform)ParDo.of(new TestOutputTimestampDoFn()))).apply((PTransform)ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))).apply((PTransform)ParDo.of(new TestFormatTimestampDoFn()));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"processing: 3, timestamp: 3", "processing: 42, timestamp: 42", "processing: 6, timestamp: 6"});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoTaggedOutputWithTimestamp() {
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 42, 6)));
            TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"){};
            TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){};
            PCollection output = (PCollection)((PCollection)((PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new DoFn<Integer, Integer>((TupleTag)additionalOutputTag){
                final /* synthetic */ TupleTag val$additionalOutputTag;
                {
                    this.val$additionalOutputTag = tupleTag;
                }

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element Integer element, DoFn.MultiOutputReceiver r) {
                    r.get(this.val$additionalOutputTag).outputWithTimestamp((Object)element, new Instant(element.longValue()));
                }
            }).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)))).get((TupleTag)additionalOutputTag).apply((PTransform)ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))).apply((PTransform)ParDo.of(new TestFormatTimestampDoFn()));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"processing: 3, timestamp: 3", "processing: 42, timestamp: 42", "processing: 6, timestamp: 6"});
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoShiftTimestamp() {
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 42, 6)));
            PCollection output = (PCollection)((PCollection)((PCollection)input.apply((PTransform)ParDo.of(new TestOutputTimestampDoFn()))).apply((PTransform)ParDo.of(new TestShiftTimestampDoFn(Duration.millis((long)1000L), Duration.millis((long)-1000L))))).apply((PTransform)ParDo.of(new TestFormatTimestampDoFn()));
            PAssert.that((PCollection)output).containsInAnyOrder((Object[])new String[]{"processing: 3, timestamp: -997", "processing: 42, timestamp: -958", "processing: 6, timestamp: -994"});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoShiftTimestampInvalid() {
            ((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 42, 6)))).apply((PTransform)ParDo.of(new TestOutputTimestampDoFn()))).apply((PTransform)ParDo.of(new TestShiftTimestampDoFn(Duration.millis((long)1000L), Duration.millis((long)-1001L))))).apply((PTransform)ParDo.of(new TestFormatTimestampDoFn()));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Cannot output with timestamp");
            this.thrown.expectMessage("Output timestamps must be no earlier than the timestamp of the current input");
            this.thrown.expectMessage("minus the allowed skew (1 second).");
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoShiftTimestampInvalidZeroAllowed() {
            ((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 42, 6)))).apply((PTransform)ParDo.of(new TestOutputTimestampDoFn()))).apply((PTransform)ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.millis((long)-1001L))))).apply((PTransform)ParDo.of(new TestFormatTimestampDoFn()));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("Cannot output with timestamp");
            this.thrown.expectMessage("Output timestamps must be no earlier than the timestamp of the current input");
            this.thrown.expectMessage("minus the allowed skew (0 milliseconds).");
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoShiftTimestampUnlimited() {
            PCollection outputs = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(0L, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(), GlobalWindow.INSTANCE.maxTimestamp().getMillis())))).apply("AssignTimestampToValue", (PTransform)ParDo.of(new TestOutputTimestampDoFn()))).apply("ReassignToMinimumTimestamp", (PTransform)ParDo.of((DoFn)new DoFn<Long, Long>(){

                @DoFn.ProcessElement
                public void reassignTimestamps(DoFn.ProcessContext context, @DoFn.Element Long element) {
                    context.outputWithTimestamp((Object)element, BoundedWindow.TIMESTAMP_MIN_VALUE);
                }

                public Duration getAllowedTimestampSkew() {
                    return Duration.millis((long)Long.MAX_VALUE);
                }
            }));
            PAssert.that((PCollection)outputs).satisfies((SerializableFunction & Serializable)input -> {
                Assert.assertThat((Object)input, (Matcher)Matchers.hasItem((Object)BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
                for (Long elem : input) {
                    Assert.assertThat((Object)elem, (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()), (Matcher)Matchers.equalTo((Object)GlobalWindow.INSTANCE.maxTimestamp().getMillis()), (Matcher)Matchers.equalTo((Object)0L)));
                }
                return null;
            });
            this.pipeline.run();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class LifecycleTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoWithErrorInStartBatch() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestStartBatchErrorDoFn()));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("test error in initialize");
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoWithErrorInProcessElement() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestProcessElementErrorDoFn()));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("test error in process");
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoWithErrorInFinishBatch() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestFinishBatchErrorDoFn()));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("test error in finalize");
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testWindowingInStartAndFinishBundle() {
            final FixedWindows windowFn = FixedWindows.of((Duration)Duration.millis((long)1L));
            PCollection output = (PCollection)((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"elem", (Instant)new Instant(1L)), (TimestampedValue[])new TimestampedValue[0]))).apply((PTransform)Window.into((WindowFn)windowFn))).apply((PTransform)ParDo.of((DoFn)new DoFn<String, String>(){

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element String element, @DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<String> r) {
                    r.output((Object)element);
                    System.out.println("Process: " + element + ":" + timestamp.getMillis());
                }

                @DoFn.FinishBundle
                public void finishBundle(DoFn.FinishBundleContext c) {
                    Instant ts = new Instant(3L);
                    c.output((Object)"finish", ts, (BoundedWindow)windowFn.assignWindow(ts));
                    System.out.println("Finish: 3");
                }
            }))).apply((PTransform)ParDo.of((DoFn)new PrintingDoFn()));
            PAssert.that((PCollection)output).satisfies((SerializableFunction)new Checker());
            this.pipeline.run();
        }
    }

    @RunWith(value=JUnit4.class)
    public static class MultipleInputsAndOutputTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoWithTaggedOutput() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
            TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
            TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
            TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag3).and((TupleTag)additionalOutputTag1).and((TupleTag)additionalOutputTagUnwritten).and((TupleTag)additionalOutputTag2)));
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag1)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag1));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag2)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag2));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag3)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag3));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTagUnwritten)).empty();
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoEmptyWithTaggedOutput() {
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
            TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
            TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
            TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.empty((Coder)VarIntCoder.of()))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag3).and((TupleTag)additionalOutputTag1).and((TupleTag)additionalOutputTagUnwritten).and((TupleTag)additionalOutputTag2)));
            List<Integer> inputs = Collections.emptyList();
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag1)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag1));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag2)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag2));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag3)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).fromOutput(additionalOutputTag3));
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTagUnwritten)).empty();
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoWithEmptyTaggedOutput() {
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
            TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.empty((Coder)VarIntCoder.of()))).apply((PTransform)ParDo.of((DoFn)new TestNoOutputDoFn()).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag1).and((TupleTag)additionalOutputTag2)));
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).empty();
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag1)).empty();
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag2)).empty();
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoWithOnlyTaggedOutput() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            TupleTag<Void> mainOutputTag = new TupleTag<Void>("main"){};
            TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){};
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new DoFn<Integer, Void>((TupleTag)additionalOutputTag){
                final /* synthetic */ TupleTag val$additionalOutputTag;
                {
                    this.val$additionalOutputTag = tupleTag;
                }

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element Integer element, DoFn.MultiOutputReceiver r) {
                    r.get(this.val$additionalOutputTag).output((Object)element);
                }
            }).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).empty();
            PAssert.that((PCollection)outputs.get((TupleTag)additionalOutputTag)).containsInAnyOrder(inputs);
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoWritingToUndeclaredTag() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            TupleTag<String> notOutputTag = new TupleTag<String>("additional"){};
            ((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(notOutputTag))));
            this.thrown.expectMessage("additional");
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoWithSideInputs() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            PCollectionView sideInput1 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput1", (PTransform)Create.of((Object)11, (Object[])new Integer[0]))).apply("ViewSideInput1", (PTransform)View.asSingleton());
            PCollectionView sideInputUnread = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInputUnread", (PTransform)Create.of((Object)-3333, (Object[])new Integer[0]))).apply("ViewSideInputUnread", (PTransform)View.asSingleton());
            PCollectionView sideInput2 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput2", (PTransform)Create.of((Object)222, (Object[])new Integer[0]))).apply("ViewSideInput2", (PTransform)View.asSingleton());
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList(new TupleTag[0]))).withSideInputs(new PCollectionView[]{sideInput1, sideInputUnread, sideInput2}));
            PAssert.that((PCollection)output).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoWithSideInputsIsCumulative() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            PCollectionView sideInput1 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput1", (PTransform)Create.of((Object)11, (Object[])new Integer[0]))).apply("ViewSideInput1", (PTransform)View.asSingleton());
            PCollectionView sideInputUnread = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInputUnread", (PTransform)Create.of((Object)-3333, (Object[])new Integer[0]))).apply("ViewSideInputUnread", (PTransform)View.asSingleton());
            PCollectionView sideInput2 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput2", (PTransform)Create.of((Object)222, (Object[])new Integer[0]))).apply("ViewSideInput2", (PTransform)View.asSingleton());
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList(new TupleTag[0]))).withSideInputs(new PCollectionView[]{sideInput1}).withSideInputs(new PCollectionView[]{sideInputUnread}).withSideInputs(new PCollectionView[]{sideInput2}));
            PAssert.that((PCollection)output).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testMultiOutputParDoWithSideInputs() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){};
            PCollectionView sideInput1 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput1", (PTransform)Create.of((Object)11, (Object[])new Integer[0]))).apply("ViewSideInput1", (PTransform)View.asSingleton());
            PCollectionView sideInputUnread = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInputUnread", (PTransform)Create.of((Object)-3333, (Object[])new Integer[0]))).apply("ViewSideInputUnread", (PTransform)View.asSingleton());
            PCollectionView sideInput2 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput2", (PTransform)Create.of((Object)222, (Object[])new Integer[0]))).apply("ViewSideInput2", (PTransform)View.asSingleton());
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList(new TupleTag[0]))).withSideInputs(new PCollectionView[]{sideInput1}).withSideInputs(new PCollectionView[]{sideInputUnread}).withSideInputs(new PCollectionView[]{sideInput2}).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testMultiOutputParDoWithSideInputsIsCumulative() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){};
            PCollectionView sideInput1 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput1", (PTransform)Create.of((Object)11, (Object[])new Integer[0]))).apply("ViewSideInput1", (PTransform)View.asSingleton());
            PCollectionView sideInputUnread = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInputUnread", (PTransform)Create.of((Object)-3333, (Object[])new Integer[0]))).apply("ViewSideInputUnread", (PTransform)View.asSingleton());
            PCollectionView sideInput2 = (PCollectionView)((PCollection)this.pipeline.apply("CreateSideInput2", (PTransform)Create.of((Object)222, (Object[])new Integer[0]))).apply("ViewSideInput2", (PTransform)View.asSingleton());
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(sideInput1, sideInput2), Arrays.asList(new TupleTag[0]))).withSideInputs(new PCollectionView[]{sideInput1}).withSideInputs(new PCollectionView[]{sideInputUnread}).withSideInputs(new PCollectionView[]{sideInput2}).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            PAssert.that((PCollection)outputs.get((TupleTag)mainOutputTag)).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs).andSideInputs(11, 222));
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testParDoReadingFromUnknownSideInput() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            PCollectionView sideView = (PCollectionView)((PCollection)this.pipeline.apply("Create3", (PTransform)Create.of((Object)3, (Object[])new Integer[0]))).apply((PTransform)View.asSingleton());
            ((PCollection)this.pipeline.apply("CreateMain", (PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(sideView), Arrays.asList(new TupleTag[0]))));
            this.thrown.expect(RuntimeException.class);
            this.thrown.expectMessage("calling sideInput() with unknown view");
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testSideInputsWithMultipleWindows() {
            MutableDateTime mutableNow = Instant.now().toMutableDateTime();
            mutableNow.setMillisOfSecond(0);
            Instant now = mutableNow.toInstant();
            SlidingWindows windowFn = SlidingWindows.of((Duration)Duration.standardSeconds((long)5L)).every(Duration.standardSeconds((long)1L));
            PCollectionView view = (PCollectionView)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[0]))).apply((PTransform)View.asSingleton());
            PCollection res = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.timestamped((TimestampedValue)TimestampedValue.of((Object)"a", (Instant)now), (TimestampedValue[])new TimestampedValue[0]))).apply((PTransform)Window.into((WindowFn)windowFn))).apply((PTransform)ParDo.of((DoFn)new FnWithSideInputs(view)).withSideInputs(new PCollectionView[]{view}));
            for (int i = 0; i < 4; ++i) {
                Instant base = now.minus((ReadableDuration)Duration.standardSeconds((long)i));
                IntervalWindow window = new IntervalWindow(base, base.plus((ReadableDuration)Duration.standardSeconds((long)5L)));
                PAssert.that((PCollection)res).inWindow((BoundedWindow)window).containsInAnyOrder((Object[])new String[]{"a:1"});
            }
            this.pipeline.run();
        }

        @Test
        public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new TestDoFn()));
            Assert.assertThat((Object)output.getName(), (Matcher)Matchers.containsString((String)"ParDo(Test)"));
        }

        @Test
        public void testParDoOutputNameBasedOnLabel() {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[0]))).apply("MyParDo", (PTransform)ParDo.of((DoFn)new TestDoFn()));
            Assert.assertThat((Object)output.getName(), (Matcher)Matchers.containsString((String)"MyParDo"));
        }

        @Test
        public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new StrangelyNamedDoer()));
            Assert.assertThat((Object)output.getName(), (Matcher)Matchers.containsString((String)"ParDo(StrangelyNamedDoer)"));
        }

        @Test
        public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() {
            Assert.assertThat((Object)ParDo.of((DoFn)new TaggedOutputDummyFn(null, null)).withOutputTags(null, null).getName(), (Matcher)Matchers.containsString((String)"ParMultiDo(TaggedOutputDummy)"));
        }

        @Test
        public void testParDoWithTaggedOutputName() {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
            TupleTag<String> additionalOutputTag1 = new TupleTag<String>("output1"){};
            TupleTag<String> additionalOutputTag2 = new TupleTag<String>("output2"){};
            TupleTag<String> additionalOutputTag3 = new TupleTag<String>("output3"){};
            TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
            PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, -42, 666)))).setName("MyInput").apply("MyParDo", (PTransform)ParDo.of((DoFn)new TestDoFn(Arrays.asList(new PCollectionView[0]), Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))).withOutputTags((TupleTag)mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag3).and((TupleTag)additionalOutputTag1).and((TupleTag)additionalOutputTagUnwritten).and((TupleTag)additionalOutputTag2)));
            Assert.assertEquals((Object)"MyParDo.main", (Object)outputs.get((TupleTag)mainOutputTag).getName());
            Assert.assertEquals((Object)"MyParDo.output1", (Object)outputs.get((TupleTag)additionalOutputTag1).getName());
            Assert.assertEquals((Object)"MyParDo.output2", (Object)outputs.get((TupleTag)additionalOutputTag2).getName());
            Assert.assertEquals((Object)"MyParDo.output3", (Object)outputs.get((TupleTag)additionalOutputTag3).getName());
            Assert.assertEquals((Object)"MyParDo.unwrittenOutput", (Object)outputs.get((TupleTag)additionalOutputTagUnwritten).getName());
        }

        @Test
        public void testMultiOutputAppliedMultipleTimesDifferentOutputs() {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            PCollection longs = (PCollection)this.pipeline.apply((PTransform)GenerateSequence.from((long)0L));
            TupleTag mainOut = new TupleTag();
            final TupleTag valueAsString = new TupleTag();
            final TupleTag valueAsInt = new TupleTag();
            DoFn<Long, Long> fn = new DoFn<Long, Long>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext cxt, @DoFn.Element Long element) {
                    cxt.output((Object)((Long)cxt.element()));
                    cxt.output(valueAsString, (Object)Long.toString((Long)cxt.element()));
                    cxt.output(valueAsInt, (Object)element.intValue());
                }
            };
            ParDo.MultiOutput parDo = ParDo.of((DoFn)fn).withOutputTags(mainOut, TupleTagList.of((TupleTag)valueAsString).and(valueAsInt));
            PCollectionTuple firstApplication = (PCollectionTuple)longs.apply("first", (PTransform)parDo);
            PCollectionTuple secondApplication = (PCollectionTuple)longs.apply("second", (PTransform)parDo);
            Assert.assertThat((Object)firstApplication, (Matcher)Matchers.not((Matcher)Matchers.equalTo((Object)secondApplication)));
            Assert.assertThat(firstApplication.getAll().keySet(), (Matcher)Matchers.containsInAnyOrder((Object[])new TupleTag[]{mainOut, valueAsString, valueAsInt}));
            Assert.assertThat(secondApplication.getAll().keySet(), (Matcher)Matchers.containsInAnyOrder((Object[])new TupleTag[]{mainOut, valueAsString, valueAsInt}));
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testMultiOutputChaining() {
            PCollectionTuple filters = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(3, 4, 5, 6)))).apply((PTransform)new MultiFilter());
            PCollection by2 = filters.get(MultiFilter.BY2);
            PCollection by3 = filters.get(MultiFilter.BY3);
            PCollection by2then3 = (PCollection)by2.apply("Filter3sAgain", (PTransform)ParDo.of((DoFn)new MultiFilter.FilterFn(3)));
            PCollection by3then2 = (PCollection)by3.apply("Filter2sAgain", (PTransform)ParDo.of((DoFn)new MultiFilter.FilterFn(2)));
            PAssert.that((PCollection)by2then3).containsInAnyOrder((Object[])new Integer[]{6});
            PAssert.that((PCollection)by3then2).containsInAnyOrder((Object[])new Integer[]{6});
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testTaggedOutputUnknownCoder() throws Exception {
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(1, 2, 3)));
            TupleTag mainOutputTag = new TupleTag("main");
            TupleTag additionalOutputTag = new TupleTag("unknownSide");
            input.apply((PTransform)ParDo.of((DoFn)new TaggedOutputDummyFn((TupleTag<Integer>)mainOutputTag, (TupleTag<TestDummy>)additionalOutputTag)).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            this.thrown.expect(IllegalStateException.class);
            this.thrown.expectMessage("Unable to return a default Coder");
            this.pipeline.run();
        }

        @Test
        public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
            this.pipeline.enableAbandonedNodeEnforcement(false);
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(1, 2, 3)));
            TupleTag mainOutputTag = new TupleTag("main");
            TupleTag additionalOutputTag = new TupleTag("unregisteredSide");
            ParDo.MultiOutput pardo = ParDo.of((DoFn)new TaggedOutputDummyFn((TupleTag<Integer>)mainOutputTag, (TupleTag<TestDummy>)additionalOutputTag)).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag));
            PCollectionTuple outputTuple = (PCollectionTuple)input.apply((PTransform)pardo);
            outputTuple.get(additionalOutputTag).setCoder((Coder)new TestDummyCoder());
            outputTuple.get(additionalOutputTag).apply((PTransform)View.asSingleton());
            Assert.assertEquals((Object)((Object)new TestDummyCoder()), (Object)outputTuple.get(additionalOutputTag).getCoder());
            outputTuple.get(additionalOutputTag).finishSpecifyingOutput("ParDo", (PInput)input, (PTransform)pardo);
            Assert.assertEquals((Object)((Object)new TestDummyCoder()), (Object)outputTuple.get(additionalOutputTag).getCoder());
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testMainOutputUnregisteredExplicitCoder() {
            PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(Arrays.asList(1, 2, 3)));
            TupleTag mainOutputTag = new TupleTag("unregisteredMain");
            TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additionalOutput"){};
            PCollectionTuple outputTuple = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new MainOutputDummyFn((TupleTag<TestDummy>)mainOutputTag, additionalOutputTag)).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            outputTuple.get(mainOutputTag).setCoder((Coder)new TestDummyCoder());
            this.pipeline.run();
        }

        @Test
        @Category(value={NeedsRunner.class})
        public void testMainOutputApplyTaggedOutputNoCoder() {
            TupleTag mainOutputTag = new TupleTag("main");
            final TupleTag additionalOutputTag = new TupleTag("additionalOutput");
            PCollectionTuple tuple = (PCollectionTuple)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)new TestDummy(), (Object[])new TestDummy[0]).withCoder((Coder)TestDummyCoder.of()))).apply((PTransform)ParDo.of((DoFn)new DoFn<TestDummy, TestDummy>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context, @DoFn.Element TestDummy element) {
                    context.output((Object)element);
                    context.output(additionalOutputTag, (Object)element);
                }
            }).withOutputTags(mainOutputTag, TupleTagList.of((TupleTag)additionalOutputTag)));
            tuple.get(mainOutputTag).setCoder((Coder)TestDummyCoder.of()).apply("Output1", (PTransform)ParDo.of((DoFn)new DoFn<TestDummy, Integer>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context) {
                    context.output((Object)1);
                }
            }));
            tuple.get(additionalOutputTag).setCoder((Coder)TestDummyCoder.of());
            this.pipeline.run();
        }

        @Test
        public void testWithOutputTagsDisplayData() {
            DoFn<String, String> fn = new DoFn<String, String>(){

                @DoFn.ProcessElement
                public void proccessElement(DoFn.ProcessContext c) {
                }

                public void populateDisplayData(DisplayData.Builder builder) {
                    builder.add(DisplayData.item((String)"fnMetadata", (String)"foobar"));
                }
            };
            ParDo.MultiOutput parDo = ParDo.of((DoFn)fn).withOutputTags(new TupleTag(), TupleTagList.empty());
            DisplayData displayData = DisplayData.from((HasDisplayData)parDo);
            Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("fn", (HasDisplayData)fn));
            Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("fn", fn.getClass()));
        }
    }

    @RunWith(value=JUnit4.class)
    public static class BasicTests
    extends SharedTestBase
    implements Serializable {
        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDo() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply((PTransform)ParDo.of((DoFn)new TestDoFn()));
            PAssert.that((PCollection)output).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoEmpty() {
            List<Integer> inputs = Arrays.asList(new Integer[0]);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs).withCoder((Coder)VarIntCoder.of()))).apply("TestDoFn", (PTransform)ParDo.of((DoFn)new TestDoFn()));
            PAssert.that((PCollection)output).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs));
            this.pipeline.run();
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoEmptyOutputs() {
            List<Integer> inputs = Arrays.asList(new Integer[0]);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs).withCoder((Coder)VarIntCoder.of()))).apply("TestDoFn", (PTransform)ParDo.of((DoFn)new TestNoOutputDoFn()));
            PAssert.that((PCollection)output).empty();
            this.pipeline.run();
        }

        @Test
        public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() {
            Assert.assertThat((Object)ParDo.of((DoFn)new PrintingDoFn()).getName(), (Matcher)Matchers.containsString((String)"ParDo(Printing)"));
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testParDoInCustomTransform() {
            List<Integer> inputs = Arrays.asList(3, -42, 666);
            PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(inputs))).apply("CustomTransform", (PTransform)new PTransform<PCollection<Integer>, PCollection<String>>(){

                public PCollection<String> expand(PCollection<Integer> input) {
                    return (PCollection)input.apply((PTransform)ParDo.of((DoFn)new TestDoFn()));
                }
            });
            PAssert.that((PCollection)output).satisfies((SerializableFunction)HasExpectedOutput.forInput(inputs));
            this.pipeline.run();
        }

        @Test
        public void testJsonEscaping() {
            DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>(){

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element Integer element, DoFn.OutputReceiver<Integer> r) {
                    r.output((Object)(element + 1));
                }
            };
            byte[] serializedBytes = SerializableUtils.serializeToByteArray((Serializable)doFn);
            String serializedJson = StringUtils.byteArrayToJsonString((byte[])serializedBytes);
            Assert.assertArrayEquals((byte[])serializedBytes, (byte[])StringUtils.jsonStringToByteArray((String)serializedJson));
        }

        @Test
        public void testDoFnDisplayData() {
            DoFn<String, String> fn = new DoFn<String, String>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                }

                public void populateDisplayData(DisplayData.Builder builder) {
                    builder.add(DisplayData.item((String)"doFnMetadata", (String)"bar"));
                }
            };
            ParDo.SingleOutput parDo = ParDo.of((DoFn)fn);
            DisplayData displayData = DisplayData.from((HasDisplayData)parDo);
            Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem((Matcher<DisplayData.Item>)Matchers.allOf(DisplayDataMatchers.hasKey("fn"), DisplayDataMatchers.hasType(DisplayData.Type.JAVA_CLASS), DisplayDataMatchers.hasValue(fn.getClass().getName()))));
            Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("fn", (HasDisplayData)fn));
        }

        @Test
        public void testDoFnWithContextDisplayData() {
            DoFn<String, String> fn = new DoFn<String, String>(){

                @DoFn.ProcessElement
                public void proccessElement(DoFn.ProcessContext c) {
                }

                public void populateDisplayData(DisplayData.Builder builder) {
                    builder.add(DisplayData.item((String)"fnMetadata", (String)"foobar"));
                }
            };
            ParDo.SingleOutput parDo = ParDo.of((DoFn)fn);
            DisplayData displayData = DisplayData.from((HasDisplayData)parDo);
            Assert.assertThat((Object)displayData, DisplayDataMatchers.includesDisplayDataFor("fn", (HasDisplayData)fn));
            Assert.assertThat((Object)displayData, DisplayDataMatchers.hasDisplayItem("fn", fn.getClass()));
        }

        @Test
        public void testRejectsWrongWindowType() {
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage(GlobalWindow.class.getSimpleName());
            this.thrown.expectMessage(IntervalWindow.class.getSimpleName());
            this.thrown.expectMessage("window type");
            this.thrown.expectMessage("not a supertype");
            ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[]{2, 3}))).apply((PTransform)ParDo.of((DoFn)new DoFn<Integer, Integer>(){

                @DoFn.ProcessElement
                public void process(DoFn.ProcessContext c, IntervalWindow w) {
                }
            }));
        }

        @Ignore(value="ParDo rejects this on account of it using timers")
        @Test
        public void testMultipleWindowSubtypesOK() {
            String timerId = "gobbledegook";
            ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[]{2, 3}))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)10L))))).apply((PTransform)ParDo.of((DoFn)new DoFn<Integer, Integer>(){
                @DoFn.TimerId(value="gobbledegook")
                private final TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

                @DoFn.ProcessElement
                public void process(DoFn.ProcessContext c, IntervalWindow w) {
                }

                @DoFn.OnTimer(value="gobbledegook")
                public void onTimer(BoundedWindow w) {
                }
            }));
        }

        @Test
        @Category(value={ValidatesRunner.class})
        public void testPipelineOptionsParameter() {
            PCollection results = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new DoFn<Integer, String>(){

                @DoFn.ProcessElement
                public void process(DoFn.OutputReceiver<String> r, PipelineOptions options) {
                    r.output((Object)((MyOptions)options.as(MyOptions.class)).getFakeOption());
                }
            }));
            String testOptionValue = "not fake anymore";
            ((MyOptions)this.pipeline.getOptions().as(MyOptions.class)).setFakeOption(testOptionValue);
            PAssert.that((PCollection)results).containsInAnyOrder((Object[])new String[]{"not fake anymore"});
            this.pipeline.run();
        }
    }

    static class MultiFilter
    extends PTransform<PCollection<Integer>, PCollectionTuple> {
        private static final TupleTag<Integer> BY2 = new TupleTag<Integer>("by2"){};
        private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3"){};

        MultiFilter() {
        }

        public PCollectionTuple expand(PCollection<Integer> input) {
            PCollection by2 = (PCollection)input.apply("Filter2s", (PTransform)ParDo.of((DoFn)new FilterFn(2)));
            PCollection by3 = (PCollection)input.apply("Filter3s", (PTransform)ParDo.of((DoFn)new FilterFn(3)));
            return PCollectionTuple.of(BY2, (PCollection)by2).and(BY3, by3);
        }

        static class FilterFn
        extends DoFn<Integer, Integer> {
            private final int divisor;

            FilterFn(int divisor) {
                this.divisor = divisor;
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Integer element, DoFn.OutputReceiver<Integer> r) throws Exception {
                if (element % this.divisor == 0) {
                    r.output((Object)element);
                }
            }
        }
    }

    static class TestFormatTimestampDoFn<T extends Number>
    extends DoFn<T, String> {
        TestFormatTimestampDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T element, @DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<String> r) {
            Preconditions.checkNotNull(timestamp);
            r.output((Object)("processing: " + element + ", timestamp: " + timestamp.getMillis()));
        }
    }

    static class TestShiftTimestampDoFn<T extends Number>
    extends DoFn<T, T> {
        private Duration allowedTimestampSkew;
        private Duration durationToShift;

        public TestShiftTimestampDoFn(Duration allowedTimestampSkew, Duration durationToShift) {
            this.allowedTimestampSkew = allowedTimestampSkew;
            this.durationToShift = durationToShift;
        }

        public Duration getAllowedTimestampSkew() {
            return this.allowedTimestampSkew;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T value, @DoFn.Timestamp Instant timestamp, DoFn.OutputReceiver<T> r) {
            Preconditions.checkNotNull(timestamp);
            r.outputWithTimestamp(value, timestamp.plus((ReadableDuration)this.durationToShift));
        }
    }

    static class TestOutputTimestampDoFn<T extends Number>
    extends DoFn<T, T> {
        TestOutputTimestampDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T value, DoFn.OutputReceiver<T> r) {
            r.outputWithTimestamp(value, new Instant(((Number)value).longValue()));
        }
    }

    private static class StrangelyNamedDoer
    extends DoFn<Integer, String> {
        private StrangelyNamedDoer() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
        }
    }

    static class TestFinishBatchErrorDoFn
    extends DoFn<Integer, String> {
        TestFinishBatchErrorDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext c) {
            throw new RuntimeException("test error in finalize");
        }
    }

    static class TestProcessElementErrorDoFn
    extends DoFn<Integer, String> {
        TestProcessElementErrorDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            throw new RuntimeException("test error in process");
        }
    }

    static class TestStartBatchErrorDoFn
    extends DoFn<Integer, String> {
        TestStartBatchErrorDoFn() {
        }

        @DoFn.StartBundle
        public void startBundle() {
            throw new RuntimeException("test error in initialize");
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
        }
    }

    static class TestDoFn
    extends DoFn<Integer, String> {
        State state = State.NOT_SET_UP;
        final List<PCollectionView<Integer>> sideInputViews = new ArrayList<PCollectionView<Integer>>();
        final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<TupleTag<String>>();

        public TestDoFn() {
        }

        public TestDoFn(List<PCollectionView<Integer>> sideInputViews, List<TupleTag<String>> additionalOutputTupleTags) {
            this.sideInputViews.addAll(sideInputViews);
            this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
        }

        @DoFn.Setup
        public void prepare() {
            Assert.assertEquals((Object)((Object)State.NOT_SET_UP), (Object)((Object)this.state));
            this.state = State.UNSTARTED;
        }

        @DoFn.StartBundle
        public void startBundle() {
            Assert.assertThat((Object)((Object)this.state), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)((Object)State.UNSTARTED)), (Matcher)Matchers.equalTo((Object)((Object)State.FINISHED))));
            this.state = State.STARTED;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, @DoFn.Element Integer element) {
            Assert.assertThat((Object)((Object)this.state), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)((Object)State.STARTED)), (Matcher)Matchers.equalTo((Object)((Object)State.PROCESSING))));
            this.state = State.PROCESSING;
            this.outputToAllWithSideInputs(c, "processing: " + element);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn.FinishBundleContext c) {
            Assert.assertThat((Object)((Object)this.state), (Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)((Object)State.STARTED)), (Matcher)Matchers.equalTo((Object)((Object)State.PROCESSING))));
            this.state = State.FINISHED;
            c.output((Object)"finished", BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            for (TupleTag<String> additionalOutputTupleTag : this.additionalOutputTupleTags) {
                c.output(additionalOutputTupleTag, (Object)(additionalOutputTupleTag.getId() + ": finished"), BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        }

        private void outputToAllWithSideInputs(DoFn.ProcessContext c, String value) {
            if (!this.sideInputViews.isEmpty()) {
                ArrayList<Integer> sideInputValues = new ArrayList<Integer>();
                for (PCollectionView<Integer> sideInputView : this.sideInputViews) {
                    sideInputValues.add((Integer)c.sideInput(sideInputView));
                }
                value = value + ": " + sideInputValues;
            }
            c.output((Object)value);
            for (TupleTag<String> additionalOutputTupleTag : this.additionalOutputTupleTags) {
                c.output(additionalOutputTupleTag, (Object)(additionalOutputTupleTag.getId() + ": " + value));
            }
        }

        static enum State {
            NOT_SET_UP,
            UNSTARTED,
            STARTED,
            PROCESSING,
            FINISHED;

        }
    }

    static class TestNoOutputDoFn
    extends DoFn<Integer, String> {
        TestNoOutputDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
        }
    }

    private static class PrintingDoFn
    extends DoFn<String, String> {
        private PrintingDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, DoFn.OutputReceiver<String> receiver) {
            receiver.output((Object)(element + ":" + timestamp.getMillis() + ":" + window.maxTimestamp().getMillis()));
        }
    }

    public static abstract class SharedTestBase {
        @Rule
        public final transient TestPipeline pipeline = TestPipeline.create();
        @Rule
        public transient ExpectedException thrown = ExpectedException.none();
    }
}

