package org.apache.flink.streaming.python.api.environment;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.python.PythonOptions;
import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
import org.apache.flink.streaming.python.util.AdapterMap;
import org.apache.flink.streaming.python.util.serialization.PyBooleanSerializer;
import org.apache.flink.streaming.python.util.serialization.PyFloatSerializer;
import org.apache.flink.streaming.python.util.serialization.PyIntegerSerializer;
import org.apache.flink.streaming.python.util.serialization.PyLongSerializer;
import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
import org.apache.flink.streaming.python.util.serialization.PyStringSerializer;
import org.python.core.PyBoolean;
import org.python.core.PyFloat;
import org.python.core.PyInstance;
import org.python.core.PyInteger;
import org.python.core.PyLong;
import org.python.core.PyObject;
import org.python.core.PyObjectDerived;
import org.python.core.PyString;
import org.python.core.PyTuple;
import org.python.core.PyUnicode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.class */
public class PythonStreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
    private final StreamExecutionEnvironment env;
    private final Path pythonTmpCachePath;
    private final Path tmpDistributedDir;

    /* loaded from: input_file:org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment$PythonJobParameters.class */
    public static class PythonJobParameters extends ExecutionConfig.GlobalJobParameters {
        private static final String KEY_SCRIPT_NAME = "scriptName";
        private Map<String, String> parameters = new HashMap();

        PythonJobParameters(String str) {
            this.parameters.put(KEY_SCRIPT_NAME, str);
        }

        public Map<String, String> toMap() {
            return this.parameters;
        }

        public static String getScriptName(ExecutionConfig.GlobalJobParameters globalJobParameters) {
            return (String) globalJobParameters.toMap().get(KEY_SCRIPT_NAME);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PythonStreamExecutionEnvironment(StreamExecutionEnvironment streamExecutionEnvironment, Path path, Path path2, String str) {
        this.env = streamExecutionEnvironment;
        this.pythonTmpCachePath = path;
        this.tmpDistributedDir = path2;
        streamExecutionEnvironment.getConfig().setGlobalJobParameters(new PythonJobParameters(str));
        registerJythonSerializers(this.env);
    }

    private static void registerJythonSerializers(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyBoolean.class, PyBooleanSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyFloat.class, PyFloatSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyInteger.class, PyIntegerSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyLong.class, PyLongSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyString.class, PyStringSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class);
        streamExecutionEnvironment.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class);
    }

    public PythonDataStream create_python_source(SourceFunction<Object> sourceFunction) throws Exception {
        return new PythonDataStream(this.env.addSource(new PythonGeneratorFunction(sourceFunction)).map(new AdapterMap()));
    }

    public PythonDataStream add_java_source(SourceFunction<Object> sourceFunction) {
        return new PythonDataStream(this.env.addSource(sourceFunction).map(new AdapterMap()));
    }

    public PythonDataStream from_elements(PyObject... pyObjectArr) {
        return new PythonDataStream(this.env.fromElements(pyObjectArr));
    }

    public PythonDataStream from_collection(Collection<Object> collection) {
        return new PythonDataStream(this.env.fromCollection(collection).map(new AdapterMap()));
    }

    public PythonDataStream from_collection(Iterator<Object> it) throws Exception {
        return new PythonDataStream(this.env.addSource(new PythonIteratorFunction(it), TypeExtractor.getForClass(Object.class)).map(new AdapterMap()));
    }

    public PythonDataStream generate_sequence(long j, long j2) {
        return new PythonDataStream(this.env.generateSequence(j, j2).map(new AdapterMap()));
    }

    public PythonDataStream read_text_file(String str) throws IOException {
        return new PythonDataStream(this.env.readTextFile(str).map(new AdapterMap()));
    }

    public PythonDataStream socket_text_stream(String str, int i) {
        return new PythonDataStream(this.env.socketTextStream(str, i).map(new AdapterMap()));
    }

    public PythonStreamExecutionEnvironment enable_checkpointing(long j) {
        this.env.enableCheckpointing(j);
        return this;
    }

    public PythonStreamExecutionEnvironment enable_checkpointing(long j, CheckpointingMode checkpointingMode) {
        this.env.enableCheckpointing(j, checkpointingMode);
        return this;
    }

    public PythonStreamExecutionEnvironment set_parallelism(int i) {
        this.env.setParallelism(i);
        return this;
    }

    public JobExecutionResult execute() throws Exception {
        distributeFiles();
        JobExecutionResult execute = this.env.execute();
        cleanupDistributedFiles();
        return execute;
    }

    public JobExecutionResult execute(String str) throws Exception {
        distributeFiles();
        JobExecutionResult execute = this.env.execute(str);
        cleanupDistributedFiles();
        return execute;
    }

    private void distributeFiles() throws IOException {
        Path path = new Path(this.env instanceof LocalStreamEnvironment ? new Path((String) PythonOptions.DC_TMP_DIR.defaultValue()) : this.tmpDistributedDir, "flink_cache_" + UUID.randomUUID());
        FileCache.copy(this.pythonTmpCachePath, path, true);
        this.env.registerCachedFile(path.toUri().toString(), PythonConstants.FLINK_PYTHON_DC_ID);
    }

    private void cleanupDistributedFiles() throws IOException {
        for (Tuple2 tuple2 : this.env.getCachedFiles()) {
            Path path = new Path(((DistributedCache.DistributedCacheEntry) tuple2.f1).filePath);
            FileSystem fileSystem = path.getFileSystem();
            LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s", ((DistributedCache.DistributedCacheEntry) tuple2.f1).filePath, path.getPath(), fileSystem.getClass().getName()));
            fileSystem.delete(new Path(path.getPath()), true);
        }
    }
}
