/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RowTimeDeduplicateFunctionTest {
    private final long miniBatchSize = 4L;
    private Time minTtlTime = Time.milliseconds((long)10L);
    private InternalTypeInfo inputRowType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()});
    private TypeSerializer<RowData> serializer = this.inputRowType.toSerializer();
    private int rowTimeIndex = 2;
    private int rowKeyIndex = 0;
    private BinaryRowDataKeySelector rowKeySelector = new BinaryRowDataKeySelector(new int[]{this.rowKeyIndex}, this.inputRowType.toRowFieldTypes());
    private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(this.inputRowType.toRowFieldTypes(), new GenericRowRecordSortComparator(this.rowKeyIndex, this.inputRowType.toRowFieldTypes()[this.rowKeyIndex]));
    private final boolean miniBatchEnable;

    public RowTimeDeduplicateFunctionTest(boolean miniBacthEnable) {
        this.miniBatchEnable = miniBacthEnable;
    }

    @Test
    public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepFirstRow(true, true, expectedOutput);
        this.testRowTimeDeduplicateKeepFirstRow(true, false, expectedOutput);
        this.testRowTimeDeduplicateKeepFirstRow(false, true, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepFirstRow(false, false, expectedOutput);
    }

    @Test
    public void testRowTimeDeduplicateKeepLastRow() throws Exception {
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key2", 11, 101L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(true, true, expectedOutput);
        this.testRowTimeDeduplicateKeepLastRow(true, false, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(false, true, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(false, false, expectedOutput);
    }

    private void testRowTimeDeduplicateKeepFirstRow(boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
        RowTimeDeduplicateFunction func;
        boolean keepLastRow = false;
        KeyedMapBundleOperator keyedMapBundleOperator = null;
        KeyedProcessOperator keyedProcessOperator = null;
        if (this.miniBatchEnable) {
            func = new RowTimeMiniBatchDeduplicateFunction(this.inputRowType, this.serializer, this.minTtlTime.toMilliseconds(), this.rowTimeIndex, generateUpdateBefore, generateInsert, false);
            CountBundleTrigger trigger = new CountBundleTrigger(4L);
            keyedMapBundleOperator = new KeyedMapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger);
            testHarness = this.createTestHarness((KeyedMapBundleOperator<RowData, RowData, RowData, RowData>)keyedMapBundleOperator);
        } else {
            func = new RowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMilliseconds(), this.rowTimeIndex, generateUpdateBefore, generateInsert, false);
            keyedProcessOperator = new KeyedProcessOperator((KeyedProcessFunction)func);
            testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)keyedProcessOperator);
        }
        ArrayList<Object> actualOutput = new ArrayList<Object>();
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
        testHarness.processWatermark(new Watermark(102L));
        actualOutput.addAll(testHarness.getOutput());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = this.miniBatchEnable ? this.createTestHarness((KeyedMapBundleOperator<RowData, RowData, RowData, RowData>)keyedMapBundleOperator) : this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)keyedProcessOperator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
        testHarness.processWatermark(new Watermark(302L));
        testHarness.setStateTtlProcessingTime(this.minTtlTime.toMilliseconds() + 1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
        testHarness.processWatermark(402L);
        actualOutput.addAll(testHarness.getOutput());
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        testHarness.close();
    }

    private void testRowTimeDeduplicateKeepLastRow(boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
        RowTimeDeduplicateFunction func;
        boolean keepLastRow = true;
        KeyedMapBundleOperator keyedMapBundleOperator = null;
        KeyedProcessOperator keyedProcessOperator = null;
        if (this.miniBatchEnable) {
            func = new RowTimeMiniBatchDeduplicateFunction(this.inputRowType, this.serializer, this.minTtlTime.toMilliseconds(), this.rowTimeIndex, generateUpdateBefore, generateInsert, true);
            CountBundleTrigger trigger = new CountBundleTrigger(4L);
            keyedMapBundleOperator = new KeyedMapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger);
            testHarness = this.createTestHarness((KeyedMapBundleOperator<RowData, RowData, RowData, RowData>)keyedMapBundleOperator);
        } else {
            func = new RowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMilliseconds(), this.rowTimeIndex, generateUpdateBefore, generateInsert, true);
            keyedProcessOperator = new KeyedProcessOperator((KeyedProcessFunction)func);
            testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)keyedProcessOperator);
        }
        ArrayList<Object> actualOutput = new ArrayList<Object>();
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
        testHarness.processWatermark(new Watermark(102L));
        actualOutput.addAll(testHarness.getOutput());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = this.miniBatchEnable ? this.createTestHarness((KeyedMapBundleOperator<RowData, RowData, RowData, RowData>)keyedMapBundleOperator) : this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)keyedProcessOperator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
        testHarness.processWatermark(new Watermark(302L));
        testHarness.setStateTtlProcessingTime(this.minTtlTime.toMilliseconds() + 1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
        testHarness.processWatermark(402L);
        actualOutput.addAll(testHarness.getOutput());
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        testHarness.close();
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector)this.rowKeySelector, this.rowKeySelector.getProducedType());
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector)this.rowKeySelector, this.rowKeySelector.getProducedType());
    }

    @Parameterized.Parameters(name="miniBatchEnable = {0}")
    public static Collection<Boolean[]> runMode() {
        return Arrays.asList({false}, {true});
    }
}

