/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Random;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.operators.CoGroupRawOperator;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UdfOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOperationInfo;
import org.apache.flink.python.api.functions.PythonCoGroup;
import org.apache.flink.python.api.functions.PythonMapPartition;
import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
import org.apache.flink.python.api.functions.util.KeyDiscarder;
import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
import org.apache.flink.python.api.functions.util.SerializerMap;
import org.apache.flink.python.api.functions.util.StringDeserializerMap;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.runtime.filecache.FileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonPlanBinder {
    /** Logger used by this class (e.g. for cleanup failures in {@code close()}). */
    static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
    // Presumably the first-argument value selecting Python 2; not referenced elsewhere in this class.
    public static final String ARGUMENT_PYTHON_2 = "2";
    /** First command-line argument value that makes {@code main} set {@link #usePython3}. */
    public static final String ARGUMENT_PYTHON_3 = "3";
    /** Name under which the copied files are registered as a cached file; also the directory name used for local execution. */
    public static final String FLINK_PYTHON_DC_ID = "flink";
    /** File name (with leading separator) the plan script is copied to inside the temporary plan directory. */
    public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
    /** Configuration key of the Python 2 binary. */
    public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
    /** Configuration key of the Python 3 binary. */
    public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
    /** Operator-configuration key holding the number of broadcast variables registered so far. */
    public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
    /** Prefix of the operator-configuration keys holding broadcast variable names; the index is appended. */
    public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
    // Both binary paths are read from the global configuration here and re-read in the constructor.
    public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString((String)"python.binary.python2", (String)"python");
    public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString((String)"python.binary.python3", (String)"python3");
    // Source of the random suffix appended to FLINK_PYTHON_FILE_PATH for each run.
    private static final Random r = new Random();
    /** Prefix (inside java.io.tmpdir) of the temporary plan directory; runPlan appends a random integer. */
    public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
    // Location of the Python resources relative to FLINK_DIR.
    private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
    // Value of the FLINK_ROOT_DIR environment variable; null when unset (the constructor handles that case).
    private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    // Directory copied into the temporary plan directory by prepareFiles; assigned in the constructor.
    private static String FULL_PATH;
    /** Accumulates the space-separated arguments handed to the Python process by startPython. */
    public static StringBuilder arguments;
    /** Set by {@code main} when the first argument equals {@link #ARGUMENT_PYTHON_3}. */
    public static boolean usePython3;
    // Target the plan directory is copied to and registered from; overwritten by the MODE parameter
    // and by runPlan when a LocalEnvironment is used.
    private static String FLINK_HDFS_PATH;
    /** Directory inside java.io.tmpdir that close() deletes; assigned in the static initializer. */
    public static final String FLINK_TMP_DATA_DIR;
    // Maps set IDs from the Python plan to the objects built for them (DataSets and groupings).
    private HashMap<Integer, Object> sets = new HashMap();
    /** Execution environment the plan is built on; assigned in runPlan. */
    public ExecutionEnvironment env;
    // Connection to the Python plan process; null until startPython has run.
    private PythonPlanStreamer streamer;
    // 0x4000000 == 64 MiB. Not referenced in this class; presumably the size of the
    // memory-mapped files used for data exchange -- TODO confirm against the streaming code.
    public static final int MAPPED_FILE_SIZE = 0x4000000;

    /**
     * Command-line entry point.
     *
     * <p>The first argument selects the Python version, all remaining arguments are
     * forwarded to {@code runPlan}. With fewer than two arguments only the usage text
     * is printed.
     *
     * @param args Python version followed by script path, package paths and parameters
     * @throws Exception if the binder cannot be created or the plan fails
     */
    public static void main(String[] args) throws Exception {
        boolean enoughArguments = args.length >= 2;
        if (!enoughArguments) {
            System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
            return;
        }

        usePython3 = args[0].equals(ARGUMENT_PYTHON_3);

        PythonPlanBinder planBinder = new PythonPlanBinder();
        String[] planArguments = Arrays.copyOfRange(args, 1, args.length);
        planBinder.runPlan(planArguments);
    }

    /**
     * Reads the Python binary paths from the global configuration and determines the
     * directory holding the Python sources: below {@code FLINK_ROOT_DIR} when that
     * environment variable is set, otherwise below the local working directory.
     *
     * @throws IOException if the local file system cannot be accessed
     */
    public PythonPlanBinder() throws IOException {
        FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
        FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");

        if (FLINK_DIR != null) {
            FULL_PATH = FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH;
        } else {
            Path workingDirectory = FileSystem.getLocalFileSystem().getWorkingDirectory();
            FULL_PATH = new Path(workingDirectory, "src/main/python/org/apache/flink/python/api").toString();
        }
    }

    /**
     * Builds and executes the plan described by a Python script.
     *
     * <p>The arguments before the last {@code "-"} are the script and package paths, the
     * arguments after it are passed to the Python process. Without a {@code "-"} (or with
     * it at index 0) every argument is treated as a path.
     *
     * <p>Cleanup runs in a {@code finally} block so that it happens for every kind of
     * failure, and the randomly suffixed temporary plan directory is removed even when the
     * run fails before the files were distributed.
     *
     * @param args script path, optional package paths, optionally {@code "-"} and parameters
     * @throws Exception if preparing, receiving or executing the plan fails
     */
    private void runPlan(String[] args) throws Exception {
        this.env = ExecutionEnvironment.getExecutionEnvironment();

        // Index of the last "-" separator; 0 doubles as "no separator found".
        int split = 0;
        for (int x = 0; x < args.length; ++x) {
            if ("-".equals(args[x])) {
                split = x;
            }
        }
        int pathsEnd = split == 0 ? args.length : split;
        int parametersStart = split == 0 ? args.length : split + 1;

        String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
        try {
            this.prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, pathsEnd));
            this.startPython(tmpPath, Arrays.copyOfRange(args, parametersStart, args.length));
            this.receivePlan();
            if (this.env instanceof LocalEnvironment) {
                // Local execution: keep the distributed files on the local file system.
                FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + FLINK_PYTHON_DC_ID;
            }
            PythonPlanBinder.distributeFiles(tmpPath, this.env);
            JobExecutionResult jer = this.env.execute();
            this.sendResult(jer);
        } finally {
            this.close();
            // close() only knows the un-suffixed path prefix, so the actual temporary
            // directory has to be removed here. Failures are logged, not thrown, so they
            // cannot mask the exception that caused the cleanup.
            try {
                PythonPlanBinder.clearPath(tmpPath);
            } catch (IOException ioe) {
                LOG.error("PythonAPI temporary plan directory cleanup failed: " + tmpPath, ioe);
            } catch (URISyntaxException use) {
                LOG.error("PythonAPI temporary plan directory cleanup failed: " + tmpPath, use);
            }
        }
    }

    /**
     * Populates the temporary plan directory: clears it, copies the Python sources from
     * {@code FULL_PATH} into it, copies the first given path as the plan script and every
     * further path under its own name.
     *
     * @param tempFilePath directory to populate
     * @param filePaths plan script followed by additional files or packages
     * @throws IOException if clearing or copying fails
     * @throws URISyntaxException declared by the copy helpers
     */
    private void prepareFiles(String tempFilePath, String ... filePaths) throws IOException, URISyntaxException {
        PythonPlanBinder.clearPath(tempFilePath);

        Path pythonSources = new Path(FULL_PATH);
        Path planDirectory = new Path(tempFilePath);
        FileCache.copy(pythonSources, planDirectory, false);

        PythonPlanBinder.copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);

        int index = 1;
        while (index < filePaths.length) {
            PythonPlanBinder.copyFile(filePaths[index], tempFilePath, null);
            index++;
        }
    }

    /**
     * Recursively deletes the given path if it exists.
     *
     * @param path path to remove
     * @throws IOException if the file system cannot be accessed or the delete fails
     * @throws URISyntaxException declared for callers; not thrown by the visible code
     */
    private static void clearPath(String path) throws IOException, URISyntaxException {
        Path target = new Path(path);
        FileSystem fileSystem = FileSystem.get(target.toUri());
        if (!fileSystem.exists(target)) {
            return;
        }
        fileSystem.delete(target, true);
    }

    /**
     * Copies a file or directory into the target directory.
     *
     * <p>When {@code name} is {@code null} the last segment of {@code path} is used as the
     * name of the copy. The segment is taken after the last {@code "/"}, so a path without
     * any {@code "/"} (a plain relative file name) is used as is instead of causing a
     * {@code StringIndexOutOfBoundsException}, and no doubled separator is produced.
     *
     * @param path source path; one trailing {@code "/"} is ignored
     * @param target directory to copy into
     * @param name name of the copy inside {@code target}, or {@code null} to derive it from {@code path}
     * @throws IOException if clearing the destination or copying fails
     * @throws URISyntaxException declared by {@code clearPath}
     */
    private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException {
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        // lastIndexOf returns -1 when there is no separator; +1 then yields the whole path.
        String identifier = name == null ? path.substring(path.lastIndexOf("/") + 1) : name;
        String tmpFilePath = target + "/" + identifier;
        PythonPlanBinder.clearPath(tmpFilePath);
        Path p = new Path(path);
        FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
    }

    /**
     * Copies the temporary plan directory to {@code FLINK_HDFS_PATH}, registers that
     * location as a cached file under {@code FLINK_PYTHON_DC_ID} and removes the
     * temporary directory afterwards.
     *
     * @param tmpPath temporary plan directory
     * @param env environment the cached file is registered with
     * @throws IOException if clearing or copying fails
     * @throws URISyntaxException declared by {@code clearPath}
     */
    private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException, URISyntaxException {
        PythonPlanBinder.clearPath(FLINK_HDFS_PATH);

        Path source = new Path(tmpPath);
        Path destination = new Path(FLINK_HDFS_PATH);
        FileCache.copy(source, destination, true);

        env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
        PythonPlanBinder.clearPath(tmpPath);
    }

    /**
     * Appends the given arguments (each preceded by a space) to the shared
     * {@code arguments} buffer and opens a new plan streamer on the plan directory.
     *
     * @param tempPath temporary plan directory
     * @param args parameters for the Python process
     * @throws IOException if the streamer cannot be opened
     */
    private void startPython(String tempPath, String[] args) throws IOException {
        for (int i = 0; i < args.length; i++) {
            arguments.append(" ");
            arguments.append(args[i]);
        }
        // Assign the field before opening so close() can reach the streamer if open() fails.
        this.streamer = new PythonPlanStreamer();
        String pythonArguments = arguments.toString();
        this.streamer.open(tempPath, pythonArguments);
    }

    /**
     * Sends the net runtime of the finished job through the plan streamer.
     *
     * @param jer result of the executed job
     * @throws IOException if sending fails
     */
    private void sendResult(JobExecutionResult jer) throws IOException {
        long netRuntime = jer.getNetRuntime();
        streamer.sendRecord(netRuntime);
    }

    /**
     * Best-effort cleanup: deletes the distributed files, the local temporary paths and
     * closes the plan streamer.
     *
     * <p>The steps are nested in {@code finally} blocks so that a failing delete no longer
     * prevents the later steps (in particular closing the streamer) from running. A missing
     * streamer is handled with an explicit null check instead of a caught
     * {@code NullPointerException}. Failures are logged with their stack trace and never
     * propagated, so this method stays safe to call from error paths.
     */
    private void close() {
        try {
            try {
                FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
                hdfs.delete(new Path(FLINK_HDFS_PATH), true);
            } finally {
                try {
                    FileSystem local = FileSystem.getLocalFileSystem();
                    // NOTE: this is the un-suffixed prefix, not the random-suffixed
                    // directory created by runPlan.
                    local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
                    local.delete(new Path(FLINK_TMP_DATA_DIR), true);
                } finally {
                    // The streamer is still null when the failure happened before startPython.
                    if (this.streamer != null) {
                        this.streamer.close();
                    }
                }
            }
        } catch (IOException ioe) {
            LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage(), ioe);
        } catch (URISyntaxException use) {
            LOG.error("PythonAPI file cleanup failed. Invalid path: " + FLINK_HDFS_PATH, use);
        } catch (RuntimeException re) {
            // Keep cleanup best-effort: an unexpected failure here must not mask the
            // exception that triggered the cleanup.
            LOG.error("PythonAPI file cleanup failed. " + re.getMessage(), re);
        }
    }

    /**
     * Receives the plan from the Python process: first the environment parameters,
     * then the operations.
     *
     * @throws IOException if reading from the streamer fails
     */
    private void receivePlan() throws IOException {
        receiveParameters();
        receiveOperations();
    }

    /**
     * Reads three parameter records from the streamer and applies each to the
     * environment: {@code DOP} sets the parallelism, {@code MODE} selects the local or
     * HDFS distribution path and {@code RETRY} configures a fixed-delay restart strategy.
     * A negative resulting parallelism is replaced by 1.
     *
     * @throws IOException if reading from the streamer fails
     */
    private void receiveParameters() throws IOException {
        final int parameterCount = 3;
        for (int received = 0; received < parameterCount; received++) {
            Tuple record = (Tuple)this.streamer.getRecord(true);
            String parameterName = ((String)record.getField(0)).toUpperCase();
            switch (Parameters.valueOf(parameterName)) {
                case DOP: {
                    Integer dop = (Integer)record.getField(1);
                    this.env.setParallelism(dop.intValue());
                    break;
                }
                case MODE: {
                    boolean localMode = (Boolean)record.getField(1);
                    if (localMode) {
                        FLINK_HDFS_PATH = "file:/tmp/flink";
                    } else {
                        FLINK_HDFS_PATH = "hdfs:/tmp/flink";
                    }
                    break;
                }
                case RETRY: {
                    int retry = (Integer)record.getField(1);
                    this.env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
                    break;
                }
            }
        }

        boolean parallelismUnset = this.env.getParallelism() < 0;
        if (parallelismUnset) {
            this.env.setParallelism(1);
        }
    }

    /**
     * Reads the operation count followed by that many operation descriptions from the
     * streamer and creates the matching operator for each.
     *
     * <p>The identifier is upper-cased with {@link Locale#ROOT} so that resolving it does
     * not depend on the default locale (with a Turkish locale {@code "distinct"} would
     * otherwise not map to {@code DISTINCT}). The exception for an unknown identifier
     * keeps the original exception as its cause.
     *
     * @throws IOException if reading from the streamer fails
     * @throws IllegalArgumentException if an operation identifier is unknown
     */
    private void receiveOperations() throws IOException {
        int operationCount = (Integer)this.streamer.getRecord(true);
        for (int x = 0; x < operationCount; ++x) {
            PythonOperationInfo info = new PythonOperationInfo(this.streamer);
            Operation op;
            try {
                op = Operation.valueOf(info.identifier.toUpperCase(Locale.ROOT));
            }
            catch (IllegalArgumentException iae) {
                throw new IllegalArgumentException("Invalid operation specified: " + info.identifier, iae);
            }
            switch (op) {
                case SOURCE_CSV:
                    this.createCsvSource(info);
                    break;
                case SOURCE_TEXT:
                    this.createTextSource(info);
                    break;
                case SOURCE_VALUE:
                    this.createValueSource(info);
                    break;
                case SOURCE_SEQ:
                    this.createSequenceSource(info);
                    break;
                case SINK_CSV:
                    this.createCsvSink(info);
                    break;
                case SINK_TEXT:
                    this.createTextSink(info);
                    break;
                case SINK_PRINT:
                    this.createPrintSink(info);
                    break;
                case BROADCAST:
                    this.createBroadcastVariable(info);
                    break;
                case DISTINCT:
                    this.createDistinctOperation(info);
                    break;
                case FIRST:
                    this.createFirstOperation(info);
                    break;
                case PARTITION_HASH:
                    this.createHashPartitionOperation(info);
                    break;
                case REBALANCE:
                    this.createRebalanceOperation(info);
                    break;
                case GROUPBY:
                    this.createGroupOperation(info);
                    break;
                case SORT:
                    this.createSortOperation(info);
                    break;
                case UNION:
                    this.createUnionOperation(info);
                    break;
                case COGROUP:
                    this.createCoGroupOperation(info);
                    break;
                case CROSS:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.NONE, info);
                    break;
                case CROSS_H:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.HUGE, info);
                    break;
                case CROSS_T:
                    this.createCrossOperation(PythonOperationInfo.DatasizeHint.TINY, info);
                    break;
                case FILTER:
                    this.createFilterOperation(info);
                    break;
                case FLATMAP:
                    this.createFlatMapOperation(info);
                    break;
                case GROUPREDUCE:
                    this.createGroupReduceOperation(info);
                    break;
                case JOIN:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.NONE, info);
                    break;
                case JOIN_H:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.HUGE, info);
                    break;
                case JOIN_T:
                    this.createJoinOperation(PythonOperationInfo.DatasizeHint.TINY, info);
                    break;
                case MAP:
                    this.createMapOperation(info);
                    break;
                case MAPPARTITION:
                    this.createMapPartitionOperation(info);
                    break;
                case REDUCE:
                    this.createReduceOperation(info);
                    break;
            }
        }
    }

    /**
     * Returns the parallelism to use for an operation: the environment's parallelism
     * when the operation specifies {@code -1}, otherwise the operation's own value.
     *
     * @param info operation description
     * @return effective parallelism
     */
    private int getParallelism(PythonOperationInfo info) {
        if (info.parallelism == -1) {
            return this.env.getParallelism();
        }
        return info.parallelism;
    }

    /**
     * Creates a CSV source followed by a {@code SerializerMap} step and stores the
     * result under the operation's set ID.
     *
     * <p>The error message for a non-tuple type now reports {@code info.types}, i.e. the
     * derived type it announces, instead of the whole operation description.
     *
     * @param info operation description (path, delimiters, types, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     * @throws RuntimeException if the declared type is not a tuple type
     */
    private void createCsvSource(PythonOperationInfo info) throws IOException {
        if (!(info.types instanceof TupleTypeInfo)) {
            throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info.types);
        }
        Path path = new Path(info.path);
        String lineD = info.lineDelimiter;
        String fieldD = info.fieldDelimiter;
        TupleTypeInfo types = (TupleTypeInfo)info.types;
        int parallelism = this.getParallelism(info);
        this.sets.put(info.setID,
            this.env.createInput((InputFormat)new TupleCsvInputFormat(path, lineD, fieldD, (TupleTypeInfoBase)types), info.types)
                .setParallelism(parallelism).name("CsvSource")
                .map(new SerializerMap())
                .setParallelism(parallelism).name("CsvSourcePostStep"));
    }

    /**
     * Creates a text file source followed by a {@code SerializerMap} step and stores the
     * result under the operation's set ID.
     *
     * @param info operation description (path, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createTextSource(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSource source = (DataSource)((DataSource)this.env.readTextFile(info.path).setParallelism(parallelism)).name("TextSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(parallelism);
        this.sets.put(info.setID, serialized.name("TextSourcePostStep"));
    }

    /**
     * Creates a source from the operation's values followed by a {@code SerializerMap}
     * step and stores the result under the operation's set ID.
     *
     * @param info operation description (values, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createValueSource(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSource source = (DataSource)((DataSource)this.env.fromElements(info.values).setParallelism(parallelism)).name("ValueSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(parallelism);
        this.sets.put(info.setID, serialized.name("ValueSourcePostStep"));
    }

    /**
     * Creates a sequence source from {@code info.frm} to {@code info.to} followed by a
     * {@code SerializerMap} step and stores the result under the operation's set ID.
     *
     * @param info operation description (range bounds, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createSequenceSource(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSource source = (DataSource)((DataSource)this.env.generateSequence(info.frm, info.to).setParallelism(parallelism)).name("SequenceSource");
        MapOperator serialized = (MapOperator)source.map(new SerializerMap()).setParallelism(parallelism);
        this.sets.put(info.setID, serialized.name("SequenceSourcePostStep"));
    }

    /**
     * Attaches a CSV sink to the parent set, preceded by a
     * {@code StringTupleDeserializerMap} step.
     *
     * @param info operation description (parent ID, path, delimiters, write mode)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createCsvSink(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet deserialized = input.map((MapFunction)new StringTupleDeserializerMap())
            .setParallelism(parallelism)
            .name("CsvSinkPreStep");
        deserialized.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode)
            .setParallelism(parallelism)
            .name("CsvSink");
    }

    /**
     * Attaches a text sink to the parent set, preceded by a
     * {@code StringDeserializerMap} step.
     *
     * @param info operation description (parent ID, path, write mode)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createTextSink(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet deserialized = input.map((MapFunction)new StringDeserializerMap())
            .setParallelism(parallelism);
        deserialized.writeAsText(info.path, info.writeMode)
            .setParallelism(parallelism)
            .name("TextSink");
    }

    /**
     * Attaches a printing sink to the parent set, preceded by a
     * {@code StringDeserializerMap} step. {@code info.toError} is passed to the
     * {@code PrintingOutputFormat}.
     *
     * @param info operation description (parent ID, toError flag)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createPrintSink(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet deserialized = input.map((MapFunction)new StringDeserializerMap())
            .setParallelism(parallelism)
            .name("PrintSinkPreStep");
        deserialized.output((OutputFormat)new PrintingOutputFormat(info.toError))
            .setParallelism(parallelism);
    }

    /**
     * Registers the set {@code info.otherID} as a broadcast set named {@code info.name}
     * on the operator {@code info.parentID}, and records the name in the operator's
     * configuration under the next free broadcast-variable index.
     *
     * @param info operation description (parent ID, other ID, name)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
        UdfOperator consumer = (UdfOperator)this.sets.get(info.parentID);
        DataSet broadcastSet = (DataSet)this.sets.get(info.otherID);
        consumer.withBroadcastSet(broadcastSet, info.name);

        Configuration parameters = consumer.getParameters();
        if (parameters == null) {
            parameters = new Configuration();
        }

        // The current count is the index of the variable being added.
        int index = parameters.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
        parameters.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + index, info.name);
        parameters.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, index + 1);
        consumer.withParameters(parameters);
    }

    /**
     * Applies {@code distinct} on {@code info.keys} to the parent set, followed by a
     * {@code KeyDiscarder} step, and stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, keys, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createDistinctOperation(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet distinct = input.distinct(info.keys)
            .setParallelism(parallelism)
            .name("Distinct");
        this.sets.put(info.setID, distinct.map(new KeyDiscarder())
            .setParallelism(parallelism)
            .name("DistinctPostStep"));
    }

    /**
     * Applies {@code first(info.count)} to the parent set and stores the result under
     * the operation's set ID.
     *
     * @param info operation description (parent ID, count, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createFirstOperation(PythonOperationInfo info) throws IOException {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet firstN = input.first(info.count)
            .setParallelism(this.getParallelism(info))
            .name("First");
        this.sets.put(info.setID, firstN);
    }

    /**
     * Groups the parent set by {@code info.keys} and stores the grouping under the
     * operation's set ID.
     *
     * @param info operation description (parent ID, keys, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createGroupOperation(PythonOperationInfo info) throws IOException {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        this.sets.put(info.setID, input.groupBy(info.keys));
    }

    /**
     * Hash-partitions the parent set by {@code info.keys}, followed by a
     * {@code KeyDiscarder} step, and stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, keys, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
        int parallelism = this.getParallelism(info);
        DataSet input = (DataSet)this.sets.get(info.parentID);
        this.sets.put(info.setID,
            input.partitionByHash(info.keys)
                .setParallelism(parallelism)
                .map(new KeyDiscarder())
                .setParallelism(parallelism)
                .name("HashPartitionPostStep"));
    }

    /**
     * Rebalances the parent set and stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet rebalanced = input.rebalance()
            .setParallelism(this.getParallelism(info))
            .name("Rebalance");
        this.sets.put(info.setID, rebalanced);
    }

    /**
     * Adds a group sort on {@code info.field} in {@code info.order} to the parent
     * grouping and stores the result under the operation's set ID. A parent that is
     * neither an unsorted nor a sorted grouping is left untouched.
     *
     * @param info operation description (parent ID, field, order, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createSortOperation(PythonOperationInfo info) throws IOException {
        Grouping grouping = (Grouping)this.sets.get(info.parentID);
        if (grouping instanceof UnsortedGrouping) {
            UnsortedGrouping unsorted = (UnsortedGrouping)grouping;
            this.sets.put(info.setID, unsorted.sortGroup(info.field, info.order));
        } else if (grouping instanceof SortedGrouping) {
            SortedGrouping sorted = (SortedGrouping)grouping;
            this.sets.put(info.setID, sorted.sortGroup(info.field, info.order));
        }
    }

    /**
     * Unions the sets {@code info.parentID} and {@code info.otherID} and stores the
     * result under the operation's set ID.
     *
     * @param info operation description (parent ID, other ID, set ID)
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void createUnionOperation(PythonOperationInfo info) throws IOException {
        DataSet first = (DataSet)this.sets.get(info.parentID);
        DataSet second = (DataSet)this.sets.get(info.otherID);
        DataSet union = first.union(second)
            .setParallelism(this.getParallelism(info))
            .name("Union");
        this.sets.put(info.setID, union);
    }

    /**
     * Creates a raw co-group of the sets {@code info.parentID} and {@code info.otherID}
     * on {@code info.keys1}/{@code info.keys2}, executed by a {@code PythonCoGroup},
     * and stores it under the operation's set ID.
     *
     * @param info operation description (input IDs, keys, types, name, set ID)
     */
    private void createCoGroupOperation(PythonOperationInfo info) {
        DataSet first = (DataSet)this.sets.get(info.parentID);
        DataSet second = (DataSet)this.sets.get(info.otherID);

        Keys.ExpressionKeys firstKeys = new Keys.ExpressionKeys(info.keys1, first.getType());
        Keys.ExpressionKeys secondKeys = new Keys.ExpressionKeys(info.keys2, second.getType());
        PythonCoGroup function = new PythonCoGroup(info.setID, info.types);

        CoGroupRawOperator coGroup = new CoGroupRawOperator(first, second, (Keys)firstKeys, (Keys)secondKeys, function, info.types, info.name);
        this.sets.put(info.setID, coGroup.setParallelism(this.getParallelism(info)));
    }

    /**
     * Crosses the sets {@code info.parentID} and {@code info.otherID} using the variant
     * selected by {@code mode}. With a UDF the cross is followed by a
     * {@code PythonMapPartition}; otherwise the plain cross is stored.
     *
     * @param mode size hint selecting cross, crossWithHuge or crossWithTiny
     * @param info operation description (input IDs, types, name, set ID)
     * @throws IllegalArgumentException if {@code mode} is not a known hint
     */
    private void createCrossOperation(PythonOperationInfo.DatasizeHint mode, PythonOperationInfo info) {
        DataSet first = (DataSet)this.sets.get(info.parentID);
        DataSet second = (DataSet)this.sets.get(info.otherID);

        CrossOperator.DefaultCross cross;
        switch (mode) {
            case NONE:
                cross = first.cross(second);
                break;
            case HUGE:
                cross = first.crossWithHuge(second);
                break;
            case TINY:
                cross = first.crossWithTiny(second);
                break;
            default:
                throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
        }

        cross.setParallelism(this.getParallelism(info));
        if (!info.usesUDF) {
            this.sets.put(info.setID, cross.name("DefaultCross"));
            return;
        }
        this.sets.put(info.setID,
            cross.mapPartition(new PythonMapPartition(info.setID, info.types))
                .setParallelism(this.getParallelism(info))
                .name(info.name));
    }

    /**
     * Applies a {@code PythonMapPartition} implementing the filter to the parent set
     * and stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createFilterOperation(PythonOperationInfo info) {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet filtered = input.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(this.getParallelism(info))
            .name(info.name);
        this.sets.put(info.setID, filtered);
    }

    /**
     * Applies a {@code PythonMapPartition} implementing the flat map to the parent set
     * and stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createFlatMapOperation(PythonOperationInfo info) {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet flatMapped = input.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(this.getParallelism(info))
            .name(info.name);
        this.sets.put(info.setID, flatMapped);
    }

    /**
     * Applies a group reduce to the parent, which may be a plain set, an unsorted
     * grouping or a sorted grouping, and stores the result under the operation's set ID.
     * Any other parent type is ignored.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createGroupReduceOperation(PythonOperationInfo info) {
        Object parent = this.sets.get(info.parentID);
        if (parent instanceof DataSet) {
            this.sets.put(info.setID, this.applyGroupReduceOperation((DataSet)parent, info));
        } else if (parent instanceof UnsortedGrouping) {
            this.sets.put(info.setID, this.applyGroupReduceOperation((UnsortedGrouping)parent, info));
        } else if (parent instanceof SortedGrouping) {
            this.sets.put(info.setID, this.applyGroupReduceOperation((SortedGrouping)parent, info));
        }
    }

    /**
     * Group reduce on a plain set: a non-combinable {@code IdentityGroupReduce}
     * pre-step followed by a {@code PythonMapPartition}.
     *
     * @param op1 input set
     * @param info operation description (types, name, set ID)
     * @return the resulting set
     */
    private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
        int parallelism = this.getParallelism(info);
        DataSet preStep = op1.reduceGroup(new IdentityGroupReduce())
            .setCombinable(false)
            .name("PythonGroupReducePreStep")
            .setParallelism(parallelism);
        return preStep.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(parallelism)
            .name(info.name);
    }

    /**
     * Group reduce on an unsorted grouping: a non-combinable
     * {@code IdentityGroupReduce} pre-step followed by a {@code PythonMapPartition}.
     *
     * @param op1 input grouping
     * @param info operation description (types, name, set ID)
     * @return the resulting set
     */
    private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
        int parallelism = this.getParallelism(info);
        DataSet preStep = op1.reduceGroup(new IdentityGroupReduce())
            .setCombinable(false)
            .setParallelism(parallelism)
            .name("PythonGroupReducePreStep");
        return preStep.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(parallelism)
            .name(info.name);
    }

    /**
     * Group reduce on a sorted grouping: a non-combinable {@code IdentityGroupReduce}
     * pre-step followed by a {@code PythonMapPartition}.
     *
     * @param op1 input grouping
     * @param info operation description (types, name, set ID)
     * @return the resulting set
     */
    private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
        int parallelism = this.getParallelism(info);
        DataSet preStep = op1.reduceGroup(new IdentityGroupReduce())
            .setCombinable(false)
            .setParallelism(parallelism)
            .name("PythonGroupReducePreStep");
        return preStep.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(parallelism)
            .name(info.name);
    }

    /**
     * Joins the sets {@code info.parentID} and {@code info.otherID} via
     * {@code createDefaultJoin}. With a UDF the join is followed by a
     * {@code PythonMapPartition}; otherwise the default join is stored directly.
     *
     * @param mode size hint selecting the join variant
     * @param info operation description (input IDs, keys, types, name, set ID)
     */
    private void createJoinOperation(PythonOperationInfo.DatasizeHint mode, PythonOperationInfo info) {
        DataSet first = (DataSet)this.sets.get(info.parentID);
        DataSet second = (DataSet)this.sets.get(info.otherID);

        DataSet joined = this.createDefaultJoin(first, second, info.keys1, info.keys2, mode, this.getParallelism(info));
        if (!info.usesUDF) {
            this.sets.put(info.setID, joined);
            return;
        }
        this.sets.put(info.setID,
            joined.mapPartition(new PythonMapPartition(info.setID, info.types))
                .setParallelism(this.getParallelism(info))
                .name(info.name));
    }

    /**
     * Builds an equi-join of the two sets using the variant selected by {@code mode},
     * followed by a {@code NestedKeyDiscarder} step.
     *
     * @param op1 first input
     * @param op2 second input
     * @param firstKeys key fields of the first input
     * @param secondKeys key fields of the second input
     * @param mode size hint selecting join, joinWithHuge or joinWithTiny
     * @param parallelism parallelism for the join and the post step
     * @return the joined set
     * @throws IllegalArgumentException if {@code mode} is not a known hint
     */
    private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, PythonOperationInfo.DatasizeHint mode, int parallelism) {
        switch (mode) {
            case NONE:
                return op1.join(op2)
                    .where(firstKeys).equalTo(secondKeys)
                    .setParallelism(parallelism)
                    .map(new NestedKeyDiscarder())
                    .setParallelism(parallelism)
                    .name("DefaultJoinPostStep");
            case HUGE:
                return op1.joinWithHuge(op2)
                    .where(firstKeys).equalTo(secondKeys)
                    .setParallelism(parallelism)
                    .map(new NestedKeyDiscarder())
                    .setParallelism(parallelism)
                    .name("DefaultJoinPostStep");
            case TINY:
                return op1.joinWithTiny(op2)
                    .where(firstKeys).equalTo(secondKeys)
                    .setParallelism(parallelism)
                    .map(new NestedKeyDiscarder())
                    .setParallelism(parallelism)
                    .name("DefaultJoinPostStep");
            default:
                throw new IllegalArgumentException("Invalid join mode specified.");
        }
    }

    /**
     * Applies a {@code PythonMapPartition} implementing the map to the parent set and
     * stores the result under the operation's set ID.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createMapOperation(PythonOperationInfo info) {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet mapped = input.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(this.getParallelism(info))
            .name(info.name);
        this.sets.put(info.setID, mapped);
    }

    /**
     * Applies a {@code PythonMapPartition} to the parent set and stores the result
     * under the operation's set ID.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createMapPartitionOperation(PythonOperationInfo info) {
        DataSet input = (DataSet)this.sets.get(info.parentID);
        DataSet mapped = input.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(this.getParallelism(info))
            .name(info.name);
        this.sets.put(info.setID, mapped);
    }

    /**
     * Applies a reduce to the parent, which may be a plain set or an unsorted grouping,
     * and stores the result under the operation's set ID. Any other parent type is
     * ignored.
     *
     * @param info operation description (parent ID, types, name, set ID)
     */
    private void createReduceOperation(PythonOperationInfo info) {
        Object parent = this.sets.get(info.parentID);
        if (parent instanceof DataSet) {
            this.sets.put(info.setID, this.applyReduceOperation((DataSet)parent, info));
        } else if (parent instanceof UnsortedGrouping) {
            this.sets.put(info.setID, this.applyReduceOperation((UnsortedGrouping)parent, info));
        }
    }

    /**
     * Reduce on a plain set: a non-combinable {@code IdentityGroupReduce} pre-step
     * followed by a {@code PythonMapPartition}.
     *
     * @param op1 input set
     * @param info operation description (types, name, set ID)
     * @return the resulting set
     */
    private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
        int parallelism = this.getParallelism(info);
        DataSet preStep = op1.reduceGroup(new IdentityGroupReduce())
            .setCombinable(false)
            .setParallelism(parallelism)
            .name("PythonReducePreStep");
        return preStep.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(parallelism)
            .name(info.name);
    }

    /**
     * Reduce on an unsorted grouping: a non-combinable {@code IdentityGroupReduce}
     * pre-step followed by a {@code PythonMapPartition}.
     *
     * @param op1 input grouping
     * @param info operation description (types, name, set ID)
     * @return the resulting set
     */
    private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
        int parallelism = this.getParallelism(info);
        DataSet preStep = op1.reduceGroup(new IdentityGroupReduce())
            .setCombinable(false)
            .setParallelism(parallelism)
            .name("PythonReducePreStep");
        return preStep.mapPartition(new PythonMapPartition(info.setID, info.types))
            .setParallelism(parallelism)
            .name(info.name);
    }

    // Initial values of the static state that has no inline initializer.
    static {
        String systemTmpDir = System.getProperty("java.io.tmpdir");
        FLINK_TMP_DATA_DIR = systemTmpDir + File.separator + "flink_data";
        FLINK_HDFS_PATH = "hdfs:/tmp";
        usePython3 = false;
        arguments = new StringBuilder();
    }

    /**
     * Operations a Python plan can request. {@code receiveOperations} upper-cases the
     * identifier sent by the Python process, resolves it with {@code valueOf} and
     * dispatches to the matching {@code create...} method.
     */
    protected static enum Operation {
        SOURCE_CSV,
        SOURCE_TEXT,
        SOURCE_VALUE,
        SOURCE_SEQ,
        SINK_CSV,
        SINK_TEXT,
        SINK_PRINT,
        SORT,
        UNION,
        FIRST,
        DISTINCT,
        GROUPBY,
        REBALANCE,
        PARTITION_HASH,
        BROADCAST,
        COGROUP,
        CROSS,
        CROSS_H,
        CROSS_T,
        FILTER,
        FLATMAP,
        GROUPREDUCE,
        JOIN,
        JOIN_H,
        JOIN_T,
        MAP,
        REDUCE,
        MAPPARTITION;

    }

    /**
     * Environment parameters read by {@code receiveParameters}: {@code DOP} sets the
     * parallelism, {@code MODE} selects the local or HDFS distribution path and
     * {@code RETRY} sets the number of restart attempts.
     */
    private static enum Parameters {
        DOP,
        MODE,
        RETRY;

    }
}

