/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.temporal;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalTimeJoinOperatorTestBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.junit.Test;

/**
 * Harness tests for {@link TemporalProcessTimeJoinOperator}.
 *
 * <p>Both inputs carry rows of type (BIGINT, VARCHAR) and are keyed on the first field. Each test
 * drives the operator through a {@link KeyedTwoInputStreamOperatorTestHarness}, advancing
 * processing time between elements, and compares the emitted records against a hand-written
 * expectation. The join condition is inherited from {@link TemporalTimeJoinOperatorTestBase}.
 */
public class TemporalProcessTimeJoinOperatorTest
extends TemporalTimeJoinOperatorTestBase {
    /** Index of the field both inputs are keyed on. */
    private final int keyIdx = 0;

    /** Row type shared by the first and the second input: (BIGINT, VARCHAR). */
    private final InternalTypeInfo<RowData> rowType = InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(Integer.MAX_VALUE));

    /** Key selector extracting field {@link #keyIdx} from a row of {@link #rowType}. */
    private final RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{this.keyIdx}, this.rowType.toRowFieldTypes());

    /** Type of the key produced by {@link #keySelector}. */
    private final TypeInformation<RowData> keyType = this.keySelector.getProducedType();

    /** Output row type: the fields of a first-input row followed by the fields of a second-input row. */
    private final InternalTypeInfo<RowData> outputRowType = InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(Integer.MAX_VALUE), new BigIntType(), new VarCharType(Integer.MAX_VALUE));

    /** Compares harness output against expected records using the output field types. */
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(this.outputRowType.toRowFieldTypes());

    /**
     * Join with both retention arguments set to zero: a first-input row is expected in the output
     * only when a second-input row with the same key was processed before it.
     */
    @Test
    public void testProcTimeTemporalJoin() throws Exception {
        TemporalProcessTimeJoinOperator joinOperator = new TemporalProcessTimeJoinOperator(this.rowType, this.joinCondition, 0L, 0L, false);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinOperator);
        testHarness.open();

        // Key 1 arrives on the first input before any second-input row exists: no output expected.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));

        testHarness.setProcessingTime(2L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2a2"));

        // Key 2 now has a second-input row, so this row is expected to join with "2a2".
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a3"));

        testHarness.setProcessingTime(4L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1a4"));

        // Key 1 now has a second-input row, so this row is expected to join with "1a4".
        testHarness.setProcessingTime(5L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a5"));

        ArrayList<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "2a3", 2L, "2a2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "1a5", 1L, "1a4"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Join with non-zero retention arguments: a first-input row processed once processing time
     * has advanced by {@code maxRetentionTime} is expected to find no match any more.
     */
    @Test
    public void testProcTimeTemporalJoinWithStateRetention() throws Exception {
        final long minRetentionTime = 10L;
        final long maxRetentionTime = 15L;
        TemporalProcessTimeJoinOperator joinOperator = new TemporalProcessTimeJoinOperator(this.rowType, this.joinCondition, minRetentionTime, maxRetentionTime, false);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinOperator);
        testHarness.open();

        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));

        testHarness.setProcessingTime(2L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2a2"));

        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a3"));

        // Advance the clock by maxRetentionTime past the last access of key 2 (time 18).
        // The row below is expected to produce no output, presumably because the stored
        // second-input row for key 2 has been cleaned up by then.
        testHarness.setProcessingTime(3L + maxRetentionTime);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "1a5"));

        ArrayList<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "2a3", 2L, "2a2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Same scenario as {@link #testProcTimeTemporalJoinWithStateRetention()} but with the last
     * constructor argument set to {@code true}: first-input rows without a match are expected in
     * the output padded with nulls (left outer join, going by the test name).
     */
    @Test
    public void testLeftProcTimeTemporalJoinWithStateRetention() throws Exception {
        final long minRetentionTime = 10L;
        final long maxRetentionTime = 15L;
        TemporalProcessTimeJoinOperator joinOperator = new TemporalProcessTimeJoinOperator(this.rowType, this.joinCondition, minRetentionTime, maxRetentionTime, true);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinOperator);
        testHarness.open();

        // No second-input row for key 1 yet: expected as a null-padded output row.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));

        testHarness.setProcessingTime(2L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2a2"));

        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a3"));

        // Advance the clock by maxRetentionTime (time 18); the row below is expected to be
        // emitted null-padded, i.e. without a matching second-input row.
        testHarness.setProcessingTime(3L + maxRetentionTime);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "1a5"));

        ArrayList<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "1a1", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "2a3", 2L, "2a2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "1a5", null, null));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Second input is a changelog (insert, update-before, update-after, delete): a first-input
     * row is expected to join with the latest second-input row for its key, and to produce no
     * output once that row has been deleted.
     */
    @Test
    public void testProcTimeTemporalJoinOnChangelog() throws Exception {
        TemporalProcessTimeJoinOperator joinOperator = new TemporalProcessTimeJoinOperator(this.rowType, this.joinCondition, 0L, 0L, false);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = this.createTestHarness(joinOperator);
        testHarness.open();

        testHarness.setProcessingTime(1L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));

        testHarness.setProcessingTime(2L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2a2"));

        testHarness.setProcessingTime(3L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a3"));

        // Key 1 on the second input: inserted as "1a4", then updated to "1a7".
        testHarness.setProcessingTime(4L);
        testHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1a4"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord(1L, "1a4"));
        testHarness.processElement2(StreamRecordUtils.updateAfterRecord(1L, "1a7"));

        // Expected to join with the updated value "1a7"; afterwards key 1 is deleted.
        testHarness.setProcessingTime(5L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a5"));
        testHarness.processElement2(StreamRecordUtils.deleteRecord(1L, "1a7"));

        // Key 1 was deleted from the second input: no output expected for this row.
        testHarness.setProcessingTime(6L);
        testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a6"));

        ArrayList<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, "2a3", 2L, "2a2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "1a5", 1L, "1a7"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Wraps the given operator in a keyed two-input test harness that uses {@link #keySelector}
     * for both inputs and {@link #keyType} as the key type.
     *
     * @param temporalJoinOperator the operator under test
     * @return a harness around the operator; the caller opens and closes it
     * @throws Exception if the harness cannot be constructed
     */
    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(TemporalProcessTimeJoinOperator temporalJoinOperator) throws Exception {
        return new KeyedTwoInputStreamOperatorTestHarness<>(temporalJoinOperator, this.keySelector, this.keySelector, this.keyType);
    }
}

