/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

public class TimestampsAndPunctuatedWatermarksOperatorTest {
    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        TimestampsAndPunctuatedWatermarksOperator operator = new TimestampsAndPunctuatedWatermarksOperator((AssignerWithPunctuatedWatermarks)new PunctuatedExtractor());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)3L, (Object)true), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)5L, (Object)false), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)4L, (Object)false), 0L));
        testHarness.processWatermark(new Watermark(10L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)4L, (Object)false), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)4L, (Object)true), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)9L, (Object)false), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)5L, (Object)false), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)7L, (Object)true), 0L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)10L, (Object)false), 0L));
        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        Assert.assertEquals((long)3L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)3L, (long)((Watermark)output.poll()).getTimestamp());
        Assert.assertEquals((long)5L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)4L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)4L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)4L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)4L, (long)((Watermark)output.poll()).getTimestamp());
        Assert.assertEquals((long)9L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)5L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)7L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)7L, (long)((Watermark)output.poll()).getTimestamp());
        Assert.assertEquals((long)10L, (long)((StreamRecord)output.poll()).getTimestamp());
        Assert.assertEquals((long)Long.MAX_VALUE, (long)((Watermark)output.poll()).getTimestamp());
    }

    @Test
    public void testZeroOnNegativeTimestamps() throws Exception {
        long[] values;
        NeverWatermarkExtractor assigner = new NeverWatermarkExtractor();
        TimestampsAndPunctuatedWatermarksOperator operator = new TimestampsAndPunctuatedWatermarksOperator((AssignerWithPunctuatedWatermarks)assigner);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        for (long value : values = new long[]{Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE}) {
            testHarness.processElement(new StreamRecord((Object)value));
        }
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (long value : values) {
            Assert.assertEquals((long)value, (long)((StreamRecord)output.poll()).getTimestamp());
        }
    }

    private static class NeverWatermarkExtractor
    implements AssignerWithPunctuatedWatermarks<Long> {
        private static final long serialVersionUID = 1L;

        private NeverWatermarkExtractor() {
        }

        public long extractTimestamp(Long element, long previousElementTimestamp) {
            return element;
        }

        public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
            return null;
        }
    }

    private static class PunctuatedExtractor
    implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Boolean>> {
        private static final long serialVersionUID = 1L;

        private PunctuatedExtractor() {
        }

        public long extractTimestamp(Tuple2<Long, Boolean> element, long previousTimestamp) {
            return (Long)element.f0;
        }

        public Watermark checkAndGetNextWatermark(Tuple2<Long, Boolean> lastElement, long extractedTimestamp) {
            return (Boolean)lastElement.f1 != false ? new Watermark(extractedTimestamp) : null;
        }
    }
}

