/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WithTimestampsTest
implements Serializable {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    @Test
    @Category(value={ValidatesRunner.class})
    public void withTimestampsShouldApplyTimestamps() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input));
        String yearTwoThousand = "946684800000";
        PCollection timestamped = (PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand}))).apply((PTransform)WithTimestamps.of((SerializableFunction)timestampFn));
        PCollection timestampedVals = (PCollection)timestamped.apply((PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, Instant>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                c.output((Object)KV.of((Object)((String)c.element()), (Object)c.timestamp()));
            }
        }));
        PAssert.that((PCollection)timestamped).containsInAnyOrder((Object[])new String[]{yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)});
        PAssert.that((PCollection)timestampedVals).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"0", (Object)new Instant(0L)), KV.of((Object)"1234", (Object)new Instant(1234L)), KV.of((Object)Integer.toString(Integer.MAX_VALUE), (Object)new Instant(Integer.MAX_VALUE)), KV.of((Object)yearTwoThousand, (Object)new Instant((Object)Long.valueOf(yearTwoThousand)))});
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void withTimestampsBackwardsInTimeShouldThrow() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input));
        SerializableFunction & Serializable backInTimeFn = (SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input)).minus((ReadableDuration)Duration.millis((long)1000L));
        String yearTwoThousand = "946684800000";
        ((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand}))).apply("WithTimestamps", (PTransform)WithTimestamps.of((SerializableFunction)timestampFn))).apply("AddSkew", (PTransform)WithTimestamps.of((SerializableFunction)backInTimeFn));
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectCause(Matchers.isA(IllegalArgumentException.class));
        this.thrown.expectMessage("no earlier than the timestamp of the current input");
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input));
        Duration skew = Duration.millis((long)1000L);
        SerializableFunction & Serializable backInTimeFn = (SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input)).minus((ReadableDuration)skew);
        String yearTwoThousand = "946684800000";
        PCollection timestampedWithSkew = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand}))).apply("FirstTimestamp", (PTransform)WithTimestamps.of((SerializableFunction)timestampFn))).apply("WithSkew", (PTransform)WithTimestamps.of((SerializableFunction)backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
        PCollection timestampedVals = (PCollection)timestampedWithSkew.apply((PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, Instant>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                c.output((Object)KV.of((Object)((String)c.element()), (Object)c.timestamp()));
            }
        }));
        PAssert.that((PCollection)timestampedWithSkew).containsInAnyOrder((Object[])new String[]{yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)});
        PAssert.that((PCollection)timestampedVals).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"0", (Object)new Instant(0L).minus((ReadableDuration)skew)), KV.of((Object)"1234", (Object)new Instant(1234L).minus((ReadableDuration)skew)), KV.of((Object)Integer.toString(Integer.MAX_VALUE), (Object)new Instant((Object)Integer.MAX_VALUE).minus((ReadableDuration)skew)), KV.of((Object)yearTwoThousand, (Object)new Instant((Object)Long.valueOf(yearTwoThousand)).minus((ReadableDuration)skew))});
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void withTimestampsWithNullTimestampShouldThrow() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)input -> null;
        String yearTwoThousand = "946684800000";
        ((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand}))).apply((PTransform)WithTimestamps.of((SerializableFunction)timestampFn));
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectCause(Matchers.isA(NullPointerException.class));
        this.thrown.expectMessage("WithTimestamps");
        this.thrown.expectMessage("cannot be null");
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void withTimestampsWithNullFnShouldThrowOnConstruction() {
        SerializableFunction timestampFn = null;
        this.thrown.expect(NullPointerException.class);
        this.thrown.expectMessage("WithTimestamps fn cannot be null");
        ((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE)}))).apply((PTransform)WithTimestamps.of(timestampFn));
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void withTimestampsLambdaShouldApplyTimestamps() {
        String yearTwoThousand = "946684800000";
        PCollection timestamped = (PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"1234", (Object[])new String[]{"0", Integer.toString(Integer.MAX_VALUE), "946684800000"}))).apply((PTransform)WithTimestamps.of((SerializableFunction & Serializable)input -> new Instant((Object)Long.valueOf(input))));
        PCollection timestampedVals = (PCollection)timestamped.apply((PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, Instant>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                c.output((Object)KV.of((Object)((String)c.element()), (Object)c.timestamp()));
            }
        }));
        PAssert.that((PCollection)timestamped).containsInAnyOrder((Object[])new String[]{"946684800000", "0", "1234", Integer.toString(Integer.MAX_VALUE)});
        PAssert.that((PCollection)timestampedVals).containsInAnyOrder((Object[])new KV[]{KV.of((Object)"0", (Object)new Instant(0L)), KV.of((Object)"1234", (Object)new Instant((Object)Long.valueOf("1234"))), KV.of((Object)Integer.toString(Integer.MAX_VALUE), (Object)new Instant(Integer.MAX_VALUE)), KV.of((Object)"946684800000", (Object)new Instant((Object)Long.valueOf("946684800000")))});
        this.p.run();
    }
}

