package org.apache.flink.python.util;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractDataStreamPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;

/* loaded from: input_file:org/apache/flink/python/util/PythonConfigUtil.class */
public class PythonConfigUtil {
    /**
     * Transformation name that {@code alignTransformation} looks for on the FIRST INPUT of the
     * transformation being aligned.
     */
    public static final String KEYED_STREAM_VALUE_OPERATOR_NAME = "_keyed_stream_values_operator";
    /**
     * Transformation name that {@code alignTransformation} looks for on the transformation being
     * aligned itself.
     */
    public static final String STREAM_KEY_BY_MAP_OPERATOR_NAME = "_stream_key_by_map_operator";
    /**
     * Transformation name that {@code alignTransformation} treats the same way as
     * {@link #STREAM_KEY_BY_MAP_OPERATOR_NAME}.
     */
    public static final String STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME = "_partition_custom_map_operator";

    /**
     * Produces the configuration returned by
     * {@code PythonDependencyUtils.configurePythonDependencies} for the given environment's cached
     * files and configuration.
     *
     * @param streamExecutionEnvironment environment whose cached files and configuration are read
     * @return the configuration produced by {@code PythonDependencyUtils}
     * @throws InvocationTargetException declared for callers; no statement in this method raises it
     * @throws IllegalAccessException declared for callers; no statement in this method raises it
     * @throws NoSuchFieldException declared for callers; no statement in this method raises it
     */
    public static Configuration getEnvConfigWithDependencies(
            StreamExecutionEnvironment streamExecutionEnvironment)
            throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
        final Configuration withDependencies =
                PythonDependencyUtils.configurePythonDependencies(
                        streamExecutionEnvironment.getCachedFiles(),
                        streamExecutionEnvironment.getConfiguration());
        return withDependencies;
    }

    /**
     * Reads the {@code configuration} field of the given environment via reflection.
     *
     * <p>The field is searched on the environment's runtime class first and then on each
     * superclass in turn, stopping before {@code Object}. The first declaration found wins.
     *
     * @param streamExecutionEnvironment environment instance to read the field from
     * @return the value of the {@code configuration} field, cast to {@link Configuration}
     * @throws NoSuchFieldException if no class in the hierarchy declares a {@code configuration}
     *     field
     * @throws IllegalAccessException if the reflective read is rejected
     * @throws InvocationTargetException declared for callers; no statement in this method raises it
     */
    public static Configuration getEnvironmentConfig(
            StreamExecutionEnvironment streamExecutionEnvironment)
            throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
        Field configurationField = null;
        for (Class<?> type = streamExecutionEnvironment.getClass();
                type != Object.class;
                type = type.getSuperclass()) {
            try {
                configurationField = type.getDeclaredField("configuration");
                break;
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; the loop moves on to the superclass.
            }
        }

        if (configurationField == null) {
            throw new NoSuchFieldException("Field 'configuration' not found.");
        }

        configurationField.setAccessible(true);
        return (Configuration) configurationField.get(streamExecutionEnvironment);
    }

    /**
     * Aligns every transformation registered on the environment and pushes the dependency-aware
     * configuration into each Python operator found.
     *
     * <p>The list of transformations is read reflectively from the {@code transformations} field
     * declared on {@link StreamExecutionEnvironment}. For each entry, {@code alignTransformation}
     * runs first; if the entry then qualifies as a Python operator, it declares the
     * {@link ManagedMemoryUseCase#PYTHON} managed-memory use case at slot scope and its operator
     * receives a configuration merged from its own configuration and the environment's.
     *
     * @param streamExecutionEnvironment environment whose transformations are processed
     * @throws NoSuchFieldException if a reflectively accessed field does not exist
     * @throws IllegalAccessException if a reflective read or write is rejected
     * @throws InvocationTargetException propagated from {@code getEnvConfigWithDependencies}'s
     *     declaration
     */
    public static void configPythonOperator(StreamExecutionEnvironment streamExecutionEnvironment)
            throws IllegalAccessException, InvocationTargetException, NoSuchFieldException {
        final Configuration dependencyConfig =
                getEnvConfigWithDependencies(streamExecutionEnvironment);

        final Field transformationsField =
                StreamExecutionEnvironment.class.getDeclaredField("transformations");
        transformationsField.setAccessible(true);
        @SuppressWarnings("unchecked") // the field's static type is not visible through reflection
        final List<Transformation<?>> registered =
                (List<Transformation<?>>) transformationsField.get(streamExecutionEnvironment);

        for (Transformation<?> candidate : registered) {
            alignTransformation(candidate);
            if (!isPythonOperator(candidate)) {
                continue;
            }
            candidate.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);

            final AbstractPythonFunctionOperator<?> operator = getPythonOperator(candidate);
            if (operator == null) {
                continue;
            }
            operator.setConfiguration(
                    generateNewPythonConfig(operator.getConfiguration(), dependencyConfig));
        }
    }

    /**
     * Builds a configuration from a copy of the stream environment's configuration merged with the
     * table configuration, then records the table's local time zone in it.
     *
     * <p>The merge is performed on a fresh {@link Configuration} copy. The result of
     * {@code PythonDependencyUtils.configurePythonDependencies} is what gets the
     * {@code table.exec.timezone} entry and is returned.
     *
     * @param streamExecutionEnvironment environment supplying the base configuration and the
     *     cached files
     * @param tableConfig table configuration supplying extra options and the local time zone
     * @return the merged configuration with {@code table.exec.timezone} set
     */
    public static Configuration getMergedConfig(
            StreamExecutionEnvironment streamExecutionEnvironment, TableConfig tableConfig) {
        final Configuration baseCopy =
                new Configuration(streamExecutionEnvironment.getConfiguration());
        PythonDependencyUtils.merge(baseCopy, tableConfig.getConfiguration());

        final Configuration merged =
                PythonDependencyUtils.configurePythonDependencies(
                        streamExecutionEnvironment.getCachedFiles(), baseCopy);
        merged.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
        return merged;
    }

    /**
     * Batch-environment variant of the merge: combines a copy of the environment's configuration
     * with the table configuration and records the table's local time zone.
     *
     * <p>The cached-file list is read reflectively from the {@code cacheFile} field declared on
     * {@link ExecutionEnvironment}.
     *
     * @param executionEnvironment environment supplying the base configuration and, via
     *     reflection, the cached files
     * @param tableConfig table configuration supplying extra options and the local time zone
     * @return the merged configuration with {@code table.exec.timezone} set
     * @throws TableException if the {@code cacheFile} field is missing or cannot be read
     */
    public static Configuration getMergedConfig(
            ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
        // Only the reflective statements can raise the checked exceptions handled here.
        @SuppressWarnings("rawtypes") // element type is not visible through reflection
        final List cachedFiles;
        try {
            final Field cacheFileField = ExecutionEnvironment.class.getDeclaredField("cacheFile");
            cacheFileField.setAccessible(true);
            cachedFiles = (List) cacheFileField.get(executionEnvironment);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new TableException("Method getMergedConfig failed.", e);
        }

        final Configuration baseCopy = new Configuration(executionEnvironment.getConfiguration());
        PythonDependencyUtils.merge(baseCopy, tableConfig.getConfiguration());

        @SuppressWarnings("unchecked") // raw list handed to a generically typed parameter
        final Configuration merged =
                PythonDependencyUtils.configurePythonDependencies(cachedFiles, baseCopy);
        merged.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
        return merged;
    }

    /**
     * Adjusts a transformation and its first input when either carries one of the well-known
     * operator names defined on this class.
     *
     * <ul>
     *   <li>If the first input is named {@link #KEYED_STREAM_VALUE_OPERATOR_NAME}, the input takes
     *       over the scheduling settings of {@code transformation}.
     *   <li>If {@code transformation} is named {@link #STREAM_KEY_BY_MAP_OPERATOR_NAME} or
     *       {@link #STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME}, it takes over the scheduling
     *       settings of its first input.
     * </ul>
     *
     * <p>In both cases the edge between the two is replaced by a forward partition. Transformations
     * without inputs are left untouched. Both checks use the first input captured on entry, even
     * if the first check has already rewired the edge.
     *
     * @param transformation transformation to inspect
     * @throws NoSuchFieldException if the reflective rewiring cannot find the {@code input} field
     * @throws IllegalAccessException if the reflective rewiring is rejected
     */
    private static void alignTransformation(Transformation<?> transformation)
            throws NoSuchFieldException, IllegalAccessException {
        final String ownName = transformation.getName();
        final List<Transformation<?>> inputs = transformation.getInputs();
        if (inputs.isEmpty()) {
            return;
        }

        final Transformation<?> firstInput = inputs.get(0);
        if (firstInput.getName().equals(KEYED_STREAM_VALUE_OPERATOR_NAME)) {
            chainTransformation(firstInput, transformation);
            configForwardPartitioner(firstInput, transformation);
        }

        final boolean isKeyByOrPartitionCustomMap =
                ownName.equals(STREAM_KEY_BY_MAP_OPERATOR_NAME)
                        || ownName.equals(STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME);
        if (isKeyByOrPartitionCustomMap) {
            chainTransformation(transformation, firstInput);
            configForwardPartitioner(firstInput, transformation);
        }
    }

    /**
     * Copies the scheduling-related settings of {@code transformation2} onto
     * {@code transformation}: the slot sharing group (only when one is present), the co-location
     * group key, and the parallelism.
     *
     * <p>The slot sharing group is forwarded straight from the getter's {@link Optional} so the
     * element type stays known to the compiler when it resolves the setter method reference. A
     * raw {@code Optional} would erase that type.
     *
     * @param transformation transformation that receives the settings
     * @param transformation2 transformation the settings are read from
     */
    private static void chainTransformation(
            Transformation<?> transformation, Transformation<?> transformation2) {
        transformation2.getSlotSharingGroup().ifPresent(transformation::setSlotSharingGroup);
        transformation.setCoLocationGroupKey(transformation2.getCoLocationGroupKey());
        transformation.setParallelism(transformation2.getParallelism());
    }

    /**
     * Replaces the {@code input} of {@code transformation2} with a forward-partitioned view of
     * {@code transformation}.
     *
     * <p>A new {@link PartitionTransformation} wrapping {@code transformation} with a
     * {@link ForwardPartitioner} is written reflectively into the field named {@code input}. The
     * lookup uses {@code getDeclaredField} on the runtime class of {@code transformation2}, so the
     * field must be declared by that exact class.
     *
     * @param transformation upstream transformation to wrap
     * @param transformation2 downstream transformation whose {@code input} field is overwritten
     * @throws NoSuchFieldException if the runtime class of {@code transformation2} declares no
     *     {@code input} field
     * @throws IllegalAccessException if the reflective write is rejected
     */
    private static void configForwardPartitioner(
            Transformation<?> transformation, Transformation<?> transformation2)
            throws IllegalAccessException, NoSuchFieldException {
        final Field inputField = transformation2.getClass().getDeclaredField("input");
        inputField.setAccessible(true);
        inputField.set(
                transformation2,
                new PartitionTransformation<>(transformation, new ForwardPartitioner<>()));
    }

    /**
     * Extracts the Python function operator held by a transformation, if there is one.
     *
     * <p>Only one-input, two-input and multiple-input transformations are inspected. The operator
     * is returned only when the transformation's factory is a {@link SimpleOperatorFactory} whose
     * operator is an {@link AbstractPythonFunctionOperator}.
     *
     * @param transformation transformation to inspect
     * @return the Python operator, or {@code null} when the transformation is of another kind, its
     *     factory is not a {@link SimpleOperatorFactory}, or the operator is not a Python operator
     */
    private static AbstractPythonFunctionOperator<?> getPythonOperator(
            Transformation<?> transformation) {
        final StreamOperatorFactory<?> factory;
        if (transformation instanceof OneInputTransformation) {
            factory = ((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
        } else if (transformation instanceof TwoInputTransformation) {
            factory = ((TwoInputTransformation<?, ?, ?>) transformation).getOperatorFactory();
        } else if (transformation instanceof AbstractMultipleInputTransformation) {
            factory =
                    ((AbstractMultipleInputTransformation<?>) transformation).getOperatorFactory();
        } else {
            return null;
        }

        if (!(factory instanceof SimpleOperatorFactory)) {
            return null;
        }

        // Fetch the operator once, then narrow it explicitly: the factory only promises a
        // generic stream operator, so the cast to the Python operator type is required.
        final Object operator = ((SimpleOperatorFactory<?>) factory).getOperator();
        if (operator instanceof AbstractPythonFunctionOperator) {
            return (AbstractPythonFunctionOperator<?>) operator;
        }
        return null;
    }

    /**
     * Tells whether a transformation wraps a Python function operator.
     *
     * <p>Only one-input, two-input and multiple-input transformations can qualify. For those, the
     * decision is delegated to the factory-level check on the transformation's operator factory.
     *
     * @param transformation transformation to inspect
     * @return {@code true} if the transformation's operator factory holds a Python operator;
     *     {@code false} otherwise, including for every other transformation kind
     */
    public static boolean isPythonOperator(Transformation<?> transformation) {
        final StreamOperatorFactory<?> factory;
        if (transformation instanceof OneInputTransformation) {
            factory = ((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
        } else if (transformation instanceof TwoInputTransformation) {
            factory = ((TwoInputTransformation<?, ?, ?>) transformation).getOperatorFactory();
        } else if (transformation instanceof AbstractMultipleInputTransformation) {
            factory =
                    ((AbstractMultipleInputTransformation<?>) transformation).getOperatorFactory();
        } else {
            return false;
        }
        return isPythonOperator(factory);
    }

    /**
     * Tells whether an operator factory is a {@link SimpleOperatorFactory} whose operator is an
     * {@link AbstractPythonFunctionOperator}.
     *
     * @param streamOperatorFactory factory to inspect; {@code null} yields {@code false}
     * @return {@code true} only when both conditions hold
     */
    private static boolean isPythonOperator(StreamOperatorFactory<?> streamOperatorFactory) {
        return streamOperatorFactory instanceof SimpleOperatorFactory
                && ((SimpleOperatorFactory<?>) streamOperatorFactory).getOperator()
                        instanceof AbstractPythonFunctionOperator;
    }

    /**
     * Tells whether a transformation wraps a Python DataStream function operator.
     *
     * <p>Only one-input and two-input transformations can qualify. For those, the decision is
     * delegated to the factory-level check on the transformation's operator factory.
     *
     * @param transformation transformation to inspect
     * @return {@code true} if the transformation's operator factory holds a Python DataStream
     *     operator; {@code false} otherwise, including for every other transformation kind
     */
    public static boolean isPythonDataStreamOperator(Transformation<?> transformation) {
        final StreamOperatorFactory<?> factory;
        if (transformation instanceof OneInputTransformation) {
            factory = ((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
        } else if (transformation instanceof TwoInputTransformation) {
            factory = ((TwoInputTransformation<?, ?, ?>) transformation).getOperatorFactory();
        } else {
            return false;
        }
        return isPythonDataStreamOperator(factory);
    }

    /**
     * Tells whether an operator factory is a {@link SimpleOperatorFactory} whose operator is an
     * {@link AbstractDataStreamPythonFunctionOperator}.
     *
     * @param streamOperatorFactory factory to inspect; {@code null} yields {@code false}
     * @return {@code true} only when both conditions hold
     */
    private static boolean isPythonDataStreamOperator(
            StreamOperatorFactory<?> streamOperatorFactory) {
        return streamOperatorFactory instanceof SimpleOperatorFactory
                && ((SimpleOperatorFactory<?>) streamOperatorFactory).getOperator()
                        instanceof AbstractDataStreamPythonFunctionOperator;
    }

    /**
     * Builds a new configuration by cloning {@code configuration2} and then adding every entry of
     * {@code configuration} to the clone.
     *
     * <p>Because {@code configuration} is added last, its entries presumably take precedence over
     * same-key entries from {@code configuration2} (depends on {@code Configuration.addAll};
     * confirm against its documentation).
     *
     * @param configuration configuration added on top of the clone
     * @param configuration2 configuration that is cloned as the starting point
     * @return the combined clone
     */
    private static Configuration generateNewPythonConfig(
            Configuration configuration, Configuration configuration2) {
        final Configuration combined = configuration2.clone();
        combined.addAll(configuration);
        return combined;
    }

    /**
     * Walks the given transformations and their inputs, and tells each upstream Python DataStream
     * operator that contains a custom partitioning how many partitions to use: the parallelism of
     * the downstream transformation.
     *
     * <p>Traversal is breadth-first starting from {@code list}. Each transformation is handled at
     * most once (identity comparison). {@link PartitionTransformation}s taken from the queue are
     * skipped entirely, so their inputs are not enqueued through them.
     *
     * @param list transformations to start the traversal from
     */
    public static void setPartitionCustomOperatorNumPartitions(List<Transformation<?>> list) {
        final Set<Transformation<?>> visited = Sets.newIdentityHashSet();
        final ArrayDeque<Transformation<?>> pending = Queues.newArrayDeque(list);

        while (!pending.isEmpty()) {
            final Transformation<?> downstream = pending.poll();
            // Partition transformations are never marked as visited; anything else is
            // processed only the first time it is seen.
            if (downstream instanceof PartitionTransformation || !visited.add(downstream)) {
                continue;
            }

            getNonPartitionTransformationInput(downstream)
                    .ifPresent(upstream -> applyNumPartitions(upstream, downstream));
            pending.addAll(downstream.getInputs());
        }
    }

    /**
     * Sets the number of partitions on {@code upstream}'s operator to {@code downstream}'s
     * parallelism, provided that operator is a Python DataStream operator containing a custom
     * partitioning.
     */
    private static void applyNumPartitions(
            Transformation<?> upstream, Transformation<?> downstream) {
        final AbstractPythonFunctionOperator<?> operator = getPythonOperator(upstream);
        if (!(operator instanceof AbstractDataStreamPythonFunctionOperator)) {
            return;
        }
        final AbstractDataStreamPythonFunctionOperator<?> dataStreamOperator =
                (AbstractDataStreamPythonFunctionOperator<?>) operator;
        if (dataStreamOperator.containsPartitionCustom()) {
            dataStreamOperator.setNumPartitions(downstream.getParallelism());
        }
    }

    /**
     * Finds the nearest upstream transformation that is not a {@link PartitionTransformation},
     * following single-input edges only.
     *
     * <p>Starting at {@code transformation}, the sole input is followed repeatedly while it is a
     * {@link PartitionTransformation}. The search gives up as soon as a transformation on the path
     * does not have exactly one input.
     *
     * @param transformation transformation whose upstream is searched
     * @return the first non-partition input reached, or an empty {@link Optional} if a
     *     transformation with zero or several inputs is encountered first
     */
    private static Optional<Transformation<?>> getNonPartitionTransformationInput(
            Transformation<?> transformation) {
        Transformation<?> cursor = transformation;
        while (true) {
            final List<Transformation<?>> inputs = cursor.getInputs();
            if (inputs.size() != 1) {
                return Optional.empty();
            }
            final Transformation<?> soleInput = inputs.get(0);
            if (!(soleInput instanceof PartitionTransformation)) {
                return Optional.of(soleInput);
            }
            cursor = soleInput;
        }
    }
}
