/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.temporal;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalTimeJoinOperatorTestBase;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Test;

/**
 * Harness tests for {@link TemporalRowTimeJoinOperator}.
 *
 * <p>Each test feeds records and watermarks into both inputs of a keyed two-input test harness
 * and compares everything the operator emitted (records and watermarks, in order) with a
 * hand-written expected list.
 */
public class TemporalRowTimeJoinOperatorTest
extends TemporalTimeJoinOperatorTestBase {

    /** Runs the shared scenario with {@code isLeftOuterJoin = false}: only matched rows are expected. */
    @Test
    public void testRowTimeTemporalJoin() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(StreamRecordUtils.insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
        expectedOutput.add(new Watermark(8L));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
        expectedOutput.add(new Watermark(13L));
        testRowTimeTemporalJoin(false, expectedOutput);
    }

    /**
     * Runs the shared scenario with {@code isLeftOuterJoin = true}: in addition to the matched
     * rows, left rows without a match are expected with {@code null} right-side fields.
     */
    @Test
    public void testRowTimeLeftTemporalJoin() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", "1a1", null, null, null));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, "k1", "1a1", null, null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
        expectedOutput.add(new Watermark(8L));
        expectedOutput.add(StreamRecordUtils.insertRecord(9L, "k2", "5a11", null, null, null));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(StreamRecordUtils.insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
        expectedOutput.add(new Watermark(13L));
        testRowTimeTemporalJoin(true, expectedOutput);
    }

    /**
     * Shared scenario for the inner and left-outer variants.
     *
     * <p>Builds an operator with both retention arguments set to {@code 0}, replays a fixed
     * sequence of left records (input 1), right changelog records (input 2) and watermarks, and
     * asserts the harness output against {@code expectedOutput}.
     *
     * @param isLeftOuterJoin forwarded to the operator constructor
     * @param expectedOutput records and watermarks the operator is expected to emit, in order
     * @throws Exception if the harness or the operator fails
     */
    private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, List<Object> expectedOutput) throws Exception {
        TemporalRowTimeJoinOperator joinOperator = new TemporalRowTimeJoinOperator(rowType, rowType, joinCondition, 0, 0, 0L, 0L, isLeftOuterJoin);
        // try-with-resources closes the harness even when an assertion or a process call fails.
        try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = createTestHarness(joinOperator)) {
            testHarness.open();

            testHarness.processWatermark1(new Watermark(1L));
            testHarness.processWatermark2(new Watermark(1L));

            testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1", "1a1"));
            testHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1", "1a2"));
            testHarness.processWatermark1(new Watermark(2L));
            testHarness.processWatermark2(new Watermark(2L));

            // The left row with time 1 is sent a second time, after watermark 2 has passed.
            testHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1", "1a1"));
            testHarness.processElement1(StreamRecordUtils.insertRecord(3L, "k1", "1a3"));
            testHarness.processElement2(StreamRecordUtils.insertRecord(4L, "k2", "2a4"));
            testHarness.processWatermark1(new Watermark(5L));
            testHarness.processWatermark2(new Watermark(5L));

            // Right side: version "2a4" of k2 is replaced by "2a5" at time 7.
            testHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k2", "2a3"));
            testHarness.processElement2(StreamRecordUtils.updateBeforeRecord(7L, "k2", "2a4"));
            testHarness.processElement2(StreamRecordUtils.updateAfterRecord(7L, "k2", "2a5"));
            // The two inputs advance to different watermarks here (8 vs. 9).
            testHarness.processWatermark1(new Watermark(8L));
            testHarness.processWatermark2(new Watermark(9L));

            // Right side: k2 is deleted at time 9 and re-inserted as "2a6" at time 10.
            testHarness.processElement1(StreamRecordUtils.insertRecord(9L, "k2", "5a11"));
            testHarness.processElement1(StreamRecordUtils.insertRecord(11L, "k2", "5a12"));
            testHarness.processElement2(StreamRecordUtils.deleteRecord(9L, "k2", "2a5"));
            testHarness.processElement2(StreamRecordUtils.insertRecord(10L, "k2", "2a6"));
            testHarness.processWatermark1(new Watermark(13L));
            testHarness.processWatermark2(new Watermark(13L));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Left-outer join with retention arguments 4 and 6, driven by explicit processing-time steps.
     *
     * <p>The last left row (time 15) is expected with {@code null} right-side fields.
     * NOTE(review): presumably because the right-side state has been cleaned up by then; confirm
     * against the operator's retention logic.
     */
    @Test
    public void testRowTimeTemporalJoinWithStateRetention() throws Exception {
        final int minRetentionTime = 4;
        final int maxRetentionTime = 6;
        TemporalRowTimeJoinOperator joinOperator = new TemporalRowTimeJoinOperator(rowType, rowType, joinCondition, 0, 0, minRetentionTime, maxRetentionTime, true);
        // try-with-resources closes the harness even when an assertion or a process call fails.
        try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = createTestHarness(joinOperator)) {
            testHarness.open();

            testHarness.setProcessingTime(3L);
            testHarness.processElement2(StreamRecordUtils.insertRecord(3L, "k1", "0a3"));

            testHarness.setProcessingTime(6L);
            testHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k1", "0a6"));
            testHarness.processWatermark1(new Watermark(7L));
            testHarness.processWatermark2(new Watermark(7L));

            // Right side: version "0a3" of k1 is replaced by "0a5", both stamped with time 3.
            testHarness.processElement2(StreamRecordUtils.updateBeforeRecord(3L, "k1", "0a3"));
            testHarness.processElement2(StreamRecordUtils.updateAfterRecord(3L, "k1", "0a5"));

            testHarness.setProcessingTime(9L);
            testHarness.processElement1(StreamRecordUtils.insertRecord(9L, "k1", "7a9"));
            testHarness.processWatermark1(new Watermark(13L));
            testHarness.processWatermark2(new Watermark(13L));

            testHarness.setProcessingTime(15L);
            testHarness.processElement1(StreamRecordUtils.insertRecord(15L, "k1", "13a15"));
            // Inputs advance to 15 and 16; the expected output below contains watermark 15.
            testHarness.processWatermark1(new Watermark(15L));
            testHarness.processWatermark2(new Watermark(16L));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(6L, "k1", "0a6", 3L, "k1", "0a3"));
            expectedOutput.add(new Watermark(7L));
            expectedOutput.add(StreamRecordUtils.insertRecord(9L, "k1", "7a9", 3L, "k1", "0a5"));
            expectedOutput.add(new Watermark(13L));
            expectedOutput.add(StreamRecordUtils.insertRecord(15L, "k1", "13a15", null, null, null));
            expectedOutput.add(new Watermark(15L));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Wraps the operator in a keyed two-input harness, using the inherited {@code keySelector}
     * for both inputs and the inherited {@code keyType} as the key type information.
     *
     * @param temporalJoinOperator operator under test
     * @return a harness that has not been opened yet; the caller opens and closes it
     * @throws Exception if the harness cannot be constructed
     */
    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {
        return new KeyedTwoInputStreamOperatorTestHarness<>(temporalJoinOperator, keySelector, keySelector, keyType);
    }
}

