package org.apache.flink.table.runtime.operators.join;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTest;
import org.apache.flink.table.runtime.operators.join.String2HashJoinOperatorTest;
import org.apache.flink.table.runtime.operators.sort.StringNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.StringRecordComparator;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.class */
public class String2SortMergeJoinOperatorTest {
    private boolean leftIsSmall;
    BaseRowTypeInfo typeInfo = new BaseRowTypeInfo(new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new VarCharType(Integer.MAX_VALUE)});
    private BaseRowTypeInfo joinedInfo = new BaseRowTypeInfo(new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new VarCharType(Integer.MAX_VALUE), new VarCharType(Integer.MAX_VALUE), new VarCharType(Integer.MAX_VALUE)});

    public String2SortMergeJoinOperatorTest(boolean z) {
        this.leftIsSmall = z;
    }

    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @Test
    public void testInnerJoin() throws Exception {
        TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin = buildSortMergeJoin(newOperator(FlinkJoinType.INNER, this.leftIsSmall));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "02")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "14")));
        buildSortMergeJoin.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, String2HashJoinOperatorTest.transformToBinary(buildSortMergeJoin.getOutput()));
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin = buildSortMergeJoin(newOperator(FlinkJoinType.LEFT, this.leftIsSmall));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "02")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "14")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("d", "0null")));
        buildSortMergeJoin.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, String2HashJoinOperatorTest.transformToBinary(buildSortMergeJoin.getOutput()));
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin = buildSortMergeJoin(newOperator(FlinkJoinType.RIGHT, this.leftIsSmall));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "02")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "14")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("c", "2null")));
        buildSortMergeJoin.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, String2HashJoinOperatorTest.transformToBinary(buildSortMergeJoin.getOutput()));
    }

    @Test
    public void testFullJoin() throws Exception {
        TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin = buildSortMergeJoin(newOperator(FlinkJoinType.FULL, this.leftIsSmall));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "02")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "14")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("c", "2null")));
        concurrentLinkedQueue.add(new StreamRecord(String2HashJoinOperatorTest.newRow("d", "0null")));
        buildSortMergeJoin.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, String2HashJoinOperatorTest.transformToBinary(buildSortMergeJoin.getOutput()));
    }

    private TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin(StreamOperator streamOperator) throws Exception {
        TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, this.typeInfo, this.typeInfo, this.joinedInfo);
        twoInputStreamTaskTestHarness.memorySize = 37748736L;
        twoInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        twoInputStreamTaskTestHarness.getStreamConfig().setStreamOperator(streamOperator);
        twoInputStreamTaskTestHarness.getStreamConfig().setOperatorID(new OperatorID());
        twoInputStreamTaskTestHarness.invoke();
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "0"), 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("d", "0"), 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("a", "2"), 0L), 1, 1);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "1"), 0L), 0, 1);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("c", "2"), 0L), 1, 1);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(String2HashJoinOperatorTest.newRow("b", "4"), 0L), 1, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        twoInputStreamTaskTestHarness.endInput();
        return twoInputStreamTaskTestHarness;
    }

    static StreamOperator newOperator(FlinkJoinType flinkJoinType, boolean z) {
        return new SortMergeJoinOperator(1048576L, 1048576L, flinkJoinType, z, new GeneratedJoinCondition("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.1
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public JoinCondition m30newInstance(ClassLoader classLoader) {
                return new Int2HashJoinOperatorTest.TrueCondition();
            }
        }, new GeneratedProjection("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.2
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public Projection m31newInstance(ClassLoader classLoader) {
                return new String2HashJoinOperatorTest.MyProjection();
            }
        }, new GeneratedProjection("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.3
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public Projection m32newInstance(ClassLoader classLoader) {
                return new String2HashJoinOperatorTest.MyProjection();
            }
        }, new GeneratedNormalizedKeyComputer("", "") { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.4
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public NormalizedKeyComputer m33newInstance(ClassLoader classLoader) {
                return new StringNormalizedKeyComputer();
            }
        }, new GeneratedRecordComparator("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.5
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public RecordComparator m34newInstance(ClassLoader classLoader) {
                return new StringRecordComparator();
            }
        }, new GeneratedNormalizedKeyComputer("", "") { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.6
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public NormalizedKeyComputer m35newInstance(ClassLoader classLoader) {
                return new StringNormalizedKeyComputer();
            }
        }, new GeneratedRecordComparator("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.7
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public RecordComparator m36newInstance(ClassLoader classLoader) {
                return new StringRecordComparator();
            }
        }, new GeneratedRecordComparator("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.join.String2SortMergeJoinOperatorTest.8
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public RecordComparator m37newInstance(ClassLoader classLoader) {
                return new StringRecordComparator();
            }
        }, new boolean[]{true});
    }
}
