package org.apache.flink.table.runtime.operators.join.window;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.class */
public class WindowJoinOperatorTest {
    private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE = InternalTypeInfo.ofFields(new LogicalType[]{new BigIntType(), VarCharType.STRING_TYPE});
    private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE = InternalTypeInfo.ofFields(new LogicalType[]{new BigIntType(), VarCharType.STRING_TYPE, new BigIntType(), VarCharType.STRING_TYPE});
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
    private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER = new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes());
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    private final ZoneId shiftTimeZone;

    public WindowJoinOperatorTest(ZoneId zoneId) {
        this.shiftTimeZone = zoneId;
    }

    @Parameterized.Parameters(name = "TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList(new Object[]{UTC_ZONE_ID}, new Object[]{SHANGHAI_ZONE_ID});
    }

    @Test
    public void testSemiJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.SEMI);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(10L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testAntiJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.ANTI);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(10L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testInnerJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.INNER);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.LEFT);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1", null, null));
        arrayList.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1", null, null));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.RIGHT);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testOuterJoin() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(FlinkJoinType.FULL);
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(1L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(4L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(10L));
        createTestHarness.processWatermark2(new Watermark(10L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(3L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(6L, this.shiftTimeZone)), "k1", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(null, null, Long.valueOf(TimeWindowUtil.toUtcTimestampMills(9L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        Assert.assertEquals(0L, createTestHarness.numEventTimeTimers());
        Assert.assertEquals(0L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        Assert.assertEquals(3L, createTestHarness.numKeyedStateEntries());
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(12L, this.shiftTimeZone)), "k1", null, null));
        arrayList.add(new Watermark(13L));
        Assert.assertEquals(2L, createTestHarness.numKeyedStateEntries());
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processWatermark1(new Watermark(18L));
        createTestHarness.processWatermark2(new Watermark(18L));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(StreamRecordUtils.insertRecord(Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1", Long.valueOf(TimeWindowUtil.toUtcTimestampMills(15L, this.shiftTimeZone)), "k1"));
        arrayList.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(FlinkJoinType flinkJoinType) throws Exception {
        GeneratedJoinCondition generatedJoinCondition = new GeneratedJoinCondition("TestWindowJoinCondition", "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction implements org.apache.flink.table.runtime.generated.JoinCondition {\n\n    public TestWindowJoinCondition(Object[] reference) {\n    }\n\n    @Override\n    public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n        return true;\n    }\n}\n", new Object[0]);
        RowDataKeySelector rowDataSelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{1}, INPUT_ROW_TYPE.toRowFieldTypes());
        return new KeyedTwoInputStreamOperatorTestHarness<>(WindowJoinOperatorBuilder.builder().leftSerializer(INPUT_ROW_TYPE.toRowSerializer()).rightSerializer(INPUT_ROW_TYPE.toRowSerializer()).generatedJoinCondition(generatedJoinCondition).leftWindowEndIndex(0).rightWindowEndIndex(0).filterNullKeys(new boolean[]{true}).joinType(flinkJoinType).withShiftTimezone(this.shiftTimeZone).build(), rowDataSelector, rowDataSelector, InternalTypeInfo.ofFields(new LogicalType[0]));
    }
}
