/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateFunctionTestBase;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Assert;
import org.junit.Test;

public class ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest
extends ProcTimeDeduplicateFunctionTestBase {
    private TypeSerializer<RowData> typeSerializer;

    public ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest() {
        this.typeSerializer = this.inputRowType.createSerializer(new ExecutionConfig());
    }

    private ProcTimeMiniBatchDeduplicateKeepLastRowFunction createFunction(boolean generateUpdateBefore, boolean generateInsert, long minRetentionTime) {
        return new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(this.inputRowType, this.typeSerializer, minRetentionTime, generateUpdateBefore, generateInsert, true);
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(ProcTimeMiniBatchDeduplicateKeepLastRowFunction func) throws Exception {
        CountBundleTrigger trigger = new CountBundleTrigger(3L);
        KeyedMapBundleOperator op = new KeyedMapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger);
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)op, (KeySelector)this.rowKeySelector, this.rowKeySelector.getProducedType());
    }

    @Test
    public void testWithoutGenerateUpdateBefore() throws Exception {
        ProcTimeMiniBatchDeduplicateKeepLastRowFunction func = this.createFunction(false, true, this.minTime.toMilliseconds());
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue((boolean)testHarness.getOutput().isEmpty());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 3L, 11));
        testHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testWithoutGenerateUpdateBeforeAndInsert() throws Exception {
        ProcTimeMiniBatchDeduplicateKeepLastRowFunction func = this.createFunction(false, false, this.minTime.toMilliseconds());
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue((boolean)testHarness.getOutput().isEmpty());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 3L, 11));
        testHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testWithGenerateUpdateBefore() throws Exception {
        ProcTimeMiniBatchDeduplicateKeepLastRowFunction func = this.createFunction(true, true, this.minTime.toMilliseconds());
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue((boolean)testHarness.getOutput().isEmpty());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 13));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 3L, 11));
        testHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testWithGenerateUpdateBeforeAndStateTtl() throws Exception {
        ProcTimeMiniBatchDeduplicateKeepLastRowFunction func = this.createFunction(true, true, this.minTime.toMilliseconds());
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue((boolean)testHarness.getOutput().isEmpty());
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        testHarness.setStateTtlProcessingTime(30L);
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 17));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 18));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 19));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 19));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 18));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }
}

