package org.apache.flink.python.util;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.python.PythonPartitionCustomOperator;
import org.apache.flink.streaming.api.operators.python.PythonTimestampsAndWatermarksOperator;
import org.apache.flink.streaming.api.operators.python.TwoInputPythonFunctionOperator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

/* loaded from: input_file:org/apache/flink/python/util/PythonConfigUtil.class */
public class PythonConfigUtil {
    public static final String KEYED_STREAM_VALUE_OPERATOR_NAME = "_keyed_stream_values_operator";
    public static final String STREAM_KEY_BY_MAP_OPERATOR_NAME = "_stream_key_by_map_operator";
    public static final String STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME = "_partition_custom_map_operator";

    public static Configuration getEnvConfigWithDependencies(StreamExecutionEnvironment streamExecutionEnvironment) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
        return PythonDependencyUtils.configurePythonDependencies(streamExecutionEnvironment.getCachedFiles(), getEnvironmentConfig(streamExecutionEnvironment));
    }

    public static Configuration getEnvironmentConfig(StreamExecutionEnvironment streamExecutionEnvironment) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
        Method method = null;
        Class<?> cls = streamExecutionEnvironment.getClass();
        while (true) {
            Class<?> cls2 = cls;
            if (cls2 == Object.class) {
                break;
            }
            try {
                method = cls2.getDeclaredMethod("getConfiguration", new Class[0]);
                break;
            } catch (NoSuchMethodException e) {
                cls = cls2.getSuperclass();
            }
        }
        if (method == null) {
            throw new NoSuchMethodException("Method getConfigurationMethod not found.");
        }
        method.setAccessible(true);
        return (Configuration) method.invoke(streamExecutionEnvironment, new Object[0]);
    }

    private static void alignStreamNode(StreamNode streamNode, StreamGraph streamGraph) {
        if (streamNode.getOperatorName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
            StreamEdge streamEdge = (StreamEdge) streamNode.getOutEdges().get(0);
            chainStreamNode(streamEdge, streamNode, streamGraph.getStreamNode(Integer.valueOf(streamEdge.getTargetId())));
            streamEdge.setPartitioner(new ForwardPartitioner());
        }
        if (streamNode.getOperatorName().equals(STREAM_KEY_BY_MAP_OPERATOR_NAME) || streamNode.getOperatorName().equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)) {
            StreamEdge streamEdge2 = (StreamEdge) streamNode.getInEdges().get(0);
            chainStreamNode(streamEdge2, streamNode, streamGraph.getStreamNode(Integer.valueOf(streamEdge2.getSourceId())));
        }
    }

    private static void chainStreamNode(StreamEdge streamEdge, StreamNode streamNode, StreamNode streamNode2) {
        streamEdge.setPartitioner(new ForwardPartitioner());
        streamNode.setParallelism(Integer.valueOf(streamNode2.getParallelism()));
        streamNode.setCoLocationGroup(streamNode2.getCoLocationGroup());
        streamNode.setSlotSharingGroup(streamNode2.getSlotSharingGroup());
    }

    public static StreamGraph generateStreamGraphWithDependencies(StreamExecutionEnvironment streamExecutionEnvironment, boolean z) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
        Configuration envConfigWithDependencies = getEnvConfigWithDependencies(streamExecutionEnvironment);
        boolean isExecuteInBatchMode = isExecuteInBatchMode(streamExecutionEnvironment, envConfigWithDependencies);
        if (isExecuteInBatchMode) {
            throw new UnsupportedOperationException("Batch mode is still not supported in Python DataStream API.");
        }
        if (envConfigWithDependencies.getBoolean(PythonOptions.USE_MANAGED_MEMORY)) {
            Field declaredField = StreamExecutionEnvironment.class.getDeclaredField("transformations");
            declaredField.setAccessible(true);
            for (OneInputTransformation oneInputTransformation : (List) declaredField.get(streamExecutionEnvironment)) {
                if ((oneInputTransformation instanceof OneInputTransformation) && isPythonOperator(oneInputTransformation.getOperatorFactory())) {
                    oneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
                } else if ((oneInputTransformation instanceof TwoInputTransformation) && isPythonOperator(((TwoInputTransformation) oneInputTransformation).getOperatorFactory())) {
                    oneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
                } else if ((oneInputTransformation instanceof AbstractMultipleInputTransformation) && isPythonOperator(((AbstractMultipleInputTransformation) oneInputTransformation).getOperatorFactory())) {
                    oneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
                }
            }
        }
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(getEnvironmentConfig(streamExecutionEnvironment).getString(PipelineOptions.NAME, "Flink Streaming Job"), z);
        Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
        for (StreamNode streamNode : streamNodes) {
            alignStreamNode(streamNode, streamGraph);
            SimpleOperatorFactory operatorFactory = streamNode.getOperatorFactory();
            if (operatorFactory instanceof SimpleOperatorFactory) {
                PythonTimestampsAndWatermarksOperator operator = operatorFactory.getOperator();
                if ((operator instanceof OneInputPythonFunctionOperator) || (operator instanceof TwoInputPythonFunctionOperator) || (operator instanceof PythonKeyedProcessOperator)) {
                    PythonTimestampsAndWatermarksOperator pythonTimestampsAndWatermarksOperator = operator;
                    pythonTimestampsAndWatermarksOperator.setPythonConfig(generateNewPythonConfig(pythonTimestampsAndWatermarksOperator.getPythonConfig().getMergedConfig(), envConfigWithDependencies));
                    if (operator instanceof PythonTimestampsAndWatermarksOperator) {
                        operator.configureEmitProgressiveWatermarks(!isExecuteInBatchMode);
                    }
                }
            }
        }
        setStreamPartitionCustomOperatorNumPartitions(streamNodes, streamGraph);
        return streamGraph;
    }

    private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFactory) {
        if (streamOperatorFactory instanceof SimpleOperatorFactory) {
            return ((SimpleOperatorFactory) streamOperatorFactory).getOperator() instanceof AbstractPythonFunctionOperator;
        }
        return false;
    }

    private static void setStreamPartitionCustomOperatorNumPartitions(Collection<StreamNode> collection, StreamGraph streamGraph) {
        for (StreamNode streamNode : collection) {
            SimpleOperatorFactory operatorFactory = streamNode.getOperatorFactory();
            if (operatorFactory instanceof SimpleOperatorFactory) {
                PythonPartitionCustomOperator operator = operatorFactory.getOperator();
                if (operator instanceof PythonPartitionCustomOperator) {
                    operator.setNumPartitions(streamGraph.getStreamNode(Integer.valueOf(((StreamEdge) streamNode.getOutEdges().get(0)).getTargetId())).getParallelism());
                }
            }
        }
    }

    private static PythonConfig generateNewPythonConfig(Configuration configuration, Configuration configuration2) {
        Configuration clone = configuration2.clone();
        clone.addAll(configuration);
        return new PythonConfig(clone);
    }

    private static boolean isExecuteInBatchMode(StreamExecutionEnvironment streamExecutionEnvironment, Configuration configuration) throws NoSuchFieldException, IllegalAccessException {
        RuntimeExecutionMode runtimeExecutionMode = (RuntimeExecutionMode) configuration.get(ExecutionOptions.RUNTIME_MODE);
        if (runtimeExecutionMode != RuntimeExecutionMode.AUTOMATIC) {
            return runtimeExecutionMode == RuntimeExecutionMode.BATCH;
        }
        Field declaredField = StreamExecutionEnvironment.class.getDeclaredField("transformations");
        declaredField.setAccessible(true);
        boolean z = false;
        for (WithBoundedness withBoundedness : (List) declaredField.get(streamExecutionEnvironment)) {
            z = z || ((withBoundedness instanceof WithBoundedness) && withBoundedness.getBoundedness() != Boundedness.BOUNDED);
        }
        return !z;
    }
}
