/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.ProcTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.operators.join.TimeBoundedStreamJoinTestBase;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowKeySelector;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.Assert;
import org.junit.Test;

public class ProcTimeBoundedStreamJoinTest
extends TimeBoundedStreamJoinTestBase {
    private int keyIdx = 0;
    private BinaryRowKeySelector keySelector = new BinaryRowKeySelector(new int[]{this.keyIdx}, this.rowType.getLogicalTypes());
    private TypeInformation<BaseRow> keyType = new BaseRowTypeInfo(new LogicalType[0]);

    @Test
    public void testProcTimeInnerJoinWithCommonBounds() throws Exception {
        ProcTimeBoundedStreamJoin joinProcessFunc = new ProcTimeBoundedStreamJoin(FlinkJoinType.INNER, -10L, 20L, this.rowType, this.rowType, this.generatedFunction);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.record(1L, "1a1"));
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(StreamRecordUtils.record(2L, "2a2"));
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.record(1L, "1a3"));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.processElement2(StreamRecordUtils.record(1L, "1b3"));
        testHarness.setProcessingTime(4L);
        testHarness.processElement2(StreamRecordUtils.record(2L, "2b4"));
        Assert.assertEquals((long)8L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(13L);
        testHarness.processElement2(StreamRecordUtils.record(1L, "1b13"));
        testHarness.setProcessingTime(33L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.processElement1(StreamRecordUtils.record(1L, "1a33"));
        testHarness.processElement1(StreamRecordUtils.record(2L, "2a33"));
        testHarness.processElement2(StreamRecordUtils.record(2L, "2b33"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1L, "1a1", 1L, "1b3"));
        expectedOutput.add(StreamRecordUtils.record(1L, "1a3", 1L, "1b3"));
        expectedOutput.add(StreamRecordUtils.record(2L, "2a2", 2L, "2b4"));
        expectedOutput.add(StreamRecordUtils.record(1L, "1a3", 1L, "1b13"));
        expectedOutput.add(StreamRecordUtils.record(1L, "1a33", 1L, "1b13"));
        expectedOutput.add(StreamRecordUtils.record(2L, "2a33", 2L, "2b33"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    public void testProcTimeInnerJoinWithNegativeBounds() throws Exception {
        ProcTimeBoundedStreamJoin joinProcessFunc = new ProcTimeBoundedStreamJoin(FlinkJoinType.INNER, -10L, -5L, this.rowType, this.rowType, this.generatedFunction);
        KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> testHarness = this.createTestHarness(joinProcessFunc);
        testHarness.open();
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.record(1L, "1a1"));
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(StreamRecordUtils.record(2L, "2a2"));
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.record(1L, "1a3"));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.processElement2(StreamRecordUtils.record(1L, "1b3"));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(7L);
        testHarness.processElement2(StreamRecordUtils.record(2L, "2b7"));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(12L);
        testHarness.processElement2(StreamRecordUtils.record(1L, "1b12"));
        testHarness.setProcessingTime(13L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(14L);
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(16L);
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(2L, "2a2", 2L, "2b7"));
        expectedOutput.add(StreamRecordUtils.record(1L, "1a3", 1L, "1b12"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<BaseRow, BaseRow, BaseRow, BaseRow> createTestHarness(ProcTimeBoundedStreamJoin windowJoinFunc) throws Exception {
        LegacyKeyedCoProcessOperator operator = new LegacyKeyedCoProcessOperator((CoProcessFunction)windowJoinFunc);
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, (KeySelector)this.keySelector, (KeySelector)this.keySelector, this.keyType);
        return testHarness;
    }
}

