/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.over.RowTimeOverWindowTestBase;
import org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Assert;
import org.junit.Test;

public class RowTimeRangeBoundedPrecedingFunctionTest
extends RowTimeOverWindowTestBase {
    @Test
    public void testStateCleanup() throws Exception {
        RowTimeRangeBoundedPrecedingFunction function = new RowTimeRangeBoundedPrecedingFunction(aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L, 2);
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)operator);
        testHarness.open();
        AbstractKeyedStateBackend stateBackend = (AbstractKeyedStateBackend)operator.getKeyedStateBackend();
        Assert.assertEquals((String)"Initial state is not empty", (long)0L, (long)stateBackend.numKeyValueStateEntries());
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 500L));
        testHarness.processWatermark(new Watermark(1000L));
        testHarness.processWatermark(new Watermark(4000L));
        Assert.assertEquals((String)"State has not been cleaned up", (long)0L, (long)stateBackend.numKeyValueStateEntries());
    }

    @Test
    public void testLateRecordMetrics() throws Exception {
        RowTimeRangeBoundedPrecedingFunction function = new RowTimeRangeBoundedPrecedingFunction(aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L, 2);
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)operator);
        testHarness.open();
        Counter counter = function.getCounter();
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 500L));
        testHarness.processWatermark(new Watermark(500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L, 400L));
        Assert.assertEquals((long)1L, (long)counter.getCount());
    }
}

