/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.IOException;
import java.io.NotSerializableException;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class JoinITCase
extends MultipleProgramsTestBaseJUnit4 {
    public JoinITCase(MultipleProgramsTestBaseJUnit4.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{1}).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where(new int[]{0, 1}).equalTo(new int[]{0, 4}).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDefaultJoinOnTuples() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new int[]{0}).equalTo(new int[]{2});
        List result = joinDs.collect();
        String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithHuge() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.joinWithHuge(ds2).where(new int[]{1}).equalTo(new int[]{1}).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithTiny() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.joinWithTiny(ds2).where(new int[]{1}).equalTo(new int[]{1}).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinThatReturnsTheLeftInputObject() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{1}).with((JoinFunction)new LeftReturningJoin());
        List result = joinDs.collect();
        String expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinThatReturnsTheRightInputObject() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{1}).with((JoinFunction)new RightReturningJoin());
        List result = joinDs.collect();
        String expected = "1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,2,1,Hallo Welt,2\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithBroadcastSet() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        TwoInputUdfOperator joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{4}).with((FlatJoinFunction)new T3T5BCJoin()).withBroadcastSet(intDs, "ints");
        List result = joinDs.collect();
        String expected = "Hi,Hallo,55\nHi,Hallo Welt wie,55\nHello,Hallo Welt,55\nHello world,Hallo Welt,55\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where((KeySelector)new KeySelector1()).equalTo(new int[]{0}).with((JoinFunction)new CustT3Join());
        List result = joinDs.collect();
        String expected = "Hi,Hi\nHello,Hello\nHello world,Hello\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testProjectOnATuple1Input() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.ProjectJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{1}).projectFirst(new int[]{2, 1}).projectSecond(new int[]{3}).projectFirst(new int[]{0}).projectSecond(new int[]{4, 1});
        List result = joinDs.collect();
        String expected = "Hi,1,Hallo,1,1,1\nHello,2,Hallo Welt,2,2,2\nHello world,2,Hallo Welt,3,2,2\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testProjectJoinOnATuple2Input() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.ProjectJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo(new int[]{1}).projectSecond(new int[]{3}).projectFirst(new int[]{2, 1}).projectSecond(new int[]{4, 1}).projectFirst(new int[]{0});
        List result = joinDs.collect();
        String expected = "Hallo,Hi,1,1,1,1\nHallo Welt,Hello,2,2,2,2\nHallo Welt,Hello world,2,2,2,3\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where(new int[]{1}).equalTo((KeySelector)new KeySelector2()).with((JoinFunction)new T3CustJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hello\nHello,Hello world\nHello world,Hello world\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where((KeySelector)new KeySelector5()).equalTo((KeySelector)new KeySelector6());
        List result = joinDs.collect();
        String expected = "1,0,Hi,1,0,Hi\n2,1,Hello,2,1,Hello\n2,1,Hello,2,2,Hello world\n2,2,Hello world,2,1,Hello\n2,2,Hello world,2,2,Hello world\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsClosureCleaner() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){

            public Integer getKey(CollectionDataSets.CustomType value) {
                return value.myInt;
            }
        }).equalTo((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){

            public Integer getKey(CollectionDataSets.CustomType value) throws Exception {
                return value.myInt;
            }
        });
        List result = joinDs.collect();
        String expected = "1,0,Hi,1,0,Hi\n2,1,Hello,2,1,Hello\n2,1,Hello,2,2,Hello world\n2,2,Hello world,2,1,Hello\n2,2,Hello world,2,2,Hello world\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsDisabledClosureCleaner() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableClosureCleaner();
        DataSet<CollectionDataSets.CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        boolean correctExceptionTriggered = false;
        try {
            JoinOperator.DefaultJoin defaultJoin = ds1.join(ds2).where((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){

                public Integer getKey(CollectionDataSets.CustomType value) {
                    return value.myInt;
                }
            }).equalTo((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){

                public Integer getKey(CollectionDataSets.CustomType value) throws Exception {
                    return value.myInt;
                }
            });
        }
        catch (InvalidProgramException ex) {
            correctExceptionTriggered = ex.getCause() instanceof NotSerializableException;
        }
        Assert.assertTrue((boolean)correctExceptionTriggered);
    }

    @Test
    public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        JoinOperator.EquiJoin joinDs = ds1.join(ds2).where((KeySelector)new KeySelector3()).equalTo((KeySelector)new KeySelector4()).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"nestedPojo.longNumber"}).equalTo(new String[]{"f6"});
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"nestedPojo.longNumber"}).equalTo(new int[]{6});
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"nestedPojo.longNumber", "number", "str"}).equalTo(new String[]{"f6", "f0", "f1"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testNestedIntoTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0"}).equalTo(new String[]{"f6", "f0", "f2"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testNestedIntoTupleIntoPojo() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"nestedTupleWithCustom.f0", "nestedTupleWithCustom.f1.myInt", "nestedTupleWithCustom.f1.myLong"}).equalTo(new String[]{"f2", "f3", "f4"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testNonPojoToVerifyFullTupleKeys() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new int[]{0}).equalTo(new String[]{"f0.f0", "f0.f1"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "((1,1),one),((1,1),one)\n((2,2),two),((2,2),two)\n((3,3),three),((3,3),three)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"f0.f0"}).equalTo(new String[]{"f0.f0"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "((1,1),one),((1,1),one)\n((2,2),two),((2,2),two)\n((3,3),three),((3,3),three)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testFullPojoWithFullTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"*"}).equalTo(new String[]{"*"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testNonPojoToVerifyNestedTupleElementSelectionWithFirstKeyFieldGreaterThanZero() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        JoinOperator.DefaultJoin ds2 = ds1.join(ds1).where(new int[]{0}).equalTo(new int[]{0});
        JoinOperator.DefaultJoin joinDs = ds2.join((DataSet)ds2).where(new String[]{"f1.f0"}).equalTo(new String[]{"f0.f0"});
        env.setParallelism(1);
        List result = joinDs.collect();
        String expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithAtomicType1() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSource ds2 = env.fromElements((Object[])new Integer[]{1, 2});
        JoinOperator.DefaultJoin joinDs = ds1.join((DataSet)ds2).where(new int[]{0}).equalTo(new String[]{"*"});
        List result = joinDs.collect();
        String expected = "(1,1,Hi),1\n(2,2,Hello),2";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithAtomicType2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
        JoinOperator.DefaultJoin joinDs = ds1.join(ds2).where(new String[]{"*"}).equalTo(new int[]{0});
        List result = joinDs.collect();
        String expected = "1,(1,1,Hi)\n2,(2,2,Hello)";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testJoinWithRangePartitioning() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        env.setParallelism(4);
        TestDistribution testDis = new TestDistribution();
        JoinOperator.EquiJoin joinDs = DataSetUtils.partitionByRange(ds1, (DataDistribution)testDis, (int[])new int[]{0, 1}).join((DataSet)DataSetUtils.partitionByRange(ds2, (DataDistribution)testDis, (int[])new int[]{0, 4})).where(new int[]{0, 1}).equalTo(new int[]{0, 4}).with((FlatJoinFunction)new T3T5FlatJoin());
        List result = joinDs.collect();
        String expected = "Hi,Hallo\nHello,Hallo Welt\nHello world,Hallo Welt wie gehts?\nHello world,ABC\nI am fine.,HIJ\nI am fine.,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    public static class TestDistribution
    implements DataDistribution {
        public Object[][] boundaries = new Object[][]{{2, 2L}, {5, 4L}, {10, 12L}, {21, 6L}};

        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            return this.boundaries[bucketNum];
        }

        public int getNumberOfFields() {
            return 2;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
        }

        public void write(DataOutputView out) throws IOException {
        }

        public void read(DataInputView in) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDistribution;
        }
    }

    private static class CustT3Join
    implements JoinFunction<CollectionDataSets.CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
        private CustT3Join() {
        }

        public Tuple2<String, String> join(CollectionDataSets.CustomType first, Tuple3<Integer, Long, String> second) {
            return new Tuple2((Object)first.myString, second.f2);
        }
    }

    private static class T3CustJoin
    implements JoinFunction<Tuple3<Integer, Long, String>, CollectionDataSets.CustomType, Tuple2<String, String>> {
        private T3CustJoin() {
        }

        public Tuple2<String, String> join(Tuple3<Integer, Long, String> first, CollectionDataSets.CustomType second) {
            return new Tuple2(first.f2, (Object)second.myString);
        }
    }

    private static class T3T5BCJoin
    extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
        private int broadcast;

        private T3T5BCJoin() {
        }

        public void open(OpenContext openContext) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.broadcast = sum;
        }

        public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, String, Integer>> out) throws Exception {
            out.collect((Object)new Tuple3(first.f2, second.f3, (Object)this.broadcast));
        }
    }

    private static class RightReturningJoin
    implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private RightReturningJoin() {
        }

        public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second) {
            return second;
        }
    }

    private static class LeftReturningJoin
    implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
        private LeftReturningJoin() {
        }

        public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second) {
            return first;
        }
    }

    private static class T3T5FlatJoin
    implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
        private T3T5FlatJoin() {
        }

        public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple2<String, String>> out) {
            out.collect((Object)new Tuple2(first.f2, second.f3));
        }
    }

    private static class KeySelector4
    implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector4() {
        }

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
            return new Tuple2(t.f0, t.f4);
        }
    }

    private static class KeySelector3
    implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector3() {
        }

        public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
            return new Tuple2(t.f0, t.f1);
        }
    }

    private static class KeySelector6
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private KeySelector6() {
        }

        public Integer getKey(CollectionDataSets.CustomType value) {
            return value.myInt;
        }
    }

    private static class KeySelector5
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private KeySelector5() {
        }

        public Integer getKey(CollectionDataSets.CustomType value) {
            return value.myInt;
        }
    }

    private static class KeySelector2
    implements KeySelector<CollectionDataSets.CustomType, Long> {
        private KeySelector2() {
        }

        public Long getKey(CollectionDataSets.CustomType value) {
            return value.myLong;
        }
    }

    private static class KeySelector1
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private KeySelector1() {
        }

        public Integer getKey(CollectionDataSets.CustomType value) {
            return value.myInt;
        }
    }
}

