/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.TestCase;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTestBase;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link WatermarkAssignerOperator}, driven through a
 * {@link OneInputStreamOperatorTestHarness} whose processing time is advanced manually.
 *
 * <p>The harness is used as a raw type (as in the original code) because the operator's
 * input/output row type is not imported by this file; the resulting unchecked warnings are
 * suppressed on the narrowest scope possible.
 */
public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {

    /** Auto-watermark interval configured on the execution config by every test. */
    private static final long WATERMARK_INTERVAL = 50L;

    /** Amount by which processing time is advanced whenever the output queue runs dry. */
    private static final long PROCESSING_TIME_STEP = 10L;

    /**
     * Feeds records, fires the processing-time timer, lets the stream go idle and then
     * verifies that new records switch the status back to active and produce a watermark.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testWatermarkAssignerWithIdleSource() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = openHarness(1000L);

        testHarness.processElement(rowtimeRecord(1L));
        testHarness.processElement(rowtimeRecord(2L));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(rowtimeRecord(3L));
        testHarness.processElement(rowtimeRecord(4L));

        // Move processing time past the auto-watermark interval so a watermark is emitted.
        testHarness.setProcessingTime(51L);
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        List<Watermark> watermarks = extractWatermarks(output);
        Assert.assertEquals(1, watermarks.size());
        Assert.assertEquals(new Watermark(3L), watermarks.get(0));
        Assert.assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());
        output.clear();

        // No records arrive before processing time 1001: the stream must be reported idle.
        testHarness.setProcessingTime(1001L);
        Assert.assertEquals(StreamStatus.IDLE, testHarness.getStreamStatus());

        // New records must flip the status back to active.
        for (long rowtime = 4L; rowtime <= 8L; rowtime++) {
            testHarness.processElement(rowtimeRecord(rowtime));
        }
        Assert.assertEquals(StreamStatus.ACTIVE, testHarness.getStreamStatus());

        testHarness.setProcessingTime(1060L);
        output = testHarness.getOutput();
        watermarks = extractWatermarks(output);
        Assert.assertEquals(1, watermarks.size());
        Assert.assertEquals(new Watermark(7L), watermarks.get(0));
    }

    /**
     * Feeds two batches of records and validates, element by element, the interleaving of
     * records and watermarks in the output; finally checks that a {@code Long.MAX_VALUE}
     * watermark is forwarded.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testWatermarkAssignerOperator() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = openHarness(-1L);
        long currentTime = 0L;

        testHarness.processElement(rowtimeRecord(1L));
        testHarness.processElement(rowtimeRecord(2L));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(rowtimeRecord(3L));
        testHarness.processElement(rowtimeRecord(4L));

        // First batch: consume output until a watermark of at least 3 has been seen.
        currentTime = pollAndValidateUntil(testHarness, 1L, -1L, 3L, currentTime);
        testHarness.getOutput().clear();

        for (long rowtime = 4L; rowtime <= 8L; rowtime++) {
            testHarness.processElement(rowtimeRecord(rowtime));
        }

        // Second batch: consume output until a watermark of at least 7 has been seen.
        pollAndValidateUntil(testHarness, 4L, 2L, 7L, currentTime);
        testHarness.getOutput().clear();

        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        Object last = testHarness.getOutput().poll();
        // Fail with a readable assertion instead of an NPE/ClassCastException.
        Assert.assertNotNull("expected the Long.MAX_VALUE watermark to be forwarded", last);
        Assert.assertTrue("expected a Watermark but got: " + last, last instanceof Watermark);
        Assert.assertEquals(Long.MAX_VALUE, ((Watermark) last).getTimestamp());
    }

    /**
     * Creates a {@link WatermarkAssignerOperator} with first and second constructor
     * arguments {@code 0} and {@code 1}, wraps it in a test harness, configures the
     * auto-watermark interval and opens the harness.
     *
     * @param idleTimeout third constructor argument of the operator (presumably the idle
     *     timeout; confirm against the operator's constructor)
     * @return the opened harness
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static OneInputStreamOperatorTestHarness openHarness(long idleTimeout)
            throws Exception {
        WatermarkAssignerOperator operator = new WatermarkAssignerOperator(0, 1L, idleTimeout);
        OneInputStreamOperatorTestHarness harness =
                new OneInputStreamOperatorTestHarness(operator);
        harness.getExecutionConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL);
        harness.open();
        return harness;
    }

    /** Wraps a single-field row holding {@code rowtime} (boxed as {@code Long}) in a record. */
    private static StreamRecord<GenericRow> rowtimeRecord(long rowtime) {
        return new StreamRecord<>(GenericRow.of(rowtime));
    }

    /**
     * Polls the harness output and validates each element via {@code validateElement} until
     * the last seen watermark reaches {@code targetWatermark}. Whenever the output is empty,
     * processing time is advanced by {@link #PROCESSING_TIME_STEP}.
     *
     * @param testHarness harness whose output is consumed
     * @param nextElementValue value expected for the next record
     * @param lastWatermark watermark assumed to have been seen so far
     * @param targetWatermark loop ends once the last seen watermark is at least this value
     * @param currentTime processing time to continue advancing from
     * @return the processing time reached when the loop ended
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private long pollAndValidateUntil(
            OneInputStreamOperatorTestHarness testHarness,
            long nextElementValue,
            long lastWatermark,
            long targetWatermark,
            long currentTime) throws Exception {
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        while (lastWatermark < targetWatermark) {
            // ConcurrentLinkedQueue never holds null elements, so null means "empty".
            Object next = output.poll();
            if (next == null) {
                currentTime += PROCESSING_TIME_STEP;
                testHarness.setProcessingTime(currentTime);
                continue;
            }
            Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
            nextElementValue = update.f0;
            lastWatermark = update.f1;
            Assert.assertTrue(lastWatermark < nextElementValue);
        }
        return currentTime;
    }
}

