/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

public class TimestampsAndPeriodicWatermarksOperatorTest {
    @Test
    public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
        Tuple2<Long, Long> update;
        Object next;
        TimestampsAndPeriodicWatermarksOperator operator = new TimestampsAndPeriodicWatermarksOperator((AssignerWithPeriodicWatermarks)new LongExtractor());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        long currentTime = 0L;
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)1L, 1L));
        testHarness.processElement(new StreamRecord((Object)2L, 1L));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord((Object)3L, 3L));
        testHarness.processElement(new StreamRecord((Object)4L, 3L));
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        long nextElementValue = 1L;
        long lastWatermark = -1L;
        while (lastWatermark < 3L) {
            if (output.size() > 0) {
                next = output.poll();
                Assert.assertNotNull((Object)next);
                update = this.validateElement(next, nextElementValue, lastWatermark);
                nextElementValue = (Long)update.f0;
                lastWatermark = (Long)update.f1;
                Assert.assertTrue((lastWatermark < nextElementValue ? 1 : 0) != 0);
                continue;
            }
            testHarness.setProcessingTime(currentTime += 10L);
        }
        output.clear();
        testHarness.processElement(new StreamRecord((Object)4L, 4L));
        testHarness.processElement(new StreamRecord((Object)5L, 4L));
        testHarness.processElement(new StreamRecord((Object)6L, 4L));
        testHarness.processElement(new StreamRecord((Object)7L, 4L));
        testHarness.processElement(new StreamRecord((Object)8L, 4L));
        output = testHarness.getOutput();
        nextElementValue = 4L;
        lastWatermark = 2L;
        while (lastWatermark < 7L) {
            if (output.size() > 0) {
                next = output.poll();
                Assert.assertNotNull((Object)next);
                update = this.validateElement(next, nextElementValue, lastWatermark);
                nextElementValue = (Long)update.f0;
                lastWatermark = (Long)update.f1;
                Assert.assertTrue((lastWatermark < nextElementValue ? 1 : 0) != 0);
                continue;
            }
            testHarness.setProcessingTime(currentTime += 10L);
        }
        output.clear();
        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        Assert.assertEquals((long)Long.MAX_VALUE, (long)((Watermark)testHarness.getOutput().poll()).getTimestamp());
    }

    @Test
    public void testNegativeTimestamps() throws Exception {
        long[] values;
        NeverWatermarkExtractor assigner = new NeverWatermarkExtractor();
        TimestampsAndPeriodicWatermarksOperator operator = new TimestampsAndPeriodicWatermarksOperator((AssignerWithPeriodicWatermarks)assigner);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50L);
        testHarness.open();
        for (long value : values = new long[]{Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE}) {
            testHarness.processElement(new StreamRecord((Object)value));
        }
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (long value : values) {
            Assert.assertEquals((long)value, (long)((StreamRecord)output.poll()).getTimestamp());
        }
    }

    private Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
        if (element instanceof StreamRecord) {
            StreamRecord record = (StreamRecord)element;
            Assert.assertEquals((long)nextElementValue, (long)((Long)record.getValue()));
            Assert.assertEquals((long)nextElementValue, (long)record.getTimestamp());
            return new Tuple2((Object)(nextElementValue + 1L), (Object)currentWatermark);
        }
        if (element instanceof Watermark) {
            long wt = ((Watermark)element).getTimestamp();
            Assert.assertTrue((wt > currentWatermark ? 1 : 0) != 0);
            return new Tuple2((Object)nextElementValue, (Object)wt);
        }
        throw new IllegalArgumentException("unrecognized element: " + element);
    }

    private static class NeverWatermarkExtractor
    implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1L;

        private NeverWatermarkExtractor() {
        }

        public long extractTimestamp(Long element, long previousElementTimestamp) {
            return element;
        }

        public Watermark getCurrentWatermark() {
            return null;
        }
    }

    private static class LongExtractor
    implements AssignerWithPeriodicWatermarks<Long> {
        private static final long serialVersionUID = 1L;
        private long currentTimestamp = Long.MIN_VALUE;

        private LongExtractor() {
        }

        public long extractTimestamp(Long element, long previousElementTimestamp) {
            this.currentTimestamp = element;
            return element;
        }

        public Watermark getCurrentWatermark() {
            return new Watermark(this.currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : this.currentTimestamp - 1L);
        }
    }
}

