/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.RowTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.operators.join.TimeBoundedStreamJoinTestBase;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowKeySelector;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.Assert;
import org.junit.Test;

public class RowTimeBoundedStreamJoinTest
extends TimeBoundedStreamJoinTestBase {
    private int keyIdx = 1;
    private BinaryRowKeySelector keySelector = new BinaryRowKeySelector(new int[]{this.keyIdx}, this.rowType.getLogicalTypes());
    private TypeInformation<BaseRow> keyType = new BaseRowTypeInfo(new LogicalType[0]);

    @Test
    public void testRowTimeInnerJoinWithCommonBounds() throws Exception {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(FlinkJoinType.INNER, -10L, 20L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement1(StreamRecordUtils.record(1L, "k1"));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(StreamRecordUtils.record(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(2L, "k1"));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(StreamRecordUtils.record(5L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(15L, "k1"));
        testHarness.processWatermark1(new Watermark(20L));
        testHarness.processWatermark2(new Watermark(20L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(StreamRecordUtils.record(35L, "k1"));
        testHarness.processWatermark1(new Watermark(38L));
        testHarness.processWatermark2(new Watermark(38L));
        testHarness.processElement1(StreamRecordUtils.record(40L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(39L, "k2"));
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(61L));
        testHarness.processWatermark2(new Watermark(61L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(-19L));
        expectedOutput.add(StreamRecordUtils.record(1L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(2L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(5L, "k1", 2L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(5L, "k1", 15L, "k1"));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(StreamRecordUtils.record(35L, "k1", 15L, "k1"));
        expectedOutput.add(new Watermark(18L));
        expectedOutput.add(StreamRecordUtils.record(40L, "k2", 39L, "k2"));
        expectedOutput.add(new Watermark(41L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testRowTimeInnerJoinWithNegativeBounds() throws Exception {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(FlinkJoinType.INNER, -10L, -7L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement2(StreamRecordUtils.record(2L, "k1"));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(2L));
        testHarness.processWatermark2(new Watermark(2L));
        testHarness.processElement1(StreamRecordUtils.record(3L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(3L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(13L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(6L, "k1"));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(10L));
        testHarness.processWatermark2(new Watermark(10L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(-9L));
        expectedOutput.add(new Watermark(-8L));
        expectedOutput.add(StreamRecordUtils.record(3L, "k1", 13L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(6L, "k1", 13L, "k1"));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new Watermark(8L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testRowTimeLeftOuterJoin() throws Exception {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(FlinkJoinType.LEFT, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.record(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(1L, "k2"));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(StreamRecordUtils.record(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(2L, "k2"));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(StreamRecordUtils.record(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1L, "k1", null, null));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.record(2L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.record(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(19L, "k1", null, null));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testRowTimeRightOuterJoin() throws Exception {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(FlinkJoinType.RIGHT, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.record(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(1L, "k2"));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(StreamRecordUtils.record(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(2L, "k2"));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(StreamRecordUtils.record(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.record(null, null, 1L, "k2"));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.record(null, null, 2L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(null, null, 39L, "k2"));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testRowTimeFullOuterJoin() throws Exception {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(FlinkJoinType.FULL, -5L, 9L, 0L, this.rowType, this.rowType, this.generatedFunction, 0, 0);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.processElement1(StreamRecordUtils.record(1L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(1L, "k2"));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(StreamRecordUtils.record(2L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(2L, "k2"));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(StreamRecordUtils.record(19L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(20L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(26L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(25L, "k1"));
        testHarness.processElement1(StreamRecordUtils.record(21L, "k1"));
        testHarness.processElement2(StreamRecordUtils.record(39L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(40L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(50L, "k2"));
        testHarness.processElement1(StreamRecordUtils.record(49L, "k2"));
        testHarness.processElement2(StreamRecordUtils.record(41L, "k2"));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1L, "k1", null, null));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.record(null, null, 1L, "k2"));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.record(2L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.record(null, null, 2L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(20L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 25L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(21L, "k1", 26L, "k1"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 40L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(49L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(50L, "k2", 41L, "k2"));
        expectedOutput.add(StreamRecordUtils.record(19L, "k1", null, null));
        expectedOutput.add(StreamRecordUtils.record(null, null, 39L, "k2"));
        expectedOutput.add(new Watermark(91L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> createTestHarness(RowTimeBoundedStreamJoin windowJoinFunc) throws Exception {
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)windowJoinFunc, windowJoinFunc.getMaxOutputDelay());
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, (KeySelector)this.keySelector, (KeySelector)this.keySelector, this.keyType);
        return testHarness;
    }
}

