package org.apache.flink.python.api;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Random;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.CoGroupRawOperator;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UdfOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOperationInfo;
import org.apache.flink.python.api.functions.PythonCoGroup;
import org.apache.flink.python.api.functions.PythonMapPartition;
import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
import org.apache.flink.python.api.functions.util.KeyDiscarder;
import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
import org.apache.flink.python.api.functions.util.SerializerMap;
import org.apache.flink.python.api.functions.util.StringDeserializerMap;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.runtime.filecache.FileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/python/api/PythonPlanBinder.class */
public class PythonPlanBinder {
    // Interpreter selector values; main() compares its first argument against ARGUMENT_PYTHON_3.
    public static final String ARGUMENT_PYTHON_2 = "2";
    public static final String ARGUMENT_PYTHON_3 = "3";
    // Name under which distributeFiles() registers the copied plan files as a cached file.
    public static final String FLINK_PYTHON_DC_ID = "flink";
    // Configuration keys written by createBroadcastVariable(): a counter plus one "<prefix><index>" entry per variable.
    public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
    public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
    // Source directory copied by prepareFiles(); assigned in the constructor.
    private static String FULL_PATH;
    // Objects created while receiving the plan, keyed by the set ID carried in each PythonOperationInfo.
    private HashMap<Integer, Object> sets = new HashMap<>();
    // Execution environment; assigned at the start of runPlan().
    public ExecutionEnvironment env;
    // Connection to the Python process; created in startPython().
    private PythonPlanStreamer streamer;
    // 67108864 == 64 * 1024 * 1024.
    public static final int MAPPED_FILE_SIZE = 67108864;
    static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
    // Target name (with leading separator) given to the plan script when prepareFiles() copies it.
    public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
    // Python binary settings read from GlobalConfiguration; both paths are re-read in the constructor.
    public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
    public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
    public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
    public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
    // Supplies the random suffix that runPlan() appends to FLINK_PYTHON_FILE_PATH.
    private static final Random r = new Random();
    // Prefix of the temporary plan directory under java.io.tmpdir.
    public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
    // Appended to FLINK_DIR in the constructor to build FULL_PATH.
    private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
    // Value of the FLINK_ROOT_DIR environment variable; may be null, in which case the constructor falls back to the working directory.
    private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    // Accumulates the space-prefixed arguments passed to the Python process; static, so shared by all instances.
    public static StringBuilder arguments = new StringBuilder();
    // Set by main() from its first argument.
    public static boolean usePython3 = false;
    // Location the plan files are copied to for distribution; overwritten by receiveParameters() (MODE)
    // and by runPlan() when running in a LocalEnvironment.
    private static String FLINK_HDFS_PATH = "hdfs:/tmp";
    // Directory under java.io.tmpdir that close() deletes on the local file system.
    public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.python.api.PythonPlanBinder$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/python/api/PythonPlanBinder$1.class */
    // Decompiler-generated holder for compiler "switch map" tables: each table is indexed by an enum
    // constant's ordinal() and stores the 1-based case number used by a switch over that enum.
    // NOTE(review): only the Operation table is declared as a field here; the DatasizeHint and
    // Parameters tables are written in the static initializer but never declared in this class as
    // shown, which looks like decompiler residue -- confirm before relying on this class.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation;

