package org.apache.flink.test.javaApiOperators;

import java.io.IOException;
import java.io.NotSerializableException;
import java.util.Iterator;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase.class */
public class JoinITCase extends MultipleProgramsTestBase {

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$CustT3Join.class */
    public static class CustT3Join implements JoinFunction<CollectionDataSets.CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
        public Tuple2<String, String> join(CollectionDataSets.CustomType customType, Tuple3<Integer, Long, String> tuple3) {
            return new Tuple2<>(customType.myString, tuple3.f2);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector1.class */
    public static class KeySelector1 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector2.class */
    public static class KeySelector2 implements KeySelector<CollectionDataSets.CustomType, Long> {
        public Long getKey(CollectionDataSets.CustomType customType) {
            return Long.valueOf(customType.myLong);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector3.class */
    public static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1;

        public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> tuple3) {
            return new Tuple2<>(tuple3.f0, tuple3.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector4.class */
    public static class KeySelector4 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1;

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> tuple5) {
            return new Tuple2<>(tuple5.f0, tuple5.f4);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector5.class */
    public static class KeySelector5 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$KeySelector6.class */
    public static class KeySelector6 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$LeftReturningJoin.class */
    public static class LeftReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
        public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> tuple3, Tuple5<Integer, Long, Integer, String, Long> tuple5) {
            return tuple3;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$RightReturningJoin.class */
    public static class RightReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> tuple3, Tuple5<Integer, Long, Integer, String, Long> tuple5) {
            return tuple5;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$T3CustJoin.class */
    public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CollectionDataSets.CustomType, Tuple2<String, String>> {
        public Tuple2<String, String> join(Tuple3<Integer, Long, String> tuple3, CollectionDataSets.CustomType customType) {
            return new Tuple2<>(tuple3.f2, customType.myString);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$T3T5BCJoin.class */
    public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
        private int broadcast;

        public void open(Configuration configuration) {
            int i = 0;
            Iterator it = getRuntimeContext().getBroadcastVariable("ints").iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next()).intValue();
            }
            this.broadcast = i;
        }

        public void join(Tuple3<Integer, Long, String> tuple3, Tuple5<Integer, Long, Integer, String, Long> tuple5, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
            collector.collect(new Tuple3(tuple3.f2, tuple5.f3, Integer.valueOf(this.broadcast)));
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Tuple3<Integer, Long, String>) obj, (Tuple5<Integer, Long, Integer, String, Long>) obj2, (Collector<Tuple3<String, String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$T3T5FlatJoin.class */
    public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
        public void join(Tuple3<Integer, Long, String> tuple3, Tuple5<Integer, Long, Integer, String, Long> tuple5, Collector<Tuple2<String, String>> collector) {
            collector.collect(new Tuple2(tuple3.f2, tuple5.f3));
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Tuple3<Integer, Long, String>) obj, (Tuple5<Integer, Long, Integer, String, Long>) obj2, (Collector<Tuple2<String, String>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/JoinITCase$TestDistribution.class */
    public static class TestDistribution implements DataDistribution {
        public Object[][] boundaries = {new Object[]{2, 2L}, new Object[]{5, 4L}, new Object[]{10, 12L}, new Object[]{21, 6L}};

        public Object[] getBucketBoundary(int i, int i2) {
            return this.boundaries[i];
        }

        public int getNumberOfFields() {
            return 2;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
        }

        public void write(DataOutputView dataOutputView) throws IOException {
        }

        public void read(DataInputView dataInputView) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDistribution;
        }
    }

    public JoinITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
    }

    @Test
    public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n");
    }

    @Test
    public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.get3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{0, 1}).equalTo(new int[]{0, 4}).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n");
    }

    @Test
    public void testDefaultJoinOnTuples() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new int[]{2}).collect(), "(1,1,Hi),(2,2,1,Hallo Welt,2)\n(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n");
    }

    @Test
    public void testJoinWithHuge() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).joinWithHuge(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n");
    }

    @Test
    public void testJoinWithTiny() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).joinWithTiny(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n");
    }

    @Test
    public void testJoinThatReturnsTheLeftInputObject() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).with(new LeftReturningJoin()).collect(), "1,1,Hi\n2,2,Hello\n3,2,Hello world\n");
    }

    @Test
    public void testJoinThatReturnsTheRightInputObject() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).with(new RightReturningJoin()).collect(), "1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,2,1,Hallo Welt,2\n");
    }

    @Test
    public void testJoinWithBroadcastSet() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.get3TupleDataSet(executionEnvironment).join(CollectionDataSets.getSmall5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{4}).with(new T3T5BCJoin()).withBroadcastSet(CollectionDataSets.getIntegerDataSet(executionEnvironment), "ints").collect(), "Hi,Hallo,55\nHi,Hallo Welt wie,55\nHello,Hallo Welt,55\nHello world,Hallo Welt,55\n");
    }

    @Test
    public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmallCustomTypeDataSet(executionEnvironment).join(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new KeySelector1()).equalTo(new int[]{0}).with(new CustT3Join()).collect(), "Hi,Hi\nHello,Hello\nHello world,Hello\n");
    }

    @Test
    public void testProjectOnATuple1Input() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).projectFirst(new int[]{2, 1}).projectSecond(new int[]{3}).projectFirst(new int[]{0}).projectSecond(new int[]{4, 1}).collect(), "Hi,1,Hallo,1,1,1\nHello,2,Hallo Welt,2,2,2\nHello world,2,Hallo Welt,3,2,2\n");
    }

    @Test
    public void testProjectJoinOnATuple2Input() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new int[]{1}).projectSecond(new int[]{3}).projectFirst(new int[]{2, 1}).projectSecond(new int[]{4, 1}).projectFirst(new int[]{0}).collect(), "Hallo,Hi,1,1,1,1\nHallo Welt,Hello,2,2,2,2\nHallo Welt,Hello world,2,2,2,3\n");
    }

    @Test
    public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(CollectionDataSets.getCustomTypeDataSet(executionEnvironment)).where(new int[]{1}).equalTo(new KeySelector2()).with(new T3CustJoin()).collect(), "Hi,Hello\nHello,Hello world\nHello world,Hello world\n");
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getCustomTypeDataSet(executionEnvironment).join(CollectionDataSets.getSmallCustomTypeDataSet(executionEnvironment)).where(new KeySelector5()).equalTo(new KeySelector6()).collect(), "1,0,Hi,1,0,Hi\n2,1,Hello,2,1,Hello\n2,1,Hello,2,2,Hello world\n2,2,Hello world,2,1,Hello\n2,2,Hello world,2,2,Hello world\n");
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsClosureCleaner() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getCustomTypeDataSet(executionEnvironment).join(CollectionDataSets.getSmallCustomTypeDataSet(executionEnvironment)).where(new KeySelector<CollectionDataSets.CustomType, Integer>() { // from class: org.apache.flink.test.javaApiOperators.JoinITCase.2
            public Integer getKey(CollectionDataSets.CustomType customType) {
                return Integer.valueOf(customType.myInt);
            }
        }).equalTo(new KeySelector<CollectionDataSets.CustomType, Integer>() { // from class: org.apache.flink.test.javaApiOperators.JoinITCase.1
            public Integer getKey(CollectionDataSets.CustomType customType) throws Exception {
                return Integer.valueOf(customType.myInt);
            }
        }).collect(), "1,0,Hi,1,0,Hi\n2,1,Hello,2,1,Hello\n2,1,Hello,2,2,Hello world\n2,2,Hello world,2,1,Hello\n2,2,Hello world,2,2,Hello world\n");
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsDisabledClosureCleaner() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableClosureCleaner();
        boolean z = false;
        try {
            CollectionDataSets.getCustomTypeDataSet(executionEnvironment).join(CollectionDataSets.getSmallCustomTypeDataSet(executionEnvironment)).where(new KeySelector<CollectionDataSets.CustomType, Integer>() { // from class: org.apache.flink.test.javaApiOperators.JoinITCase.4
                public Integer getKey(CollectionDataSets.CustomType customType) {
                    return Integer.valueOf(customType.myInt);
                }
            }).equalTo(new KeySelector<CollectionDataSets.CustomType, Integer>() { // from class: org.apache.flink.test.javaApiOperators.JoinITCase.3
                public Integer getKey(CollectionDataSets.CustomType customType) throws Exception {
                    return Integer.valueOf(customType.myInt);
                }
            });
        } catch (InvalidProgramException e) {
            z = e.getCause() instanceof NotSerializableException;
        }
        Assert.assertTrue(z);
    }

    @Test
    public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.get3TupleDataSet(executionEnvironment).join(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new KeySelector3()).equalTo(new KeySelector4()).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n");
    }

    @Test
    public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedPojo.longNumber"}).equalTo(new String[]{"f6"}).collect(), "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n");
    }

    @Test
    public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedPojo.longNumber"}).equalTo(new int[]{6}).collect(), "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n");
    }

    @Test
    public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedPojo.longNumber", "number", "str"}).equalTo(new String[]{"f6", "f0", "f1"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n");
    }

    @Test
    public void testNestedIntoTuple() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0"}).equalTo(new String[]{"f6", "f0", "f2"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n");
    }

    @Test
    public void testNestedIntoTupleIntoPojo() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedTupleWithCustom.f0", "nestedTupleWithCustom.f1.myInt", "nestedTupleWithCustom.f1.myLong"}).equalTo(new String[]{"f2", "f3", "f4"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n");
    }

    @Test
    public void testNonPojoToVerifyFullTupleKeys() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallNestedTupleDataSet(executionEnvironment).join(CollectionDataSets.getSmallNestedTupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new String[]{"f0.f0", "f0.f1"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "((1,1),one),((1,1),one)\n((2,2),two),((2,2),two)\n((3,3),three),((3,3),three)\n");
    }

    @Test
    public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallNestedTupleDataSet(executionEnvironment).join(CollectionDataSets.getSmallNestedTupleDataSet(executionEnvironment)).where(new String[]{"f0.f0"}).equalTo(new String[]{"f0.f0"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "((1,1),one),((1,1),one)\n((2,2),two),((2,2),two)\n((3,3),three),((3,3),three)\n");
    }

    @Test
    public void testFullPojoWithFullTuple() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        JoinOperator.DefaultJoin equalTo = CollectionDataSets.getSmallPojoDataSet(executionEnvironment).join(CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(executionEnvironment)).where(new String[]{"*"}).equalTo(new String[]{"*"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo.collect(), "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n");
    }

    @Test
    public void testNonPojoToVerifyNestedTupleElementSelectionWithFirstKeyFieldGreaterThanZero() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> small3TupleDataSet = CollectionDataSets.getSmall3TupleDataSet(executionEnvironment);
        JoinOperator.DefaultJoin equalTo = small3TupleDataSet.join(small3TupleDataSet).where(new int[]{0}).equalTo(new int[]{0});
        JoinOperator.DefaultJoin equalTo2 = equalTo.join(equalTo).where(new String[]{"f1.f0"}).equalTo(new String[]{"f0.f0"});
        executionEnvironment.setParallelism(1);
        compareResultAsTuples(equalTo2.collect(), "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n");
    }

    @Test
    public void testJoinWithAtomicType1() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).join(executionEnvironment.fromElements(new Integer[]{1, 2})).where(new int[]{0}).equalTo(new String[]{"*"}).collect(), "(1,1,Hi),1\n(2,2,Hello),2");
    }

    @Test
    public void testJoinWithAtomicType2() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        compareResultAsTuples(executionEnvironment.fromElements(new Integer[]{1, 2}).join(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment)).where(new String[]{"*"}).equalTo(new int[]{0}).collect(), "1,(1,1,Hi)\n2,(2,2,Hello)");
    }

    @Test
    public void testJoinWithRangePartitioning() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> dataSet = CollectionDataSets.get3TupleDataSet(executionEnvironment);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> dataSet2 = CollectionDataSets.get5TupleDataSet(executionEnvironment);
        executionEnvironment.setParallelism(4);
        TestDistribution testDistribution = new TestDistribution();
        compareResultAsTuples(DataSetUtils.partitionByRange(dataSet, testDistribution, new int[]{0, 1}).join(DataSetUtils.partitionByRange(dataSet2, testDistribution, new int[]{0, 4})).where(new int[]{0, 1}).equalTo(new int[]{0, 4}).with(new T3T5FlatJoin()).collect(), "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n");
    }
}
