package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCalc;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
import org.apache.flink.table.planner.plan.nodes.exec.batch.InputSortedExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:flink-table-store-codegen.jar:org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.class */
public class ForwardHashExchangeProcessor implements ExecNodeGraphProcessor {
    @Override // org.apache.flink.table.planner.plan.nodes.exec.processor.ExecNodeGraphProcessor
    public ExecNodeGraph process(ExecNodeGraph execNodeGraph, ProcessorContext processorContext) {
        if (execNodeGraph.getRootNodes().get(0) instanceof StreamExecNode) {
            throw new TableException("StreamExecNode is not supported yet");
        }
        if (!processorContext.getPlanner().getExecEnv().getConfig().isDynamicGraph()) {
            return execNodeGraph;
        }
        final TableConfig tableConfig = processorContext.getPlanner().getTableConfig();
        AbstractExecNodeExactlyOnceVisitor abstractExecNodeExactlyOnceVisitor = new AbstractExecNodeExactlyOnceVisitor() { // from class: org.apache.flink.table.planner.plan.nodes.exec.processor.ForwardHashExchangeProcessor.1
            @Override // org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor
            protected void visitNode(ExecNode<?> execNode) {
                ExecEdge addExchangeAndReconnectEdge;
                visitInputs(execNode);
                if (execNode instanceof CommonExecExchange) {
                    return;
                }
                boolean z = false;
                ArrayList arrayList = new ArrayList(execNode.getInputEdges());
                for (int i = 0; i < execNode.getInputProperties().size(); i++) {
                    InputProperty inputProperty = execNode.getInputProperties().get(i);
                    InputProperty.RequiredDistribution requiredDistribution = inputProperty.getRequiredDistribution();
                    ExecEdge execEdge = execNode.getInputEdges().get(i);
                    if (requiredDistribution.getType() != InputProperty.DistributionType.HASH) {
                        boolean z2 = requiredDistribution.getType() == InputProperty.DistributionType.SINGLETON;
                        if (!ForwardHashExchangeProcessor.this.hasExchangeInput(execEdge) && ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(execNode)) {
                            arrayList.set(i, ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(tableConfig, execEdge, inputProperty, true, z2));
                            z = true;
                        }
                    } else if (!ForwardHashExchangeProcessor.this.hasExchangeInput(execEdge)) {
                        if (ForwardHashExchangeProcessor.this.isInputSortedNode(execNode)) {
                            if (ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(execNode)) {
                                ExecNode<?> source = execEdge.getSource();
                                source.setInputEdges(Collections.singletonList(ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(tableConfig, source.getInputEdges().get(0), inputProperty, false, true)));
                            }
                            addExchangeAndReconnectEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(tableConfig, execEdge, inputProperty, true, true);
                        } else {
                            addExchangeAndReconnectEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(tableConfig, execEdge, inputProperty, false, true);
                            ForwardHashExchangeProcessor.this.updateOriginalEdgeInMultipleInput(execNode, i, (BatchExecExchange) addExchangeAndReconnectEdge.getSource());
                        }
                        arrayList.set(i, addExchangeAndReconnectEdge);
                        z = true;
                    } else if (ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(execNode)) {
                        arrayList.set(i, ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(tableConfig, execEdge, inputProperty, true, true));
                        z = true;
                    }
                }
                if (z) {
                    execNode.setInputEdges(arrayList);
                }
            }
        };
        execNodeGraph.getRootNodes().forEach(execNode -> {
            execNode.accept(abstractExecNodeExactlyOnceVisitor);
        });
        return execNodeGraph;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ExecEdge addExchangeAndReconnectEdge(ReadableConfig readableConfig, ExecEdge execEdge, InputProperty inputProperty, boolean z, boolean z2) {
        ExecNode<?> target = execEdge.getTarget();
        ExecNode<?> source = execEdge.getSource();
        if (source instanceof CommonExecExchange) {
            return execEdge;
        }
        if (z2 && ((source instanceof BatchExecCalc) || (source instanceof BatchExecPythonCalc) || (source instanceof BatchExecSort) || (source instanceof BatchExecCorrelate) || (source instanceof BatchExecPythonCorrelate))) {
            source.setInputEdges(Collections.singletonList(addExchangeAndReconnectEdge(readableConfig, source.getInputEdges().get(0), inputProperty, z, true)));
        }
        BatchExecExchange createExchangeWithKeepInputAsIsDistribution = createExchangeWithKeepInputAsIsDistribution(readableConfig, inputProperty, z, (RowType) execEdge.getOutputType());
        createExchangeWithKeepInputAsIsDistribution.setInputEdges(Collections.singletonList(new ExecEdge(source, createExchangeWithKeepInputAsIsDistribution, execEdge.getShuffle(), execEdge.getExchangeMode())));
        return new ExecEdge(createExchangeWithKeepInputAsIsDistribution, target, execEdge.getShuffle(), execEdge.getExchangeMode());
    }

    private BatchExecExchange createExchangeWithKeepInputAsIsDistribution(ReadableConfig readableConfig, InputProperty inputProperty, boolean z, RowType rowType) {
        InputProperty build = InputProperty.builder().requiredDistribution(InputProperty.keepInputAsIsDistribution(inputProperty.getRequiredDistribution(), z)).damBehavior(inputProperty.getDamBehavior()).priority(inputProperty.getPriority()).build();
        return new BatchExecExchange(readableConfig, build, rowType, build.toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasExchangeInput(ExecEdge execEdge) {
        ExecNode<?> source = execEdge.getSource();
        if (hasSortInputForInputSortedNode(execEdge.getTarget())) {
            source = source.getInputEdges().get(0).getSource();
        }
        return source instanceof CommonExecExchange;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasSortInputForInputSortedNode(ExecNode<?> execNode) {
        return isInputSortedNode(execNode) && (execNode.getInputEdges().get(0).getSource() instanceof BatchExecSort);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isInputSortedNode(ExecNode<?> execNode) {
        return execNode instanceof InputSortedExecNode;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateOriginalEdgeInMultipleInput(ExecNode<?> execNode, int i, BatchExecExchange batchExecExchange) {
        if (execNode instanceof BatchExecMultipleInput) {
            updateOriginalEdgeInMultipleInput((BatchExecMultipleInput) execNode, i, batchExecExchange);
        }
    }

    private void updateOriginalEdgeInMultipleInput(BatchExecMultipleInput batchExecMultipleInput, int i, BatchExecExchange batchExecExchange) {
        ExecEdge execEdge = batchExecMultipleInput.getOriginalEdges().get(i);
        ExecNode<?> source = execEdge.getSource();
        ExecNode<?> target = execEdge.getTarget();
        int indexOf = target.getInputEdges().indexOf(execEdge);
        Preconditions.checkArgument(indexOf >= 0);
        ArrayList arrayList = new ArrayList(target.getInputEdges());
        batchExecExchange.setInputEdges(Collections.singletonList(new ExecEdge(source, batchExecExchange, execEdge.getShuffle(), execEdge.getExchangeMode())));
        arrayList.set(indexOf, new ExecEdge(batchExecExchange, target, execEdge.getShuffle(), execEdge.getExchangeMode()));
        target.setInputEdges(arrayList);
    }
}
