/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.python.api.environment;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
import org.apache.flink.streaming.python.util.AdapterMap;
import org.apache.flink.streaming.python.util.serialization.PyBooleanSerializer;
import org.apache.flink.streaming.python.util.serialization.PyFloatSerializer;
import org.apache.flink.streaming.python.util.serialization.PyIntegerSerializer;
import org.apache.flink.streaming.python.util.serialization.PyLongSerializer;
import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
import org.apache.flink.streaming.python.util.serialization.PyStringSerializer;
import org.python.core.PyBoolean;
import org.python.core.PyFloat;
import org.python.core.PyInstance;
import org.python.core.PyInteger;
import org.python.core.PyLong;
import org.python.core.PyObject;
import org.python.core.PyObjectDerived;
import org.python.core.PyString;
import org.python.core.PyTuple;
import org.python.core.PyUnicode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class PythonStreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
    private final StreamExecutionEnvironment env;
    private final Path pythonTmpCachePath;

    PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, String scriptName) {
        this.env = env;
        this.pythonTmpCachePath = tmpLocalDir;
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)new PythonJobParameters(scriptName));
        PythonStreamExecutionEnvironment.registerJythonSerializers(this.env);
    }

    private static void registerJythonSerializers(StreamExecutionEnvironment env) {
        env.registerTypeWithKryoSerializer(PyBoolean.class, PyBooleanSerializer.class);
        env.registerTypeWithKryoSerializer(PyFloat.class, PyFloatSerializer.class);
        env.registerTypeWithKryoSerializer(PyInteger.class, PyIntegerSerializer.class);
        env.registerTypeWithKryoSerializer(PyLong.class, PyLongSerializer.class);
        env.registerTypeWithKryoSerializer(PyString.class, PyStringSerializer.class);
        env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class);
        env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class);
        env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class);
        env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class);
    }

    public PythonDataStream create_python_source(SourceFunction<Object> src) throws Exception {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.addSource((SourceFunction)new PythonGeneratorFunction(src)).map(new AdapterMap()));
    }

    public PythonDataStream add_java_source(SourceFunction<Object> src) {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.addSource(src).map(new AdapterMap()));
    }

    public PythonDataStream from_elements(PyObject ... elements) {
        return new PythonDataStream<DataStreamSource>(this.env.fromElements((Object[])elements));
    }

    public PythonDataStream from_collection(Collection<Object> collection) {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.fromCollection(collection).map(new AdapterMap()));
    }

    public PythonDataStream from_collection(Iterator<Object> iter) throws Exception {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.addSource((SourceFunction)new PythonIteratorFunction(iter), TypeExtractor.getForClass(Object.class)).map(new AdapterMap()));
    }

    public PythonDataStream generate_sequence(long from, long to) {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.generateSequence(from, to).map(new AdapterMap()));
    }

    public PythonDataStream read_text_file(String path) throws IOException {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.readTextFile(path).map(new AdapterMap()));
    }

    public PythonDataStream socket_text_stream(String host, int port) {
        return new PythonDataStream<SingleOutputStreamOperator>(this.env.socketTextStream(host, port).map(new AdapterMap()));
    }

    public PythonStreamExecutionEnvironment enable_checkpointing(long interval) {
        this.env.enableCheckpointing(interval);
        return this;
    }

    public PythonStreamExecutionEnvironment enable_checkpointing(long interval, CheckpointingMode mode) {
        this.env.enableCheckpointing(interval, mode);
        return this;
    }

    public PythonStreamExecutionEnvironment set_parallelism(int parallelism) {
        this.env.setParallelism(parallelism);
        return this;
    }

    public JobExecutionResult execute() throws Exception {
        this.distributeFiles();
        JobExecutionResult result2 = this.env.execute();
        return result2;
    }

    public JobExecutionResult execute(String job_name) throws Exception {
        this.distributeFiles();
        JobExecutionResult result2 = this.env.execute(job_name);
        return result2;
    }

    private void distributeFiles() throws IOException {
        this.env.registerCachedFile(this.pythonTmpCachePath.getPath(), "flink");
    }

    public static class PythonJobParameters
    extends ExecutionConfig.GlobalJobParameters {
        private static final String KEY_SCRIPT_NAME = "scriptName";
        private Map<String, String> parameters = new HashMap<String, String>();

        PythonJobParameters(String scriptName) {
            this.parameters.put(KEY_SCRIPT_NAME, scriptName);
        }

        public Map<String, String> toMap() {
            return this.parameters;
        }

        public static String getScriptName(ExecutionConfig.GlobalJobParameters parameters) {
            return (String)parameters.toMap().get(KEY_SCRIPT_NAME);
        }
    }
}

