package org.apache.flink.table.runtime.operators.join.interval;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/interval/ProcTimeIntervalJoinTest.class */
public class ProcTimeIntervalJoinTest extends TimeIntervalStreamJoinTestBase {
    private int keyIdx = 0;
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{this.keyIdx}, this.rowType.toRowFieldTypes());
    private TypeInformation<RowData> keyType = InternalTypeInfo.ofFields(new LogicalType[0]);

    @Test
    public void testProcTimeInnerJoinWithCommonBounds() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new ProcTimeIntervalJoin(FlinkJoinType.INNER, -10L, 20L, this.rowType, this.rowType, this.joinFunction));
        createTestHarness.open();
        createTestHarness.setProcessingTime(1L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(1);
        createTestHarness.setProcessingTime(2L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a2"));
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a3"));
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1b3"));
        createTestHarness.setProcessingTime(4L);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2b4"));
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(8);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(4);
        createTestHarness.setProcessingTime(13L);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1b13"));
        createTestHarness.setProcessingTime(33L);
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a33"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a33"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2b33"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1L, "1a1", 1L, "1b3"));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "1a3", 1L, "1b3"));
        arrayList.add(StreamRecordUtils.insertRecord(2L, "2a2", 2L, "2b4"));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "1a3", 1L, "1b13"));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "1a33", 1L, "1b13"));
        arrayList.add(StreamRecordUtils.insertRecord(2L, "2a33", 2L, "2b33"));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testProcTimeInnerJoinWithNegativeBounds() throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new ProcTimeIntervalJoin(FlinkJoinType.INNER, -10L, -5L, this.rowType, this.rowType, this.joinFunction));
        createTestHarness.open();
        createTestHarness.setProcessingTime(1L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a1"));
        createTestHarness.setProcessingTime(2L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(2L, "2a2"));
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "1a3"));
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1b3"));
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.setProcessingTime(7L);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "2b7"));
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.setProcessingTime(12L);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(1L, "1b12"));
        createTestHarness.setProcessingTime(13L);
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(4);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(2);
        createTestHarness.setProcessingTime(14L);
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(2);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(1);
        createTestHarness.setProcessingTime(16L);
        Assertions.assertThat(createTestHarness.numKeyedStateEntries()).isEqualTo(0);
        Assertions.assertThat(createTestHarness.numProcessingTimeTimers()).isEqualTo(0);
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(2L, "2a2", 2L, "2b7"));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "1a3", 1L, "1b12"));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(ProcTimeIntervalJoin procTimeIntervalJoin) throws Exception {
        return new KeyedTwoInputStreamOperatorTestHarness<>(new KeyedCoProcessOperator(procTimeIntervalJoin), this.keySelector, this.keySelector, this.keyType);
    }
}
