/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.runtime.operators.wmassigners.ProcTimeMiniBatchAssignerOperator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTestBase;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link ProcTimeMiniBatchAssignerOperator}.
 *
 * <p>The test drives the operator through a {@link OneInputStreamOperatorTestHarness}, advancing
 * the harness' processing time manually and checking the interleaving of forwarded records and
 * emitted watermarks.
 */
public class ProcTimeMiniBatchAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {

    /** Interval, in milliseconds, passed to the operator under test. */
    private static final long MINI_BATCH_INTERVAL_MS = 100L;

    /** Amount by which processing time is advanced each time the harness output is empty. */
    private static final long TIME_STEP_MS = 10L;

    /**
     * Maximum span of processing time that may be advanced without the operator producing any
     * output. Exceeding it fails the test instead of letting it spin forever.
     */
    private static final long MAX_IDLE_MS = 1_000L;

    @Test
    public void testMiniBatchAssignerOperator() throws Exception {
        ProcTimeMiniBatchAssignerOperator operator =
                new ProcTimeMiniBatchAssignerOperator(MINI_BATCH_INTERVAL_MS);

        // The harness is kept as a raw type: parameterizing it would need a type that this
        // file does not import. try-with-resources guarantees it is closed even when an
        // assertion fails.
        try (OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(operator)) {
            testHarness.open();

            long currentTime = 0L;

            testHarness.processElement(record(1L));
            testHarness.processElement(record(2L));
            // The assertions below expect the first emitted watermark to be 100, i.e. this
            // upstream watermark is expected not to show up in the output.
            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(record(3L));
            testHarness.processElement(record(4L));

            // First batch: elements 1..4 while the watermark is still 0, then watermark 100.
            currentTime = drainUntilWatermark(testHarness, 1L, 0L, 100L, currentTime);

            testHarness.processElement(record(4L));
            testHarness.processElement(record(5L));
            testHarness.processElement(record(6L));
            testHarness.processElement(record(7L));
            testHarness.processElement(record(8L));

            // Second batch: elements 4..8 while the watermark is 100, then watermark 200.
            drainUntilWatermark(testHarness, 4L, 100L, 200L, currentTime);

            // A Long.MAX_VALUE watermark is expected to be forwarded as-is.
            testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
            Object forwarded = testHarness.getOutput().poll();
            Assert.assertTrue(
                    "Expected a forwarded Watermark but got: " + forwarded,
                    forwarded instanceof Watermark);
            Assert.assertEquals(Long.MAX_VALUE, ((Watermark) forwarded).getTimestamp());
        }
    }

    /** Wraps a single-field row holding {@code value} into a stream record. */
    private static StreamRecord<GenericRowData> record(long value) {
        return new StreamRecord<>(GenericRowData.of(value));
    }

    /**
     * Polls the harness output until the first {@link Watermark} shows up, validating every
     * record seen before it, and advancing processing time in {@link #TIME_STEP_MS} steps
     * whenever the output is empty.
     *
     * @param testHarness harness whose output is consumed and whose processing time is advanced
     * @param firstElement value expected for the first record in the output
     * @param previousWatermark watermark value expected to be in effect while records are seen
     * @param expectedWatermark timestamp expected for the watermark that ends the batch
     * @param currentTime processing time the harness is currently at
     * @return the processing time the harness was advanced to
     * @throws Exception if advancing the processing time fails
     */
    private long drainUntilWatermark(
            OneInputStreamOperatorTestHarness testHarness,
            long firstElement,
            long previousWatermark,
            long expectedWatermark,
            long currentTime)
            throws Exception {
        ConcurrentLinkedQueue<?> output = testHarness.getOutput();
        long currentElement = firstElement;
        long lastWatermark = previousWatermark;
        long now = currentTime;
        long idleMs = 0L;

        while (true) {
            if (output.isEmpty()) {
                // Nothing to validate yet: move the clock forward so timers can fire, but
                // give up once the operator has stayed silent for too long.
                if (idleMs >= MAX_IDLE_MS) {
                    Assert.fail(
                            "No output after advancing processing time by "
                                    + idleMs
                                    + " ms (now at "
                                    + now
                                    + " ms)");
                }
                now += TIME_STEP_MS;
                idleMs += TIME_STEP_MS;
                testHarness.setProcessingTime(now);
                continue;
            }
            idleMs = 0L;

            Object next = output.poll();
            Assert.assertNotNull(next);
            Tuple2<Long, Long> update = this.validateElement(next, currentElement, lastWatermark);
            long nextElementValue = update.f0;
            lastWatermark = update.f1;
            if (next instanceof Watermark) {
                break;
            }
            // A record: it must carry the expected value and must not have moved the watermark.
            Assert.assertEquals(currentElement, nextElementValue - 1L);
            ++currentElement;
            Assert.assertEquals(previousWatermark, lastWatermark);
        }

        Assert.assertEquals(expectedWatermark, lastWatermark);
        output.clear();
        return now;
    }
}

