package org.apache.beam.repackaged.beam_runners_direct_java.runners.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.DoFnRunners;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ListMultimap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.format.PeriodFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/core/SimpleDoFnRunnerTest.class */
public class SimpleDoFnRunnerTest {

    /** Rule through which each test declares the exception it expects; initially expects none. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Mock step context passed to every runner under test. {@link #setup()} stubs its
     * {@code timerInternals()} to return {@link #mockTimerInternals}.
     */
    @Mock
    StepContext mockStepContext;

    /** Mock timer internals, populated by {@code MockitoAnnotations.initMocks} in {@link #setup()}. */
    @Mock
    TimerInternals mockTimerInternals;

    /**
     * A {@link DoFn} that sets an event-time timer, relative by {@link #TIMER_OFFSET}, for every
     * element it processes and records each {@code onTimer} callback in
     * {@link #onTimerInvocations} so that tests can inspect what fired.
     *
     * @param <W> the window type encoded by the supplied window coder
     */
    private static class DoFnWithTimers<W extends BoundedWindow> extends DoFn<String, String> {
        static final String TIMER_ID = "testTimerId";
        static final Duration TIMER_OFFSET = Duration.millis(100);

        /** Coder used to build the window state namespace recorded for each timer firing. */
        private final Coder<W> windowCoder;

        /** One entry per {@code onTimer} invocation, in invocation order. */
        final List<TimerInternals.TimerData> onTimerInvocations = new ArrayList<>();

        @DoFn.TimerId(TIMER_ID)
        private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        /**
         * @param coder coder for the windows in which this function's timers fire
         */
        DoFnWithTimers(Coder<W> coder) {
            this.windowCoder = coder;
        }

        /** Sets the {@link #TIMER_ID} timer relative by {@link #TIMER_OFFSET}; emits no output. */
        @DoFn.ProcessElement
        public void process(DoFn<String, String>.ProcessContext processContext, @DoFn.TimerId(TIMER_ID) Timer elementTimer) {
            elementTimer.offset(TIMER_OFFSET).setRelative();
        }

        /** Records the window namespace, timestamp and time domain of the firing timer. */
        @DoFn.OnTimer(TIMER_ID)
        public void onTimer(DoFn<String, String>.OnTimerContext onTimerContext) {
            this.onTimerInvocations.add(
                    TimerInternals.TimerData.of(
                            TIMER_ID,
                            StateNamespaces.window(this.windowCoder, onTimerContext.window()),
                            onTimerContext.timestamp(),
                            onTimerContext.timeDomain()));
        }
    }

    /**
     * A {@link DoFnRunners.OutputManager} that stores every value it is handed in an in-memory
     * multimap keyed by output tag.
     */
    private static class ListOutputManager implements DoFnRunners.OutputManager {
        /** Values received so far, grouped by the tag they were emitted to. */
        private ListMultimap<TupleTag<?>, WindowedValue<?>> outputs = ArrayListMultimap.create();

        private ListOutputManager() {}

        /** Appends {@code windowedValue} to the values stored under {@code tupleTag}. */
        @Override
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            outputs.put(tupleTag, windowedValue);
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/core/SimpleDoFnRunnerTest$SkewingDoFn.class */
    private static class SkewingDoFn extends DoFn<Duration, Duration> {
        private final Duration allowedSkew;

        private SkewingDoFn(Duration duration) {
            this.allowedSkew = duration;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Duration, Duration>.ProcessContext processContext) {
            processContext.outputWithTimestamp((Duration) processContext.element(), processContext.timestamp().minus((ReadableDuration) processContext.element()));
        }

        public Duration getAllowedTimestampSkew() {
            return this.allowedSkew;
        }
    }

    /**
     * A {@link DoFn} whose start-bundle, finish-bundle, process-element and on-timer methods all
     * throw the same exception instance, {@link #exceptionToThrow}.
     */
    static class ThrowingDoFn extends DoFn<String, String> {
        static final String TIMER_ID = "throwingTimerId";

        @DoFn.TimerId(TIMER_ID)
        private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        /** The single exception instance thrown by every lifecycle method of this function. */
        final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");

        /** Always throws {@link #exceptionToThrow}. */
        @DoFn.StartBundle
        public void startBundle() throws Exception {
            throw exceptionToThrow;
        }

