package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.class */
public class MiniBatchDeduplicateKeepLastRowFunctionTest extends DeduplicateFunctionTestBase {
    private TypeSerializer<RowData> typeSerializer = this.inputRowType.createSerializer(new ExecutionConfig());

    private MiniBatchDeduplicateKeepLastRowFunction createFunction(boolean z, boolean z2, long j) {
        return new MiniBatchDeduplicateKeepLastRowFunction(this.inputRowType, z, z2, this.typeSerializer, j);
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(MiniBatchDeduplicateKeepLastRowFunction miniBatchDeduplicateKeepLastRowFunction) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(new KeyedMapBundleOperator(miniBatchDeduplicateKeepLastRowFunction, new CountBundleTrigger(3L)), this.rowKeySelector, this.rowKeySelector.m91getProducedType());
    }

    @Test
    public void testWithoutGenerateUpdateBefore() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(createFunction(false, true, this.minTime.toMilliseconds()));
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.insertRecord("book", 3L, 11));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testWithoutGenerateUpdateBeforeAndInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(createFunction(false, false, this.minTime.toMilliseconds()));
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 3L, 11));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testWithGenerateUpdateBefore() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(createFunction(true, true, this.minTime.toMilliseconds()));
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 3L, 11));
        arrayList.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 13));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        arrayList.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.insertRecord("book", 3L, 11));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testWithGenerateUpdateBeforeAndStateTtl() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(createFunction(true, true, this.minTime.toMilliseconds()));
        createTestHarness.setup();
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        createTestHarness.setStateTtlProcessingTime(30L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 17));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 18));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 19));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        arrayList.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        arrayList.add(StreamRecordUtils.insertRecord("book", 1L, 19));
        arrayList.add(StreamRecordUtils.insertRecord("book", 2L, 18));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }
}
