package org.apache.flink.table.runtime.operators.join.interval;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.class */
public class RowTimeIntervalJoinTest extends TimeIntervalStreamJoinTestBase {
    private int keyIdx = 1;
    private BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new int[]{this.keyIdx}, this.rowType.getLogicalTypes());
    private TypeInformation<RowData> keyType = new RowDataTypeInfo(new LogicalType[0]);

    @Test
    public void testRowTimeInnerJoinWithCommonBounds() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new RowTimeIntervalJoin(FlinkJoinType.INNER, -10L, 20L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0));
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        Assert.assertEquals(1L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1"));
        Assert.assertEquals(2L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(5L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(15L, "k1"));
        createTestHarness.processWatermark1(new Watermark(20L));
        createTestHarness.processWatermark2(new Watermark(20L));
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(35L, "k1"));
        createTestHarness.processWatermark1(new Watermark(38L));
        createTestHarness.processWatermark2(new Watermark(38L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(40L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        Assert.assertEquals(6L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(61L));
        createTestHarness.processWatermark2(new Watermark(61L));
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(-19L));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "k1", 2L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(2L, "k1", 2L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(5L, "k1", 2L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(5L, "k1", 15L, "k1"));
        arrayList.add(new Watermark(0L));
        arrayList.add(StreamRecordUtils.insertRecord(35L, "k1", 15L, "k1"));
        arrayList.add(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(40L, "k2", 39L, "k2"));
        arrayList.add(new Watermark(41L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRowTimeInnerJoinWithNegativeBounds() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new RowTimeIntervalJoin(FlinkJoinType.INNER, -10L, -7L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0));
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1"));
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(2L));
        createTestHarness.processWatermark2(new Watermark(2L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(3L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(3L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(13L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k1"));
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(-9L));
        arrayList.add(new Watermark(-8L));
        arrayList.add(StreamRecordUtils.insertRecord(3L, "k1", 13L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(6L, "k1", 13L, "k1"));
        arrayList.add(new Watermark(0L));
        arrayList.add(new Watermark(8L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRowTimeLeftOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new RowTimeIntervalJoin(FlinkJoinType.LEFT, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0));
        createTestHarness.open();
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assert.assertEquals(2L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(14L));
        createTestHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        createTestHarness.processWatermark1(new Watermark(100L));
        createTestHarness.processWatermark2(new Watermark(100L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1L, "k1", null, null));
        arrayList.add(new Watermark(5L));
        arrayList.add(new Watermark(9L));
        arrayList.add(StreamRecordUtils.insertRecord(2L, "k1", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(19L, "k1", null, null));
        arrayList.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRowTimeRightOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new RowTimeIntervalJoin(FlinkJoinType.RIGHT, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0));
        createTestHarness.open();
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assert.assertEquals(2L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(14L));
        createTestHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        createTestHarness.processWatermark1(new Watermark(100L));
        createTestHarness.processWatermark2(new Watermark(100L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(5L));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 1L, "k2"));
        arrayList.add(new Watermark(9L));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 2L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 39L, "k2"));
        arrayList.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRowTimeFullOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new RowTimeIntervalJoin(FlinkJoinType.FULL, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0));
        createTestHarness.open();
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "k2"));
        Assert.assertEquals(2L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(14L));
        createTestHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k2"));
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(19L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(20L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(26L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(25L, "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(21L, "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(39L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(40L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(50L, "k2"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(49L, "k2"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(41L, "k2"));
        createTestHarness.processWatermark1(new Watermark(100L));
        createTestHarness.processWatermark2(new Watermark(100L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1L, "k1", null, null));
        arrayList.add(new Watermark(5L));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 1L, "k2"));
        arrayList.add(new Watermark(9L));
        arrayList.add(StreamRecordUtils.insertRecord(2L, "k1", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 2L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(20L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 25L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(21L, "k1", 26L, "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 40L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(49L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(50L, "k2", 41L, "k2"));
        arrayList.add(StreamRecordUtils.insertRecord(19L, "k1", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, 39L, "k2"));
        arrayList.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(RowTimeIntervalJoin rowTimeIntervalJoin) throws Exception {
        return new KeyedTwoInputStreamOperatorTestHarness<>(new KeyedCoProcessOperatorWithWatermarkDelay(rowTimeIntervalJoin, rowTimeIntervalJoin.getMaxOutputDelay()), this.keySelector, this.keySelector, this.keyType);
    }
}