        /** Always throws {@link #exceptionToThrow}. */
        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            throw exceptionToThrow;
        }

        /** Always throws {@link #exceptionToThrow}. */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            throw exceptionToThrow;
        }

        /** Always throws {@link #exceptionToThrow}. */
        @DoFn.OnTimer(TIMER_ID)
        public void onTimer(DoFn<String, String>.OnTimerContext onTimerContext) throws Exception {
            throw exceptionToThrow;
        }
    }

    /**
     * Initializes the {@code @Mock} fields and makes the mock step context hand out the mock
     * timer internals.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals);
    }

    /**
     * Expects {@code processElement} on a runner wrapping {@link ThrowingDoFn} to fail with a
     * {@link UserCodeException} whose cause is the function's own exception instance.
     */
    @Test
    public void testProcessElementExceptionsWrappedAsUserCodeException() {
        ThrowingDoFn fn = new ThrowingDoFn();
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.is(fn.exceptionToThrow));

        runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
    }

    /**
     * Expects {@code onTimer} on a runner wrapping {@link ThrowingDoFn} to fail with a
     * {@link UserCodeException} whose cause is the function's own exception instance.
     */
    @Test
    public void testOnTimerExceptionsWrappedAsUserCodeException() {
        ThrowingDoFn fn = new ThrowingDoFn();
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.is(fn.exceptionToThrow));

        // Refer to the timer through the DoFn's constant so the id cannot drift from its declaration.
        runner.onTimer(ThrowingDoFn.TIMER_ID, GlobalWindow.INSTANCE, new Instant(0L), TimeDomain.EVENT_TIME);
    }

    /**
     * Processes one element through {@link DoFnWithTimers} and verifies that the mock timer
     * internals were asked to set the event-time timer at the stubbed input watermark plus
     * {@link DoFnWithTimers#TIMER_OFFSET}, in the global window's namespace.
     */
    @Test
    public void testTimerSet() {
        GlobalWindows windowFn = new GlobalWindows();
        DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder());
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(windowFn),
                        DoFnSchemaInformation.create());

        // The expected firing time below is computed relative to this stubbed watermark.
        Instant currentTime = new Instant(42L);
        Mockito.when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime);

        runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));

        Mockito.verify(mockTimerInternals)
                .setTimer(
                        StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
                        DoFnWithTimers.TIMER_ID,
                        currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
                        TimeDomain.EVENT_TIME);
    }

    /**
     * Expects {@code startBundle} on a runner wrapping {@link ThrowingDoFn} to fail with a
     * {@link UserCodeException} whose cause is the function's own exception instance.
     */
    @Test
    public void testStartBundleExceptionsWrappedAsUserCodeException() {
        ThrowingDoFn fn = new ThrowingDoFn();
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.is(fn.exceptionToThrow));

        runner.startBundle();
    }

    /**
     * Expects {@code finishBundle} on a runner wrapping {@link ThrowingDoFn} to fail with a
     * {@link UserCodeException} whose cause is the function's own exception instance.
     */
    @Test
    public void testFinishBundleExceptionsWrappedAsUserCodeException() {
        ThrowingDoFn fn = new ThrowingDoFn();
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.is(fn.exceptionToThrow));

        runner.finishBundle();
    }

    /**
     * Fires the {@link DoFnWithTimers} timer through the runner and asserts that the function
     * recorded exactly one invocation carrying the same timer id, global-window namespace,
     * timestamp and time domain that were passed to {@code onTimer}.
     */
    @Test
    public void testOnTimerCalled() {
        GlobalWindows windowFn = new GlobalWindows();
        DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers<>(windowFn.windowCoder());
        SimpleDoFnRunner<String, String> runner =
                new SimpleDoFnRunner<>(
                        null,
                        fn,
                        NullSideInputReader.empty(),
                        null,
                        null,
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(windowFn),
                        DoFnSchemaInformation.create());

        Instant currentTime = new Instant(42L);
        Duration offset = Duration.millis(37L);
        Instant firingTime = currentTime.plus(offset);

        runner.onTimer(DoFnWithTimers.TIMER_ID, GlobalWindow.INSTANCE, firingTime, TimeDomain.EVENT_TIME);

        Assert.assertThat(
                fn.onTimerInvocations,
                Matchers.contains(
                        TimerInternals.TimerData.of(
                                DoFnWithTimers.TIMER_ID,
                                StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
                                firingTime,
                                TimeDomain.EVENT_TIME)));
    }

    /**
     * With an allowed skew of zero, expects an output whose timestamp is 1 ms before the input
     * timestamp to fail with a {@link UserCodeException} caused by an
     * {@link IllegalArgumentException} whose message mentions the input timestamp and the
     * allowed skew.
     */
    @Test
    public void testBackwardsInTimeNoSkew() {
        SimpleDoFnRunner<Duration, Duration> runner =
                new SimpleDoFnRunner<>(
                        null,
                        new SkewingDoFn(Duration.ZERO),
                        NullSideInputReader.empty(),
                        new ListOutputManager(),
                        new TupleTag<>(),
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        runner.startBundle();
        // Zero shift: the output timestamp equals the input timestamp.
        runner.processElement(WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0L)));

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.isA(IllegalArgumentException.class));
        thrown.expectMessage("must be no earlier");
        thrown.expectMessage(String.format("timestamp of the current input (%s)", new Instant(0L).toString()));
        thrown.expectMessage(
                String.format("the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));

        // 1 ms shift: the output timestamp falls 1 ms before the input timestamp.
        runner.processElement(WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0L)));
    }

    /**
     * With an allowed skew of ten minutes, first processes an element shifted back by five
     * minutes, then expects an element shifted back by one hour to fail with a
     * {@link UserCodeException} caused by an {@link IllegalArgumentException} whose message
     * mentions the input timestamp and the allowed skew.
     */
    @Test
    public void testSkew() {
        Duration allowedSkew = Duration.standardMinutes(10L);
        SimpleDoFnRunner<Duration, Duration> runner =
                new SimpleDoFnRunner<>(
                        null,
                        new SkewingDoFn(allowedSkew),
                        NullSideInputReader.empty(),
                        new ListOutputManager(),
                        new TupleTag<>(),
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        runner.startBundle();
        // Five-minute shift, smaller than the ten-minute allowed skew.
        runner.processElement(
                WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0L)));

        thrown.expect(UserCodeException.class);
        thrown.expectCause(Matchers.isA(IllegalArgumentException.class));
        thrown.expectMessage("must be no earlier");
        thrown.expectMessage(String.format("timestamp of the current input (%s)", new Instant(0L).toString()));
        thrown.expectMessage(
                String.format("the allowed skew (%s)", PeriodFormat.getDefault().print(allowedSkew.toPeriod())));

        // One-hour shift, larger than the ten-minute allowed skew.
        runner.processElement(
                WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0L)));
    }

    /**
     * With an allowed skew of {@code Long.MAX_VALUE} milliseconds, processes three elements that
     * shift their output timestamps backwards and declares no expected exception, so the test
     * passes only if none of them throws.
     */
    @Test
    public void testInfiniteSkew() {
        SimpleDoFnRunner<Duration, Duration> runner =
                new SimpleDoFnRunner<>(
                        null,
                        new SkewingDoFn(Duration.millis(Long.MAX_VALUE)),
                        NullSideInputReader.empty(),
                        new ListOutputManager(),
                        new TupleTag<>(),
                        Collections.emptyList(),
                        mockStepContext,
                        null,
                        Collections.emptyMap(),
                        WindowingStrategy.of(new GlobalWindows()),
                        DoFnSchemaInformation.create());

        runner.startBundle();

        // 1 ms shift from the epoch.
        runner.processElement(WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0L)));

        // 1 ms shift from (minimum timestamp + 1 ms): the output lands on the minimum timestamp.
        runner.processElement(
                WindowedValue.timestampedValueInGlobalWindow(
                        Duration.millis(1L), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1L))));

        // Shift by the full (max - min) span from the maximum timestamp: the output again lands on
        // the minimum timestamp.
        Duration fullSpan =
                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
                        .minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
        runner.processElement(
                WindowedValue.timestampedValueInGlobalWindow(fullSpan, BoundedWindow.TIMESTAMP_MAX_VALUE));
    }
}
