package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.class */
public class ProcTimeRangeBoundedPrecedingFunctionTest {
    private static GeneratedAggsHandleFunction aggsHandleFunction = new GeneratedAggsHandleFunction("Function", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunctionTest.1
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public AggsHandleFunction m61newInstance(ClassLoader classLoader) {
            return new SumAggsHandleFunction(1);
        }
    };
    private LogicalType[] inputFieldTypes = {new VarCharType(Integer.MAX_VALUE), new BigIntType()};
    private LogicalType[] accTypes = {new BigIntType()};
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputFieldTypes);
    private TypeInformation<RowData> keyType = this.keySelector.getProducedType();

    @Test
    public void testStateCleanup() throws Exception {
        KeyedProcessOperator<RowData, RowData, RowData> keyedProcessOperator = new KeyedProcessOperator<>(new ProcTimeRangeBoundedPrecedingFunction(aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L));
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(keyedProcessOperator);
        createTestHarness.open();
        AbstractKeyedStateBackend keyedStateBackend = keyedProcessOperator.getKeyedStateBackend();
        Assert.assertEquals("Initial state is not empty", 0L, keyedStateBackend.numKeyValueStateEntries());
        createTestHarness.setProcessingTime(100L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        createTestHarness.setProcessingTime(500L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        createTestHarness.setProcessingTime(1000L);
        createTestHarness.setProcessingTime(4000L);
        Assert.assertEquals("State has not been cleaned up", 0L, keyedStateBackend.numKeyValueStateEntries());
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(KeyedProcessOperator<RowData, RowData, RowData> keyedProcessOperator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(keyedProcessOperator, this.keySelector, this.keyType);
    }
}