        static {
            // Each assignment is guarded separately so that a constant missing at run time
            // (NoSuchFieldError) leaves its slot unset instead of aborting the whole initializer.
            try {
                $SwitchMap$org$apache$flink$python$api$PythonOperationInfo$DatasizeHint[PythonOperationInfo.DatasizeHint.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonOperationInfo$DatasizeHint[PythonOperationInfo.DatasizeHint.HUGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonOperationInfo$DatasizeHint[PythonOperationInfo.DatasizeHint.TINY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // Operation table: case numbers 1..29 follow the order below, not the enum's declaration order.
            $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation = new int[Operation.values().length];
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SOURCE_CSV.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SOURCE_TEXT.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SOURCE_VALUE.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SOURCE_SEQ.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SINK_CSV.ordinal()] = 5;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SINK_TEXT.ordinal()] = 6;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SINK_PRINT.ordinal()] = 7;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.BROADCAST.ordinal()] = 8;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.AGGREGATE.ordinal()] = 9;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.DISTINCT.ordinal()] = 10;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.FIRST.ordinal()] = 11;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.PARTITION_HASH.ordinal()] = 12;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.REBALANCE.ordinal()] = 13;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.GROUPBY.ordinal()] = 14;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.SORT.ordinal()] = 15;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.UNION.ordinal()] = 16;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.COGROUP.ordinal()] = 17;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.CROSS.ordinal()] = 18;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.CROSS_H.ordinal()] = 19;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.CROSS_T.ordinal()] = 20;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.FILTER.ordinal()] = 21;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.FLATMAP.ordinal()] = 22;
            } catch (NoSuchFieldError e25) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.GROUPREDUCE.ordinal()] = 23;
            } catch (NoSuchFieldError e26) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.JOIN.ordinal()] = 24;
            } catch (NoSuchFieldError e27) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.JOIN_H.ordinal()] = 25;
            } catch (NoSuchFieldError e28) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.JOIN_T.ordinal()] = 26;
            } catch (NoSuchFieldError e29) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.MAP.ordinal()] = 27;
            } catch (NoSuchFieldError e30) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.MAPPARTITION.ordinal()] = 28;
            } catch (NoSuchFieldError e31) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Operation[Operation.REDUCE.ordinal()] = 29;
            } catch (NoSuchFieldError e32) {
            }
            // Parameters table: DOP -> 1, MODE -> 2, RETRY -> 3.
            $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Parameters = new int[Parameters.values().length];
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Parameters[Parameters.DOP.ordinal()] = 1;
            } catch (NoSuchFieldError e33) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Parameters[Parameters.MODE.ordinal()] = 2;
            } catch (NoSuchFieldError e34) {
            }
            try {
                $SwitchMap$org$apache$flink$python$api$PythonPlanBinder$Parameters[Parameters.RETRY.ordinal()] = 3;
            } catch (NoSuchFieldError e35) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/python/api/PythonPlanBinder$Operation.class */
    /**
     * Operation kinds that can appear in a received plan. receiveOperations() upper-cases the
     * identifier of each PythonOperationInfo and resolves it with {@code Operation.valueOf}, so the
     * constant names are part of the protocol. The declaration order fixes the ordinals; do not
     * reorder.
     */
    public enum Operation {
        SOURCE_CSV,
        SOURCE_TEXT,
        SOURCE_VALUE,
        SOURCE_SEQ,
        SINK_CSV,
        SINK_TEXT,
        SINK_PRINT,
        SORT,
        UNION,
        FIRST,
        DISTINCT,
        GROUPBY,
        AGGREGATE,
        REBALANCE,
        PARTITION_HASH,
        BROADCAST,
        COGROUP,
        CROSS,
        CROSS_H,
        CROSS_T,
        FILTER,
        FLATMAP,
        GROUPREDUCE,
        JOIN,
        JOIN_H,
        JOIN_T,
        MAP,
        REDUCE,
        MAPPARTITION
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/python/api/PythonPlanBinder$Parameters.class */
    /**
     * Environment parameters read by receiveParameters(), which upper-cases the received name and
     * resolves it with {@code Parameters.valueOf}.
     */
    public enum Parameters {
        // Integer value passed to env.setParallelism(...).
        DOP,
        // Boolean value: true selects "file:/tmp/flink", false selects "hdfs:/tmp/flink".
        MODE,
        // Integer number of restart attempts for a fixed-delay restart strategy (delay 10000L).
        RETRY
    }

    /**
     * Command-line entry point. The first argument selects the interpreter ("3" enables Python 3),
     * the remaining arguments are handed to {@code runPlan}. With fewer than two arguments only the
     * usage line is printed.
     *
     * @param strArr interpreter selector followed by script path, optional packages and parameters
     * @throws Exception whatever running the plan throws
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 2) {
            System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
            return;
        }
        usePython3 = strArr[0].equals(ARGUMENT_PYTHON_3);
        String[] planArguments = Arrays.copyOfRange(strArr, 1, strArr.length);
        PythonPlanBinder binder = new PythonPlanBinder();
        binder.runPlan(planArguments);
    }

    /**
     * Re-reads the Python binary paths from the global configuration and resolves the directory
     * the Python sources are copied from: below FLINK_ROOT_DIR when that environment variable is
     * set, otherwise below the local file system's working directory.
     *
     * @throws IOException declared for the local file system lookup
     */
    public PythonPlanBinder() throws IOException {
        FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
        FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
        if (FLINK_DIR != null) {
            FULL_PATH = FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH;
        } else {
            FULL_PATH = new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString();
        }
    }

    /**
     * Runs a Python plan end to end: copies the script and packages into a temporary directory,
     * starts the Python process, receives the plan, distributes the files, executes the job and
     * sends the result back.
     *
     * <p>Arguments before the last {@code "-"} are the script path followed by package paths;
     * arguments after it are passed to the Python process. Without a {@code "-"} only the first
     * argument is treated as a file and no parameters are forwarded.
     *
     * <p>Cleanup runs in a {@code finally} block so that it also happens for non-{@code Exception}
     * throwables, and it removes the random-suffixed temporary directory, which would otherwise be
     * left behind whenever a step before {@code distributeFiles} fails.
     *
     * @param strArr script path, optional package paths, optional "-" and parameters
     * @throws Exception any failure while preparing, receiving or executing the plan
     */
    private void runPlan(String[] strArr) throws Exception {
        this.env = ExecutionEnvironment.getExecutionEnvironment();
        // Index of the last "-" separator; 0 doubles as "no separator found".
        int split = 0;
        for (int i = 0; i < strArr.length; i++) {
            if (strArr[i].compareTo("-") == 0) {
                split = i;
            }
        }
        final String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
        try {
            prepareFiles(tmpPath, Arrays.copyOfRange(strArr, 0, split == 0 ? 1 : split));
            startPython(tmpPath, Arrays.copyOfRange(strArr, split == 0 ? strArr.length : split + 1, strArr.length));
            receivePlan();
            if (this.env instanceof LocalEnvironment) {
                FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + FLINK_PYTHON_DC_ID;
            }
            distributeFiles(tmpPath, this.env);
            sendResult(this.env.execute());
        } finally {
            close();
            try {
                // No-op on the success path, where distributeFiles() has already removed it.
                clearPath(tmpPath);
            } catch (IOException | URISyntaxException | RuntimeException e) {
                // Best effort: never let cleanup mask the original failure.
                LOG.warn("Could not delete temporary plan directory " + tmpPath, e);
            }
        }
    }

    /**
     * Fills the temporary plan directory: clears it, copies the Python sources from FULL_PATH,
     * copies the plan script under the fixed plan file name and then every additional package
     * under its own name.
     *
     * @param str path of the temporary plan directory
     * @param strArr plan script path followed by optional package paths
     * @throws IOException if clearing or copying fails
     * @throws URISyntaxException declared by the copy helpers
     */
    private void prepareFiles(String str, String... strArr) throws IOException, URISyntaxException {
        clearPath(str);
        Path pythonSources = new Path(FULL_PATH);
        Path planDirectory = new Path(str);
        FileCache.copy(pythonSources, planDirectory, false);
        copyFile(strArr[0], str, FLINK_PYTHON_PLAN_NAME);
        int packageIndex = 1;
        while (packageIndex < strArr.length) {
            copyFile(strArr[packageIndex], str, null);
            packageIndex++;
        }
    }

    /**
     * Recursively deletes the given path if it exists on the file system its URI resolves to.
     *
     * @param str path to remove
     * @throws IOException if the existence check or the delete fails
     * @throws URISyntaxException declared for callers; not thrown by the visible statements
     */
    private static void clearPath(String str) throws IOException, URISyntaxException {
        Path target = new Path(str);
        FileSystem targetFileSystem = FileSystem.get(target.toUri());
        if (!targetFileSystem.exists(target)) {
            return;
        }
        targetFileSystem.delete(target, true);
    }

    /**
     * Copies a file or directory into the target directory, replacing anything already there.
     *
     * <p>When no explicit target name is given, the last path segment of the source is used. A
     * source without any {@code "/"} (for example a bare relative file name) is used as the name
     * as-is; previously this case failed with a {@code StringIndexOutOfBoundsException} because
     * {@code lastIndexOf} returned -1.
     *
     * @param str source path; a single trailing "/" is ignored
     * @param str2 target directory
     * @param str3 name to use inside the target directory, or {@code null} to keep the source name
     * @throws IOException if clearing the target or copying fails
     * @throws URISyntaxException declared by {@code clearPath}
     */
    private static void copyFile(String str, String str2, String str3) throws IOException, URISyntaxException {
        if (str.endsWith("/")) {
            str = str.substring(0, str.length() - 1);
        }
        String targetName = str3;
        if (targetName == null) {
            int lastSeparator = str.lastIndexOf("/");
            // Keep the leading "/" when present so existing target paths stay unchanged.
            targetName = lastSeparator < 0 ? str : str.substring(lastSeparator);
        }
        String targetPath = str2 + "/" + targetName;
        clearPath(targetPath);
        Path source = new Path(str);
        FileCache.copy(source.makeQualified(FileSystem.get(source.toUri())), new Path(targetPath), true);
    }

    /**
     * Publishes the prepared plan directory: clears the distribution location, copies the
     * temporary directory there, registers it with the environment as a cached file and finally
     * removes the temporary directory.
     *
     * @param str path of the temporary plan directory
     * @param executionEnvironment environment the cached file is registered with
     * @throws IOException if clearing or copying fails
     * @throws URISyntaxException declared by {@code clearPath}
     */
    private static void distributeFiles(String str, ExecutionEnvironment executionEnvironment) throws IOException, URISyntaxException {
        clearPath(FLINK_HDFS_PATH);
        Path planDirectory = new Path(str);
        Path distributionTarget = new Path(FLINK_HDFS_PATH);
        FileCache.copy(planDirectory, distributionTarget, true);
        executionEnvironment.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
        clearPath(str);
    }

    /**
     * Appends every given argument, each preceded by a space, to the shared argument buffer and
     * opens a new plan streamer on the temporary plan directory with the accumulated arguments.
     *
     * @param str path of the temporary plan directory
     * @param strArr arguments to forward to the Python process
     * @throws IOException if opening the streamer fails
     */
    private void startPython(String str, String[] strArr) throws IOException {
        for (int index = 0; index < strArr.length; index++) {
            arguments.append(" ");
            arguments.append(strArr[index]);
        }
        // Assign the field before open() so the streamer is reachable for cleanup if open() fails.
        this.streamer = new PythonPlanStreamer();
        String pythonArguments = arguments.toString();
        this.streamer.open(str, pythonArguments);
    }

    /**
     * Sends the job's net runtime to the Python process as a single {@code Long} record.
     *
     * @param jobExecutionResult result of the executed job
     * @throws IOException if sending the record fails
     */
    private void sendResult(JobExecutionResult jobExecutionResult) throws IOException {
        Long netRuntime = Long.valueOf(jobExecutionResult.getNetRuntime());
        this.streamer.sendRecord(netRuntime);
    }

    /**
     * Best-effort cleanup: deletes the distribution location and the local temporary directories,
     * then closes the plan streamer.
     *
     * <p>The streamer is closed in a {@code finally} block so that a failing file delete can no
     * longer skip it, and an explicit null check replaces the former catch of
     * {@code NullPointerException}. Failures are logged with their cause instead of being
     * swallowed; this method never throws.
     */
    private void close() {
        try {
            FileSystem.get(new URI(FLINK_HDFS_PATH)).delete(new Path(FLINK_HDFS_PATH), true);
            FileSystem localFileSystem = FileSystem.getLocalFileSystem();
            localFileSystem.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
            localFileSystem.delete(new Path(FLINK_TMP_DATA_DIR), true);
        } catch (IOException | URISyntaxException | RuntimeException e) {
            LOG.error("PythonAPI file cleanup failed. " + e.getMessage(), e);
        } finally {
            // The streamer is null when a failure happened before startPython().
            if (this.streamer != null) {
                try {
                    this.streamer.close();
                } catch (Exception e) {
                    LOG.error("Closing the Python plan streamer failed. " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Receives the plan from the Python process: first the environment parameters, then the
     * operations. The order matters because both read sequentially from the same streamer.
     *
     * @throws IOException if reading from the streamer fails
     */
    private void receivePlan() throws IOException {
        receiveParameters();
        receiveOperations();
    }

    /**
     * Reads exactly three (name, value) tuples from the streamer and applies each to the
     * environment: DOP sets the parallelism, MODE selects the local or HDFS distribution path and
     * RETRY installs a fixed-delay restart strategy. A negative parallelism is replaced by 1
     * afterwards.
     *
     * @throws IOException if reading from the streamer fails
     */
    private void receiveParameters() throws IOException {
        for (int received = 0; received < 3; received++) {
            Tuple entry = (Tuple) this.streamer.getRecord(true);
            String parameterName = ((String) entry.getField(0)).toUpperCase();
            Parameters parameter = Parameters.valueOf(parameterName);
            switch (parameter) {
                case DOP:
                    this.env.setParallelism(((Integer) entry.getField(1)).intValue());
                    break;
                case MODE:
                    if (((Boolean) entry.getField(1)).booleanValue()) {
                        FLINK_HDFS_PATH = "file:/tmp/flink";
                    } else {
                        FLINK_HDFS_PATH = "hdfs:/tmp/flink";
                    }
                    break;
                case RETRY:
                    this.env.setRestartStrategy(RestartStrategies.fixedDelayRestart(((Integer) entry.getField(1)).intValue(), 10000L));
                    break;
            }
        }
        boolean parallelismUnset = this.env.getParallelism() < 0;
        if (parallelismUnset) {
            this.env.setParallelism(1);
        }
    }

    /**
     * Reads the number of operations from the streamer, then one PythonOperationInfo per
     * operation, and creates the matching source, sink or transformation.
     *
     * <p>The operation identifier is upper-cased with {@code Locale.ROOT} so that the lookup does
     * not depend on the default locale, and the switch works on the enum constant itself. Only the
     * identifier lookup is guarded: an {@code IllegalArgumentException} thrown while building an
     * operation is propagated unchanged instead of being reported as an invalid operation.
     *
     * @throws IOException if reading from the streamer fails
     * @throws IllegalArgumentException if an identifier does not name an {@link Operation}
     */
    private void receiveOperations() throws IOException {
        Integer operationCount = (Integer) this.streamer.getRecord(true);
        for (int i = 0; i < operationCount.intValue(); i++) {
            PythonOperationInfo info = new PythonOperationInfo(this.streamer);
            Operation operation;
            try {
                // Fully qualified to avoid touching the file's import block.
                operation = Operation.valueOf(info.identifier.toUpperCase(java.util.Locale.ROOT));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Invalid operation specified: " + info.identifier, e);
            }
            switch (operation) {
                case SOURCE_CSV:
                    createCsvSource(info);
                    break;
                case SOURCE_TEXT:
                    createTextSource(info);
                    break;
                case SOURCE_VALUE:
                    createValueSource(info);
                    break;
                case SOURCE_SEQ:
                    createSequenceSource(info);
                    break;
                case SINK_CSV:
                    createCsvSink(info);
                    break;
                case SINK_TEXT:
                    createTextSink(info);
                    break;
                case SINK_PRINT:
                    createPrintSink(info);
                    break;
                case BROADCAST:
                    createBroadcastVariable(info);
                    break;
                case AGGREGATE:
                    createAggregationOperation(info);
                    break;
                case DISTINCT:
                    createDistinctOperation(info);
                    break;
                case FIRST:
                    createFirstOperation(info);
                    break;
                case PARTITION_HASH:
                    createHashPartitionOperation(info);
                    break;
                case REBALANCE:
                    createRebalanceOperation(info);
                    break;
                case GROUPBY:
                    createGroupOperation(info);
                    break;
                case SORT:
                    createSortOperation(info);
                    break;
                case UNION:
                    createUnionOperation(info);
                    break;
                case COGROUP:
                    createCoGroupOperation(info);
                    break;
                case CROSS:
                    createCrossOperation(PythonOperationInfo.DatasizeHint.NONE, info);
                    break;
                case CROSS_H:
                    createCrossOperation(PythonOperationInfo.DatasizeHint.HUGE, info);
                    break;
                case CROSS_T:
                    createCrossOperation(PythonOperationInfo.DatasizeHint.TINY, info);
                    break;
                case FILTER:
                    createFilterOperation(info);
                    break;
                case FLATMAP:
                    createFlatMapOperation(info);
                    break;
                case GROUPREDUCE:
                    createGroupReduceOperation(info);
                    break;
                case JOIN:
                    createJoinOperation(PythonOperationInfo.DatasizeHint.NONE, info);
                    break;
                case JOIN_H:
                    createJoinOperation(PythonOperationInfo.DatasizeHint.HUGE, info);
                    break;
                case JOIN_T:
                    createJoinOperation(PythonOperationInfo.DatasizeHint.TINY, info);
                    break;
                case MAP:
                    createMapOperation(info);
                    break;
                case MAPPARTITION:
                    createMapPartitionOperation(info);
                    break;
                case REDUCE:
                    createReduceOperation(info);
                    break;
            }
        }
    }

    /**
     * Returns the parallelism to use for an operation: the operation's own value, or the
     * environment's parallelism when the operation carries -1.
     *
     * @param pythonOperationInfo operation description
     * @return effective parallelism
     */
    private int getParallelism(PythonOperationInfo pythonOperationInfo) {
        if (pythonOperationInfo.parallelism == -1) {
            return this.env.getParallelism();
        }
        return pythonOperationInfo.parallelism;
    }

    /**
     * Creates a CSV source followed by a SerializerMap step and registers the result under the
     * operation's set ID.
     *
     * @param pythonOperationInfo operation description (path, delimiters, types, set ID)
     * @throws RuntimeException if the declared output type is not a TupleTypeInfo
     */
    private void createCsvSource(PythonOperationInfo pythonOperationInfo) throws IOException {
        if (!(pythonOperationInfo.types instanceof TupleTypeInfo)) {
            throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + pythonOperationInfo);
        }
        final int parallelism = getParallelism(pythonOperationInfo);
        final Object source = this.env
                .createInput(new TupleCsvInputFormat(new Path(pythonOperationInfo.path), pythonOperationInfo.lineDelimiter, pythonOperationInfo.fieldDelimiter, pythonOperationInfo.types), pythonOperationInfo.types)
                .setParallelism(parallelism)
                .name("CsvSource")
                .map(new SerializerMap())
                .setParallelism(parallelism)
                .name("CsvSourcePostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), source);
    }

    /**
     * Creates a text-file source followed by a SerializerMap step and registers the result under
     * the operation's set ID.
     *
     * @param pythonOperationInfo operation description (path, set ID)
     */
    private void createTextSource(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final Object source = this.env
                .readTextFile(pythonOperationInfo.path)
                .setParallelism(parallelism)
                .name("TextSource")
                .map(new SerializerMap())
                .setParallelism(parallelism)
                .name("TextSourcePostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), source);
    }

    /**
     * Creates a source from the operation's literal values, followed by a SerializerMap step, and
     * registers the result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (values, set ID)
     */
    private void createValueSource(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final Object source = this.env
                .fromElements(pythonOperationInfo.values)
                .setParallelism(parallelism)
                .name("ValueSource")
                .map(new SerializerMap())
                .setParallelism(parallelism)
                .name("ValueSourcePostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), source);
    }

    /**
     * Creates a sequence source over the operation's from/to bounds, followed by a SerializerMap
     * step, and registers the result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (from, to, set ID)
     */
    private void createSequenceSource(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final Object source = this.env
                .generateSequence(pythonOperationInfo.from, pythonOperationInfo.to)
                .setParallelism(parallelism)
                .name("SequenceSource")
                .map(new SerializerMap())
                .setParallelism(parallelism)
                .name("SequenceSourcePostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), source);
    }

    /**
     * Attaches a CSV sink to the parent set, after passing it through a
     * StringTupleDeserializerMap step.
     *
     * @param pythonOperationInfo operation description (parent ID, path, delimiters, write mode)
     */
    private void createCsvSink(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        parent.map(new StringTupleDeserializerMap())
                .setParallelism(parallelism)
                .name("CsvSinkPreStep")
                .writeAsCsv(pythonOperationInfo.path, pythonOperationInfo.lineDelimiter, pythonOperationInfo.fieldDelimiter, pythonOperationInfo.writeMode)
                .setParallelism(parallelism)
                .name("CsvSink");
    }

    /**
     * Attaches a text sink to the parent set, after passing it through a StringDeserializerMap
     * step. The pre-step is left unnamed.
     *
     * @param pythonOperationInfo operation description (parent ID, path, write mode)
     */
    private void createTextSink(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        parent.map(new StringDeserializerMap())
                .setParallelism(parallelism)
                .writeAsText(pythonOperationInfo.path, pythonOperationInfo.writeMode)
                .setParallelism(parallelism)
                .name("TextSink");
    }

    /**
     * Attaches a printing sink to the parent set, after passing it through a
     * StringDeserializerMap step. The sink itself is left unnamed.
     *
     * @param pythonOperationInfo operation description (parent ID, toError flag)
     */
    private void createPrintSink(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        parent.map(new StringDeserializerMap())
                .setParallelism(parallelism)
                .name("PrintSinkPreStep")
                .output(new PrintingOutputFormat(pythonOperationInfo.toError))
                .setParallelism(parallelism);
    }

    /**
     * Adds the set stored under the operation's other ID as a broadcast set of the operator stored
     * under its parent ID, and records the variable name in the operator's parameters: the
     * counter entry is incremented and the name is stored under the prefix plus the previous
     * counter value.
     *
     * @param pythonOperationInfo operation description (parent ID, other ID, name)
     */
    private void createBroadcastVariable(PythonOperationInfo pythonOperationInfo) throws IOException {
        UdfOperator target = (UdfOperator) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        DataSet broadcastSet = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.otherID));
        target.withBroadcastSet(broadcastSet, pythonOperationInfo.name);

        Configuration config = target.getParameters();
        if (config == null) {
            config = new Configuration();
        }
        final int variableIndex = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
        config.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + variableIndex, pythonOperationInfo.name);
        config.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, variableIndex + 1);
        target.withParameters(config);
    }

    /**
     * Builds an aggregation on the parent set from the first aggregate entry, chains the entries
     * with index 1 up to (but excluding) the operation's count via {@code and}, and registers the
     * result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, aggregates, count, set ID)
     */
    private void createAggregationOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        AggregateOperator chained = parent.aggregate(pythonOperationInfo.aggregates[0].agg, pythonOperationInfo.aggregates[0].field);
        int next = 1;
        while (next < pythonOperationInfo.count) {
            chained = chained.and(pythonOperationInfo.aggregates[next].agg, pythonOperationInfo.aggregates[next].field);
            next++;
        }
        Object result = chained.setParallelism(getParallelism(pythonOperationInfo)).name("Aggregation");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Applies {@code distinct} on the operation's keys to the parent set, followed by a
     * KeyDiscarder step, and registers the result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, keys, set ID)
     */
    private void createDistinctOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final Object result = parent.distinct(pythonOperationInfo.keys)
                .setParallelism(parallelism)
                .name("Distinct")
                .map(new KeyDiscarder())
                .setParallelism(parallelism)
                .name("DistinctPostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Applies {@code first(count)} to the parent set and registers the result under the
     * operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, count, set ID)
     */
    private void createFirstOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final Object result = parent.first(pythonOperationInfo.count)
                .setParallelism(parallelism)
                .name("First");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Groups the parent set by the operation's keys and registers the grouping under the
     * operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, keys, set ID)
     */
    private void createGroupOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final Object grouping = parent.groupBy(pythonOperationInfo.keys);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), grouping);
    }

    /**
     * Hash-partitions the parent set on the operation's keys, followed by a KeyDiscarder step,
     * and registers the result under the operation's set ID. The partition step itself is left
     * unnamed.
     *
     * @param pythonOperationInfo operation description (parent ID, keys, set ID)
     */
    private void createHashPartitionOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final Object result = parent.partitionByHash(pythonOperationInfo.keys)
                .setParallelism(parallelism)
                .map(new KeyDiscarder())
                .setParallelism(parallelism)
                .name("HashPartitionPostStep");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Rebalances the parent set and registers the result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, set ID)
     */
    private void createRebalanceOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final Object result = parent.rebalance()
                .setParallelism(parallelism)
                .name("Rebalance");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Adds a group sort on the operation's field and order to the grouping stored under the parent
     * ID and registers the resulting grouping under the operation's set ID.
     *
     * <p>The parent is held as a {@code Grouping} and cast to the concrete kind per branch; the
     * previous declaration as {@code UnsortedGrouping} did not type-check against the
     * {@code SortedGrouping} branch. A grouping of any other kind is left untouched and nothing
     * is registered.
     *
     * @param pythonOperationInfo operation description (parent ID, field, order, set ID)
     */
    private void createSortOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        Grouping parent = (Grouping) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        if (parent instanceof UnsortedGrouping) {
            this.sets.put(Integer.valueOf(pythonOperationInfo.setID), ((UnsortedGrouping) parent).sortGroup(pythonOperationInfo.field, pythonOperationInfo.order));
        } else if (parent instanceof SortedGrouping) {
            this.sets.put(Integer.valueOf(pythonOperationInfo.setID), ((SortedGrouping) parent).sortGroup(pythonOperationInfo.field, pythonOperationInfo.order));
        }
    }

    /**
     * Unions the parent set with the set stored under the operation's other ID and registers the
     * result under the operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, other ID, set ID)
     */
    private void createUnionOperation(PythonOperationInfo pythonOperationInfo) throws IOException {
        final int parallelism = getParallelism(pythonOperationInfo);
        final DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        final DataSet other = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.otherID));
        final Object result = parent.union(other)
                .setParallelism(parallelism)
                .name("Union");
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Builds a raw co-group of the parent set and the other set, keyed by the operation's keys1
     * and keys2 expressions and executed by a PythonCoGroup, and registers it under the
     * operation's set ID.
     *
     * @param pythonOperationInfo operation description (parent ID, other ID, keys, types, name)
     */
    private void createCoGroupOperation(PythonOperationInfo pythonOperationInfo) {
        final int parallelism = getParallelism(pythonOperationInfo);
        DataSet first = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        DataSet second = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.otherID));
        Keys.ExpressionKeys firstKeys = new Keys.ExpressionKeys(pythonOperationInfo.keys1, first.getType());
        Keys.ExpressionKeys secondKeys = new Keys.ExpressionKeys(pythonOperationInfo.keys2, second.getType());
        PythonCoGroup function = new PythonCoGroup(pythonOperationInfo.setID, pythonOperationInfo.types);
        CoGroupRawOperator coGroup = new CoGroupRawOperator(first, second, firstKeys, secondKeys, function, pythonOperationInfo.types, pythonOperationInfo.name);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), coGroup.setParallelism(parallelism));
    }

    /**
     * Crosses the parent set with the other set using the variant selected by the size hint and
     * registers the result under the operation's set ID. When the operation uses a UDF, a
     * PythonMapPartition named after the operation is applied on top of the cross; otherwise the
     * cross itself is registered as "DefaultCross".
     *
     * @param datasizeHint which cross variant to use
     * @param pythonOperationInfo operation description (parent ID, other ID, types, name, usesUDF)
     * @throws IllegalArgumentException if the hint is not NONE, HUGE or TINY
     */
    private void createCrossOperation(PythonOperationInfo.DatasizeHint datasizeHint, PythonOperationInfo pythonOperationInfo) {
        DataSet first = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        DataSet second = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.otherID));
        CrossOperator.DefaultCross cross;
        switch (datasizeHint) {
            case NONE:
                cross = first.cross(second);
                break;
            case HUGE:
                cross = first.crossWithHuge(second);
                break;
            case TINY:
                cross = first.crossWithTiny(second);
                break;
            default:
                throw new IllegalArgumentException("Invalid Cross mode specified: " + datasizeHint);
        }
        final int parallelism = getParallelism(pythonOperationInfo);
        cross.setParallelism(parallelism);
        final Object result;
        if (!pythonOperationInfo.usesUDF) {
            result = cross.name("DefaultCross");
        } else {
            result = cross.mapPartition(new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types))
                    .setParallelism(parallelism)
                    .name(pythonOperationInfo.name);
        }
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), result);
    }

    /**
     * Registers a filter operation under {@code setID}.
     *
     * <p>On the Java side the filter is expressed as a {@code mapPartition} over the parent
     * data set using a {@link PythonMapPartition}, named after the operation.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createFilterOperation(PythonOperationInfo pythonOperationInfo) {
        DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        DataSet filtered = parent.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), filtered);
    }

    /**
     * Registers a flat-map operation under {@code setID}.
     *
     * <p>On the Java side the flat-map is expressed as a {@code mapPartition} over the parent
     * data set using a {@link PythonMapPartition}, named after the operation.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createFlatMapOperation(PythonOperationInfo pythonOperationInfo) {
        DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        DataSet flatMapped = parent.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), flatMapped);
    }

    /**
     * Registers a group-reduce operation under {@code setID}, dispatching on the runtime type
     * of the object registered under {@code parentID}.
     *
     * <p>Supported parents are {@link DataSet}, {@link UnsortedGrouping} and
     * {@link SortedGrouping}. For any other parent (including a missing entry) nothing is
     * registered.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createGroupReduceOperation(PythonOperationInfo pythonOperationInfo) {
        Object parent = this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));

        DataSet reduced;
        if (parent instanceof DataSet) {
            reduced = applyGroupReduceOperation((DataSet) parent, pythonOperationInfo);
        } else if (parent instanceof UnsortedGrouping) {
            reduced = applyGroupReduceOperation((UnsortedGrouping) parent, pythonOperationInfo);
        } else if (parent instanceof SortedGrouping) {
            reduced = applyGroupReduceOperation((SortedGrouping) parent, pythonOperationInfo);
        } else {
            // Unsupported parent type: leave the set map untouched.
            return;
        }
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), reduced);
    }

    /**
     * Applies a group-reduce to an ungrouped data set.
     *
     * <p>The input first goes through a non-combinable {@link IdentityGroupReduce} step named
     * "PythonGroupReducePreStep", whose output is then passed to a {@link PythonMapPartition}
     * named after the operation.
     *
     * @param dataSet the ungrouped input
     * @param pythonOperationInfo operation description supplying id, result type and name
     * @return the data set produced by the Python map-partition step
     */
    private DataSet applyGroupReduceOperation(DataSet dataSet, PythonOperationInfo pythonOperationInfo) {
        DataSet preStep = dataSet.reduceGroup(new IdentityGroupReduce())
                .setCombinable(false)
                .name("PythonGroupReducePreStep")
                .setParallelism(getParallelism(pythonOperationInfo));
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        return preStep.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
    }

    /**
     * Applies a group-reduce to an unsorted grouping.
     *
     * <p>The grouping first goes through a non-combinable {@link IdentityGroupReduce} step
     * named "PythonGroupReducePreStep", whose output is then passed to a
     * {@link PythonMapPartition} named after the operation.
     *
     * @param unsortedGrouping the grouped input
     * @param pythonOperationInfo operation description supplying id, result type and name
     * @return the data set produced by the Python map-partition step
     */
    private DataSet applyGroupReduceOperation(UnsortedGrouping unsortedGrouping, PythonOperationInfo pythonOperationInfo) {
        DataSet preStep = unsortedGrouping.reduceGroup(new IdentityGroupReduce())
                .setCombinable(false)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name("PythonGroupReducePreStep");
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        return preStep.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
    }

    /**
     * Applies a group-reduce to a sorted grouping.
     *
     * <p>The grouping first goes through a non-combinable {@link IdentityGroupReduce} step
     * named "PythonGroupReducePreStep", whose output is then passed to a
     * {@link PythonMapPartition} named after the operation.
     *
     * @param sortedGrouping the grouped and sorted input
     * @param pythonOperationInfo operation description supplying id, result type and name
     * @return the data set produced by the Python map-partition step
     */
    private DataSet applyGroupReduceOperation(SortedGrouping sortedGrouping, PythonOperationInfo pythonOperationInfo) {
        DataSet preStep = sortedGrouping.reduceGroup(new IdentityGroupReduce())
                .setCombinable(false)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name("PythonGroupReducePreStep");
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        return preStep.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
    }

    /**
     * Joins the data sets registered under {@code parentID} and {@code otherID} and stores the
     * result in {@code sets} under {@code setID}.
     *
     * <p>The join itself is always built by {@code createDefaultJoin}. When the operation uses
     * a UDF, the join output is additionally passed through a {@link PythonMapPartition} named
     * after the operation.
     *
     * @param datasizeHint size hint forwarded to {@code createDefaultJoin}
     * @param pythonOperationInfo operation description supplying ids, keys, result type and name
     */
    private void createJoinOperation(PythonOperationInfo.DatasizeHint datasizeHint, PythonOperationInfo pythonOperationInfo) {
        DataSet first = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        DataSet second = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.otherID));

        DataSet joined = createDefaultJoin(
                first,
                second,
                pythonOperationInfo.keys1,
                pythonOperationInfo.keys2,
                datasizeHint,
                getParallelism(pythonOperationInfo));

        if (pythonOperationInfo.usesUDF) {
            PythonMapPartition udf = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
            joined = joined.mapPartition(udf)
                    .setParallelism(getParallelism(pythonOperationInfo))
                    .name(pythonOperationInfo.name);
        }
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), joined);
    }

    /**
     * Builds an equi-join of two data sets followed by a {@link NestedKeyDiscarder} map step
     * named "DefaultJoinPostStep".
     *
     * <p>The size hint selects between {@code join}, {@code joinWithHuge} and
     * {@code joinWithTiny}. Both the join and the post step are given the same parallelism.
     *
     * @param dataSet first join input
     * @param dataSet2 second join input
     * @param strArr key expressions applied to the first input ({@code where})
     * @param strArr2 key expressions applied to the second input ({@code equalTo})
     * @param datasizeHint size hint choosing the join variant
     * @param i parallelism applied to the join and to the post step
     * @return the output of the post step
     * @throws IllegalArgumentException if the hint is not one of NONE, HUGE or TINY; the
     *         message names the offending hint
     */
    private DataSet createDefaultJoin(DataSet dataSet, DataSet dataSet2, String[] strArr, String[] strArr2, PythonOperationInfo.DatasizeHint datasizeHint, int i) {
        DataSet joined;
        switch (datasizeHint) {
            case NONE:
                joined = dataSet.join(dataSet2).where(strArr).equalTo(strArr2).setParallelism(i);
                break;
            case HUGE:
                joined = dataSet.joinWithHuge(dataSet2).where(strArr).equalTo(strArr2).setParallelism(i);
                break;
            case TINY:
                joined = dataSet.joinWithTiny(dataSet2).where(strArr).equalTo(strArr2).setParallelism(i);
                break;
            default:
                // Report the rejected hint, matching the message style of the cross operation.
                throw new IllegalArgumentException("Invalid join mode specified: " + datasizeHint);
        }
        // The post step is identical for every join variant, so it is applied once here.
        return joined.map(new NestedKeyDiscarder()).setParallelism(i).name("DefaultJoinPostStep");
    }

    /**
     * Registers a map operation under {@code setID}.
     *
     * <p>On the Java side the map is expressed as a {@code mapPartition} over the parent data
     * set using a {@link PythonMapPartition}, named after the operation.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createMapOperation(PythonOperationInfo pythonOperationInfo) {
        DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        DataSet mapped = parent.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), mapped);
    }

    /**
     * Registers a map-partition operation under {@code setID}.
     *
     * <p>The parent data set is passed through a {@link PythonMapPartition} that is named
     * after the operation and given the parallelism returned by
     * {@code getParallelism(pythonOperationInfo)}.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createMapPartitionOperation(PythonOperationInfo pythonOperationInfo) {
        DataSet parent = (DataSet) this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        DataSet partitionMapped = parent.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), partitionMapped);
    }

    /**
     * Registers a reduce operation under {@code setID}, dispatching on the runtime type of the
     * object registered under {@code parentID}.
     *
     * <p>Supported parents are {@link DataSet} and {@link UnsortedGrouping}. For any other
     * parent (including a missing entry) nothing is registered.
     *
     * @param pythonOperationInfo operation description supplying ids, result type and name
     */
    private void createReduceOperation(PythonOperationInfo pythonOperationInfo) {
        Object parent = this.sets.get(Integer.valueOf(pythonOperationInfo.parentID));

        DataSet reduced;
        if (parent instanceof DataSet) {
            reduced = applyReduceOperation((DataSet) parent, pythonOperationInfo);
        } else if (parent instanceof UnsortedGrouping) {
            reduced = applyReduceOperation((UnsortedGrouping) parent, pythonOperationInfo);
        } else {
            // Unsupported parent type: leave the set map untouched.
            return;
        }
        this.sets.put(Integer.valueOf(pythonOperationInfo.setID), reduced);
    }

    /**
     * Applies a reduce to an ungrouped data set.
     *
     * <p>The input first goes through a non-combinable {@link IdentityGroupReduce} step named
     * "PythonReducePreStep", whose output is then passed to a {@link PythonMapPartition} named
     * after the operation.
     *
     * @param dataSet the ungrouped input
     * @param pythonOperationInfo operation description supplying id, result type and name
     * @return the data set produced by the Python map-partition step
     */
    private DataSet applyReduceOperation(DataSet dataSet, PythonOperationInfo pythonOperationInfo) {
        DataSet preStep = dataSet.reduceGroup(new IdentityGroupReduce())
                .setCombinable(false)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name("PythonReducePreStep");
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        return preStep.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
    }

    /**
     * Applies a reduce to an unsorted grouping.
     *
     * <p>The grouping first goes through a non-combinable {@link IdentityGroupReduce} step
     * named "PythonReducePreStep", whose output is then passed to a {@link PythonMapPartition}
     * named after the operation.
     *
     * @param unsortedGrouping the grouped input
     * @param pythonOperationInfo operation description supplying id, result type and name
     * @return the data set produced by the Python map-partition step
     */
    private DataSet applyReduceOperation(UnsortedGrouping unsortedGrouping, PythonOperationInfo pythonOperationInfo) {
        DataSet preStep = unsortedGrouping.reduceGroup(new IdentityGroupReduce())
                .setCombinable(false)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name("PythonReducePreStep");
        PythonMapPartition pythonFunction = new PythonMapPartition(pythonOperationInfo.setID, pythonOperationInfo.types);
        return preStep.mapPartition(pythonFunction)
                .setParallelism(getParallelism(pythonOperationInfo))
                .name(pythonOperationInfo.name);
    }
}
