/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms.reflect;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class DoFnSignaturesTest {
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testBasicDoFnProcessContext() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(0)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.ProcessContextParameter.class));
    }

    @Test
    public void testBasicDoFnAllParameters() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.Element String element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, PaneInfo paneInfo, DoFn.OutputReceiver<String> receiver, PipelineOptions options) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)6));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(0)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.ElementParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(1)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.TimestampParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(2)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.WindowParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(3)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.PaneInfoParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(4)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.OutputReceiverParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(5)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.PipelineOptionsParameter.class));
    }

    @Test
    public void testBasicDoFnMultiOutputReceiver() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.MultiOutputReceiver receiver) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)((DoFnSignature.Parameter)sig.processElement().extraParameters().get(0)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.TaggedOutputReceiverParameter.class));
    }

    @Test
    public void testWrongElementType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("@Element argument must have type java.lang.String");
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.Element Integer element) {
            }
        }.getClass());
    }

    @Test
    public void testWrongTimestampType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("@Timestamp argument must have type org.joda.time.Instant");
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.Timestamp String timestamp) {
            }
        }.getClass());
    }

    @Test
    public void testWrongOutputReceiverType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("OutputReceiver should be parameterized by java.lang.String");
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.OutputReceiver<Integer> receiver) {
            }
        }.getClass());
    }

    @Test
    public void testRowParameterWithoutFieldAccess() {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.Element Row row) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().getRowParameter(), (Matcher)Matchers.notNullValue());
    }

    @Test
    public void testFieldAccess() throws IllegalAccessException {
        final FieldAccessDescriptor descriptor = FieldAccessDescriptor.withFieldNames((String[])new String[]{"foo", "bar"});
        DoFn<String, String> doFn = new DoFn<String, String>(){
            @DoFn.FieldAccess(value="foo")
            final FieldAccessDescriptor fieldAccess;
            {
                this.fieldAccess = descriptor;
            }

            @DoFn.ProcessElement
            public void process(@DoFn.FieldAccess(value="foo") Row row) {
            }
        };
        DoFnSignature sig = DoFnSignatures.getSignature(doFn.getClass());
        Assert.assertThat((Object)((DoFnSignature.FieldAccessDeclaration)sig.fieldAccessDeclarations().get("foo")), (Matcher)Matchers.notNullValue());
        Field field = ((DoFnSignature.FieldAccessDeclaration)sig.fieldAccessDeclarations().get("foo")).field();
        Assert.assertThat((Object)field.getName(), (Matcher)Matchers.equalTo((Object)"fieldAccess"));
        Assert.assertThat((Object)field.get(doFn), (Matcher)Matchers.equalTo((Object)descriptor));
        Assert.assertThat((Object)sig.processElement().getRowParameter(), (Matcher)Matchers.notNullValue());
    }

    @Test
    public void testMissingFieldAccess() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("No FieldAccessDescriptor defined.");
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(@DoFn.FieldAccess(value="foo") Row row) {
            }
        }.getClass());
    }

    @Test
    public void testRowReceiver() {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.OutputReceiver<Row> rowReceiver) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().getMainOutputReceiver().isRowReceiver(), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void testRequiresStableInputProcessElement() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void process(DoFn.ProcessContext c) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().requiresStableInput(), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void testBadExtraContext() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Must take a single argument of type DoFn<Integer, String>.StartBundleContext");
        DoFnSignatures.analyzeStartBundleMethod((DoFnSignatures.ErrorReporter)DoFnSignaturesTestUtils.errors(), (TypeDescriptor)TypeDescriptor.of(DoFnSignaturesTestUtils.FakeDoFn.class), (Method)new DoFnSignaturesTestUtils.AnonymousMethod(){

            void method(DoFn.StartBundleContext c, int n) {
            }
        }.getMethod(), (TypeDescriptor)TypeDescriptor.of(Integer.class), (TypeDescriptor)TypeDescriptor.of(String.class));
    }

    @Test
    public void testMultipleStartBundleElement() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Found multiple methods annotated with @StartBundle");
        this.thrown.expectMessage("bar()");
        this.thrown.expectMessage("baz()");
        this.thrown.expectMessage(this.getClass().getName() + "$");
        DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void foo() {
            }

            @DoFn.StartBundle
            public void bar() {
            }

            @DoFn.StartBundle
            public void baz() {
            }
        }.getClass());
    }

    @Test
    public void testMultipleFinishBundleMethods() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Found multiple methods annotated with @FinishBundle");
        this.thrown.expectMessage("bar(FinishBundleContext)");
        this.thrown.expectMessage("baz(FinishBundleContext)");
        this.thrown.expectMessage(this.getClass().getName() + "$");
        DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }

            @DoFn.FinishBundle
            public void bar(DoFn.FinishBundleContext context) {
            }

            @DoFn.FinishBundle
            public void baz(DoFn.FinishBundleContext context) {
            }
        }.getClass());
    }

    @Test
    public void testPrivateStartBundle() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("startBundle()");
        this.thrown.expectMessage("Must be public");
        this.thrown.expectMessage(this.getClass().getName() + "$");
        DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void processElement() {
            }

            @DoFn.StartBundle
            void startBundle() {
            }
        }.getClass());
    }

    @Test
    public void testPrivateFinishBundle() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("finishBundle()");
        this.thrown.expectMessage("Must be public");
        this.thrown.expectMessage(this.getClass().getName() + "$");
        DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void processElement() {
            }

            @DoFn.FinishBundle
            void finishBundle() {
            }
        }.getClass());
    }

    @Test
    public void testTimerIdWithWrongType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("TimerId");
        this.thrown.expectMessage("TimerSpec");
        this.thrown.expectMessage("bizzle");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        DoFnSignatures.getSignature(new DoFn<String, String>(){
            @DoFn.TimerId(value="foo")
            private final String bizzle = "bazzle";

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testTimerIdNoCallback() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("No callback registered");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="my-id")
            private final TimerSpec myfield1 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testOnTimerNoDeclaration() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Callback");
        this.thrown.expectMessage("undeclared timer");
        this.thrown.expectMessage("onFoo");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){

            @DoFn.OnTimer(value="my-id")
            public void onFoo() {
            }

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testOnTimerDeclaredInSuperclass() throws Exception {
        class DoFnDeclaringTimerAndProcessElement
        extends DoFn<KV<String, Integer>, Long> {
            public static final String TIMER_ID = "my-timer-id";
            @DoFn.TimerId(value="my-timer-id")
            private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            DoFnDeclaringTimerAndProcessElement() {
            }

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }
        DoFnDeclaringTimerAndProcessElement fn = new DoFnDeclaringTimerAndProcessElement(){
            {
            }

            @DoFn.OnTimer(value="my-timer-id")
            public void onTimerFoo() {
            }
        };
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Callback");
        this.thrown.expectMessage("declared in a different class");
        this.thrown.expectMessage("my-timer-id");
        this.thrown.expectMessage(((Object)((Object)fn)).getClass().getSimpleName());
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(((Object)((Object)fn)).getClass());
    }

    @Test
    public void testUsageOfTimerDeclaredInSuperclass() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("process");
        this.thrown.expectMessage("declared in a different class");
        this.thrown.expectMessage("my-timer-id");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(((Object)((Object)new DoFnDeclaringTimerAndCallback(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext context, @DoFn.TimerId(value="my-timer-id") Timer timer) {
            }
        })).getClass());
    }

    @Test
    public void testTimerParameterDuplicate() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("duplicate");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 2");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="my-id")
            private final TimerSpec myfield = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.TimerId(value="my-id") Timer one, @DoFn.TimerId(value="my-id") Timer two) {
            }

            @DoFn.OnTimer(value="my-id")
            public void onWhatever() {
            }
        }.getClass());
    }

    @Test
    public void testOnTimerDeclaredInSubclass() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Callback");
        this.thrown.expectMessage("declared in a different class");
        this.thrown.expectMessage("my-timer-id");
        this.thrown.expectMessage(Matchers.not(this.mentionsState()));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(((Object)((Object)new DoFnWithOnlyCallback(){
            @DoFn.TimerId(value="my-timer-id")
            private final TimerSpec myfield1 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @Override
            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        })).getClass());
    }

    @Test
    public void testWindowParamOnTimer() throws Exception {
        String timerId = "some-timer-id";
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){
            @DoFn.TimerId(value="some-timer-id")
            private final TimerSpec myfield1 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
            }

            @DoFn.OnTimer(value="some-timer-id")
            public void onTimer(BoundedWindow w) {
            }
        }.getClass());
        Assert.assertThat((Object)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)((DoFnSignature.Parameter)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().get(0)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.WindowParameter.class));
    }

    @Test
    public void testAllParamsOnTimer() throws Exception {
        String timerId = "some-timer-id";
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){
            @DoFn.TimerId(value="some-timer-id")
            private final TimerSpec myfield1 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
            }

            @DoFn.OnTimer(value="some-timer-id")
            public void onTimer(@DoFn.Timestamp Instant timestamp, TimeDomain timeDomain, BoundedWindow w) {
            }
        }.getClass());
        Assert.assertThat((Object)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().size(), (Matcher)Matchers.equalTo((Object)3));
        Assert.assertThat((Object)((DoFnSignature.Parameter)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().get(0)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.TimestampParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().get(1)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.TimeDomainParameter.class));
        Assert.assertThat((Object)((DoFnSignature.Parameter)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("some-timer-id")).extraParameters().get(2)), (Matcher)Matchers.instanceOf(DoFnSignature.Parameter.WindowParameter.class));
    }

    @Test
    public void testPipelineOptionsParameter() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c, PipelineOptions options) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().extraParameters(), (Matcher)Matchers.hasItem((Matcher)Matchers.instanceOf(DoFnSignature.Parameter.PipelineOptionsParameter.class)));
    }

    @Test
    public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(((Object)((Object)new DoFnOverridingAbstractTimerUse())).getClass());
        Assert.assertThat((Object)sig.timerDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)2));
        DoFnSignature.TimerDeclaration decl = (DoFnSignature.TimerDeclaration)sig.timerDeclarations().get("my-timer-id");
        DoFnSignature.Parameter.TimerParameter timerParam = (DoFnSignature.Parameter.TimerParameter)sig.processElement().extraParameters().get(1);
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnDeclaringTimerAndAbstractUse.class.getDeclaredField("myTimerSpec")));
        Assert.assertThat((Object)timerParam.referent(), (Matcher)Matchers.equalTo((Object)decl));
    }

    @Test
    public void testOnTimerDeclaredAndUsedInSuperclass() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(((Object)((Object)new DoFnOverridingAbstractCallback())).getClass());
        Assert.assertThat((Object)sig.timerDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)sig.onTimerMethods().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.TimerDeclaration decl = (DoFnSignature.TimerDeclaration)sig.timerDeclarations().get("my-timer-id");
        DoFnSignature.OnTimerMethod callback = (DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("my-timer-id");
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
        Assert.assertThat((Object)callback.targetMethod(), (Matcher)Matchers.equalTo((Object)DoFnDeclaringTimerAndAbstractCallback.class.getDeclaredMethod("onMyTimer", new Class[0])));
    }

    @Test
    public void testTimerIdDuplicate() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Duplicate");
        this.thrown.expectMessage("TimerId");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myfield1");
        this.thrown.expectMessage("myfield2");
        this.thrown.expectMessage(Matchers.not((Matcher)Matchers.containsString((String)"State")));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="my-id")
            private final TimerSpec myfield1 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="my-id")
            private final TimerSpec myfield2 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testTimerIdNonFinal() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Timer declarations must be final");
        this.thrown.expectMessage("Non-final field");
        this.thrown.expectMessage("myfield");
        this.thrown.expectMessage(Matchers.not((Matcher)Matchers.containsString((String)"State")));
        this.thrown.expectMessage(this.mentionsTimers());
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="my-timer-id")
            private TimerSpec myfield = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testSimpleTimerIdAnonymousDoFn() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="foo")
            private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }

            @DoFn.OnTimer(value="foo")
            public void onFoo() {
            }
        }.getClass());
        Assert.assertThat((Object)sig.timerDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.TimerDeclaration decl = (DoFnSignature.TimerDeclaration)sig.timerDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field().getName(), (Matcher)Matchers.equalTo((Object)"bizzle"));
    }

    @Test
    public void testSimpleTimerWithContext() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="foo")
            private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }

            @DoFn.OnTimer(value="foo")
            public void onFoo(DoFn.OnTimerContext c) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.timerDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.TimerDeclaration decl = (DoFnSignature.TimerDeclaration)sig.timerDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field().getName(), (Matcher)Matchers.equalTo((Object)"bizzle"));
        Assert.assertThat((Object)((DoFnSignature.Parameter)((DoFnSignature.OnTimerMethod)sig.onTimerMethods().get("foo")).extraParameters().get(0)), (Matcher)Matchers.equalTo((Object)DoFnSignature.Parameter.onTimerContext()));
    }

    @Test
    public void testProcessElementWithOnTimerContextRejected() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("@" + DoFn.ProcessElement.class.getSimpleName());
        this.thrown.expectMessage(DoFn.OnTimerContext.class.getSimpleName());
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.TimerId(value="foo")
            private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context, DoFn.OnTimerContext bogus) {
            }

            @DoFn.OnTimer(value="foo")
            public void onFoo() {
            }
        }.getClass());
    }

    @Test
    public void testSimpleTimerIdNamedDoFn() throws Exception {
        class DoFnForTestSimpleTimerIdNamedDoFn
        extends DoFn<KV<String, Integer>, Long> {
            @DoFn.TimerId(value="foo")
            private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            DoFnForTestSimpleTimerIdNamedDoFn() {
            }

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }

            @DoFn.OnTimer(value="foo")
            public void onFoo() {
            }
        }
        DoFnSignature sig = DoFnSignatures.signatureForDoFn((DoFn)new DoFnForTestSimpleTimerIdNamedDoFn());
        Assert.assertThat((Object)sig.timerDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.TimerDeclaration decl = (DoFnSignature.TimerDeclaration)sig.timerDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
    }

    @Test
    public void testStateIdWithWrongType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("StateId");
        this.thrown.expectMessage("StateSpec");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<String, String>(){
            @DoFn.StateId(value="foo")
            private final String bizzle = "bazzle";

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testStateIdDuplicate() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Duplicate");
        this.thrown.expectMessage("StateId");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myfield1");
        this.thrown.expectMessage("myfield2");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private final StateSpec<ValueState<Integer>> myfield1 = StateSpecs.value((Coder)VarIntCoder.of());
            @DoFn.StateId(value="my-id")
            private final StateSpec<ValueState<Long>> myfield2 = StateSpecs.value((Coder)VarLongCoder.of());

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testStateIdNonFinal() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("State declarations must be final");
        this.thrown.expectMessage("Non-final field");
        this.thrown.expectMessage("myfield");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private StateSpec<ValueState<Integer>> myfield = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
    }

    @Test
    public void testStateParameterNoAnnotation() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("missing StateId annotation");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 1");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, ValueState<Integer> noAnnotation) {
            }
        }.getClass());
    }

    @Test
    public void testStateParameterUndeclared() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("undeclared");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 1");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.StateId(value="my-id") ValueState<Integer> undeclared) {
            }
        }.getClass());
    }

    @Test
    public void testStateParameterDuplicate() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("duplicate");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 2");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.StateId(value="my-id") ValueState<Integer> one, @DoFn.StateId(value="my-id") ValueState<Integer> two) {
            }
        }.getClass());
    }

    @Test
    public void testStateParameterWrongStateType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("WatermarkHoldState");
        this.thrown.expectMessage("reference to");
        this.thrown.expectMessage("supertype");
        this.thrown.expectMessage("ValueState");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 1");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.StateId(value="my-id") WatermarkHoldState watermark) {
            }
        }.getClass());
    }

    @Test
    public void testStateParameterWrongGenericType() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("ValueState<String>");
        this.thrown.expectMessage("reference to");
        this.thrown.expectMessage("supertype");
        this.thrown.expectMessage("ValueState<Integer>");
        this.thrown.expectMessage("my-id");
        this.thrown.expectMessage("myProcessElement");
        this.thrown.expectMessage("index 1");
        this.thrown.expectMessage(Matchers.not(this.mentionsTimers()));
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private final StateSpec<ValueState<Integer>> myfield = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.StateId(value="my-id") ValueState<String> stringState) {
            }
        }.getClass());
    }

    @Test
    public void testGoodStateParameterSuperclassStateType() throws Exception {
        DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="my-id")
            private final StateSpec<CombiningState<Integer, int[], Integer>> state = StateSpecs.combining((Combine.CombineFn)Sum.ofIntegers());

            @DoFn.ProcessElement
            public void myProcessElement(DoFn.ProcessContext context, @DoFn.StateId(value="my-id") GroupingState<Integer, Integer> groupingState) {
            }
        }.getClass());
    }

    @Test
    public void testSimpleStateIdAnonymousDoFn() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="foo")
            private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.stateDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.StateDeclaration decl = (DoFnSignature.StateDeclaration)sig.stateDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field().getName(), (Matcher)Matchers.equalTo((Object)"bizzle"));
        Assert.assertThat((Object)decl.stateType(), (Matcher)Matchers.equalTo((Object)new TypeDescriptor<ValueState<Integer>>(){}));
    }

    @Test
    public void testUsageOfStateDeclaredInSuperclass() throws Exception {
        DoFnDeclaringState fn = new DoFnDeclaringState(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext context, @DoFn.StateId(value="my-state-id") ValueState<Integer> state) {
            }
        };
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("process");
        this.thrown.expectMessage("declared in a different class");
        this.thrown.expectMessage("my-state-id");
        this.thrown.expectMessage(((Object)((Object)fn)).getClass().getSimpleName());
        DoFnSignatures.getSignature(((Object)((Object)fn)).getClass());
    }

    @Test
    public void testDeclOfStateUsedInSuperclass() throws Exception {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("process");
        this.thrown.expectMessage("declared in a different class");
        this.thrown.expectMessage("my-state-id");
        DoFnSignatures.getSignature(((Object)((Object)new DoFnUsingState(){
            @DoFn.StateId(value="my-state-id")
            private final StateSpec<ValueState<Integer>> spec = StateSpecs.value((Coder)VarIntCoder.of());
        })).getClass());
    }

    @Test
    public void testDeclAndUsageOfStateInSuperclass() throws Exception {
        class DoFnOverridingAbstractStateUse
        extends DoFnDeclaringStateAndAbstractUse {
            DoFnOverridingAbstractStateUse() {
            }

            @Override
            public void processWithState(DoFn.ProcessContext c, ValueState<String> state) {
            }
        }
        DoFnSignature sig = DoFnSignatures.getSignature(((Object)((Object)new DoFnOverridingAbstractStateUse())).getClass());
        Assert.assertThat((Object)sig.stateDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)2));
        DoFnSignature.StateDeclaration decl = (DoFnSignature.StateDeclaration)sig.stateDeclarations().get("my-state-id");
        DoFnSignature.Parameter.StateParameter stateParam = (DoFnSignature.Parameter.StateParameter)sig.processElement().extraParameters().get(1);
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnDeclaringStateAndAbstractUse.class.getDeclaredField("myStateSpec")));
        Assert.assertThat((Object)stateParam.referent(), (Matcher)Matchers.equalTo((Object)decl));
    }

    @Test
    public void testSimpleStateIdRefAnonymousDoFn() throws Exception {
        DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<KV<String, Integer>, Long>(){
            @DoFn.StateId(value="foo")
            private final StateSpec<ValueState<Integer>> bizzleDecl = StateSpecs.value((Coder)VarIntCoder.of());

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context, @DoFn.StateId(value="foo") ValueState<Integer> bizzle) {
            }
        }.getClass());
        Assert.assertThat((Object)sig.processElement().extraParameters().size(), (Matcher)Matchers.equalTo((Object)2));
        final DoFnSignature.StateDeclaration decl = (DoFnSignature.StateDeclaration)sig.stateDeclarations().get("foo");
        ((DoFnSignature.Parameter)sig.processElement().extraParameters().get(1)).match((DoFnSignature.Parameter.Cases)new DoFnSignature.Parameter.Cases.WithDefault<Void>(){

            protected Void dispatchDefault(DoFnSignature.Parameter p) {
                Assert.fail((String)String.format("Expected a state parameter but got %s", p));
                return null;
            }

            public Void dispatch(DoFnSignature.Parameter.StateParameter stateParam) {
                Assert.assertThat((Object)stateParam.referent(), (Matcher)Matchers.equalTo((Object)decl));
                return null;
            }
        });
    }

    @Test
    public void testSimpleStateIdNamedDoFn() throws Exception {
        class DoFnForTestSimpleStateIdNamedDoFn
        extends DoFn<KV<String, Integer>, Long> {
            @DoFn.StateId(value="foo")
            private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value((Coder)VarIntCoder.of());

            DoFnForTestSimpleStateIdNamedDoFn() {
            }

            @DoFn.ProcessElement
            public void foo(DoFn.ProcessContext context) {
            }
        }
        DoFnSignature sig = DoFnSignatures.signatureForDoFn((DoFn)new DoFnForTestSimpleStateIdNamedDoFn());
        Assert.assertThat((Object)sig.stateDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.StateDeclaration decl = (DoFnSignature.StateDeclaration)sig.stateDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnForTestSimpleStateIdNamedDoFn.class.getDeclaredField("bizzle")));
        Assert.assertThat((Object)decl.stateType(), (Matcher)Matchers.equalTo((Object)new TypeDescriptor<ValueState<Integer>>(){}));
    }

    @Test
    public void testGenericStatefulDoFn() throws Exception {
        DoFnForTestGenericStatefulDoFn<Integer> myDoFn = new DoFnForTestGenericStatefulDoFn<Integer>(){
            {
                class DoFnForTestGenericStatefulDoFn<T>
                extends DoFn<KV<String, T>, Long> {
                    @DoFn.StateId(value="foo")
                    private final StateSpec<ValueState<T>> bizzle = null;

                    DoFnForTestGenericStatefulDoFn() {
                    }

                    @DoFn.ProcessElement
                    public void foo(DoFn.ProcessContext context) {
                    }
                }
            }
        };
        DoFnSignature sig = DoFnSignatures.signatureForDoFn((DoFn)myDoFn);
        Assert.assertThat((Object)sig.stateDeclarations().size(), (Matcher)Matchers.equalTo((Object)1));
        DoFnSignature.StateDeclaration decl = (DoFnSignature.StateDeclaration)sig.stateDeclarations().get("foo");
        Assert.assertThat((Object)decl.id(), (Matcher)Matchers.equalTo((Object)"foo"));
        Assert.assertThat((Object)decl.field(), (Matcher)Matchers.equalTo((Object)DoFnForTestGenericStatefulDoFn.class.getDeclaredField("bizzle")));
        Assert.assertThat((Object)decl.stateType(), (Matcher)Matchers.equalTo((Object)new TypeDescriptor<ValueState<Integer>>(){}));
    }

    private Matcher<String> mentionsTimers() {
        return Matchers.anyOf((Matcher)Matchers.containsString((String)"timer"), (Matcher)Matchers.containsString((String)"Timer"));
    }

    private Matcher<String> mentionsState() {
        return Matchers.anyOf((Matcher)Matchers.containsString((String)"state"), (Matcher)Matchers.containsString((String)"State"));
    }

    private static class DoFnOverridingAbstractTimerUse
    extends DoFnDeclaringTimerAndAbstractUse {
        private DoFnOverridingAbstractTimerUse() {
        }

        @Override
        public void onMyTimer() {
        }

        @Override
        public void processWithTimer(DoFn.ProcessContext context, Timer timer) {
        }
    }

    private static abstract class DoFnDeclaringTimerAndAbstractUse
    extends DoFn<KV<String, Integer>, Long> {
        public static final String TIMER_ID = "my-timer-id";
        @DoFn.TimerId(value="my-timer-id")
        private final TimerSpec myTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private DoFnDeclaringTimerAndAbstractUse() {
        }

        @DoFn.ProcessElement
        public abstract void processWithTimer(DoFn.ProcessContext var1, @DoFn.TimerId(value="my-timer-id") Timer var2);

        @DoFn.OnTimer(value="my-timer-id")
        public abstract void onMyTimer();
    }

    private static class DoFnOverridingAbstractCallback
    extends DoFnDeclaringTimerAndAbstractCallback {
        private DoFnOverridingAbstractCallback() {
        }

        @Override
        public void onMyTimer() {
        }

        @Override
        @DoFn.ProcessElement
        public void foo(DoFn.ProcessContext context) {
        }
    }

    private static abstract class DoFnDeclaringTimerAndAbstractCallback
    extends DoFn<KV<String, Integer>, Long> {
        public static final String TIMER_ID = "my-timer-id";
        @DoFn.TimerId(value="my-timer-id")
        private final TimerSpec myTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private DoFnDeclaringTimerAndAbstractCallback() {
        }

        @DoFn.ProcessElement
        public void foo(DoFn.ProcessContext context) {
        }

        @DoFn.OnTimer(value="my-timer-id")
        public abstract void onMyTimer();
    }

    private static abstract class DoFnWithOnlyCallback
    extends DoFn<KV<String, Integer>, Long> {
        public static final String TIMER_ID = "my-timer-id";

        private DoFnWithOnlyCallback() {
        }

        @DoFn.OnTimer(value="my-timer-id")
        public void onMyTimer() {
        }

        @DoFn.ProcessElement
        public void foo(DoFn.ProcessContext context) {
        }
    }

    private static abstract class DoFnDeclaringTimerAndCallback
    extends DoFn<KV<String, Integer>, Long> {
        public static final String TIMER_ID = "my-timer-id";
        @DoFn.TimerId(value="my-timer-id")
        private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private DoFnDeclaringTimerAndCallback() {
        }

        @DoFn.OnTimer(value="my-timer-id")
        public void onTimer() {
        }
    }

    private static abstract class DoFnDeclaringMyTimerId
    extends DoFn<KV<String, Integer>, Long> {
        @DoFn.TimerId(value="my-timer-id")
        private final TimerSpec bizzle = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private DoFnDeclaringMyTimerId() {
        }

        @DoFn.ProcessElement
        public void foo(DoFn.ProcessContext context) {
        }
    }

    private static abstract class DoFnDeclaringStateAndAbstractUse
    extends DoFn<KV<String, Integer>, Long> {
        public static final String STATE_ID = "my-state-id";
        @DoFn.StateId(value="my-state-id")
        private final StateSpec<ValueState<String>> myStateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());

        private DoFnDeclaringStateAndAbstractUse() {
        }

        @DoFn.ProcessElement
        public abstract void processWithState(DoFn.ProcessContext var1, @DoFn.StateId(value="my-state-id") ValueState<String> var2);
    }

    private static abstract class DoFnUsingState
    extends DoFn<KV<String, Integer>, Long> {
        public static final String STATE_ID = "my-state-id";

        private DoFnUsingState() {
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext context, @DoFn.StateId(value="my-state-id") ValueState<Integer> state) {
        }
    }

    private static abstract class DoFnDeclaringState
    extends DoFn<KV<String, Integer>, Long> {
        public static final String STATE_ID = "my-state-id";
        @DoFn.StateId(value="my-state-id")
        private final StateSpec<ValueState<Integer>> bizzle = StateSpecs.value((Coder)VarIntCoder.of());

        private DoFnDeclaringState() {
        }
    }
}

