/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.TestCase;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.operators.wmassigners.MiniBatchedWatermarkAssignerOperator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTestBase;
import org.junit.Assert;
import org.junit.Test;

public class MiniBatchedWatermarkAssignerOperatorTest
extends WatermarkAssignerOperatorTestBase {
    @Test
    public void testMiniBatchedWatermarkAssignerWithIdleSource() throws Exception {
        MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(0, 1L, 0L, 1000L, 50L);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{1L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{2L})));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{3L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{4L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{50L})));
        ConcurrentLinkedQueue output = testHarness.getOutput();
        List<Watermark> watermarks = this.extractWatermarks(output);
        Assert.assertEquals((long)1L, (long)watermarks.size());
        Assert.assertEquals((Object)new Watermark(49L), (Object)watermarks.get(0));
        Assert.assertEquals((Object)StreamStatus.ACTIVE, (Object)testHarness.getStreamStatus());
        output.clear();
        testHarness.setProcessingTime(1001L);
        Assert.assertEquals((Object)StreamStatus.IDLE, (Object)testHarness.getStreamStatus());
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{51L})));
        Assert.assertEquals((Object)StreamStatus.ACTIVE, (Object)testHarness.getStreamStatus());
        testHarness.setProcessingTime(1060L);
        output = testHarness.getOutput();
        watermarks = this.extractWatermarks(output);
        TestCase.assertTrue((boolean)watermarks.isEmpty());
        output.clear();
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{100L})));
        output = testHarness.getOutput();
        watermarks = this.extractWatermarks(output);
        Assert.assertEquals((long)1L, (long)watermarks.size());
        Assert.assertEquals((Object)new Watermark(99L), (Object)watermarks.get(0));
    }

    @Test
    public void testMiniBatchedWatermarkAssignerOperator() throws Exception {
        MiniBatchedWatermarkAssignerOperator operator = new MiniBatchedWatermarkAssignerOperator(0, 1L, 0L, -1L, 50L);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{1L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{2L})));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{3L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{4L})));
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{50L})));
        ConcurrentLinkedQueue output = testHarness.getOutput();
        List<Watermark> watermarks = this.extractWatermarks(output);
        Assert.assertEquals((long)1L, (long)watermarks.size());
        Assert.assertEquals((Object)new Watermark(49L), (Object)watermarks.get(0));
        output.clear();
        testHarness.setProcessingTime(1001L);
        output = testHarness.getOutput();
        watermarks = this.extractWatermarks(output);
        TestCase.assertTrue((boolean)watermarks.isEmpty());
        output.clear();
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{99L})));
        output = testHarness.getOutput();
        watermarks = this.extractWatermarks(output);
        TestCase.assertTrue((boolean)watermarks.isEmpty());
        output.clear();
        testHarness.processElement(new StreamRecord((Object)GenericRow.of((Object[])new Object[]{100L})));
        output = testHarness.getOutput();
        watermarks = this.extractWatermarks(output);
        Assert.assertEquals((long)1L, (long)watermarks.size());
        Assert.assertEquals((Object)new Watermark(99L), (Object)watermarks.get(0));
        output.clear();
    }
}

