package org.apache.flink.table.planner.plan.processors.utils;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;

/**
 * Resolves input priority conflicts reported by {@link InputPriorityGraphGenerator} by making
 * sure the conflicting input of a node is a {@link BatchExecExchange} whose required shuffle
 * mode is the one supplied to this resolver.
 *
 * <p>Depending on what currently feeds the conflicting input, the resolver either inserts a
 * new exchange, re-configures the existing exchange in place, or replaces the existing
 * exchange with a private copy (see {@link #resolveInputPriorityConflict}).
 */
@Internal
public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
    /** Shuffle mode applied to every exchange this resolver creates, copies or re-configures. */
    private final ShuffleMode shuffleMode;

    /**
     * Visitor that searches the inputs reachable from a start node for one specific
     * {@link BatchExecExchange} instance.
     *
     * <p>The constructor is private, so instances are only created by the enclosing class.
     */
    public static class ConflictCausedByExchangeChecker extends AbstractExecNodeExactlyOnceVisitor {
        /** The exact exchange instance being searched for. */
        private final BatchExecExchange exchange;
        /** Set to {@code true} once {@link #exchange} has been encountered. */
        private boolean found;

        private ConflictCausedByExchangeChecker(BatchExecExchange target) {
            this.exchange = target;
        }

        /**
         * Marks the search as successful when {@code execNode} is the target exchange,
         * otherwise continues the search through the inputs of {@code execNode}.
         *
         * @param execNode the node currently being visited
         */
        @Override // org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor
        protected void visitNode(ExecNode<?, ?> execNode) {
            // Identity comparison on purpose: we look for this very instance.
            if (execNode == this.exchange) {
                this.found = true;
                // Nothing below the target can change the answer, so stop descending.
                return;
            }
            for (ExecNode<?, ?> input : execNode.getInputNodes()) {
                visit(input);
                if (this.found) {
                    // Target located somewhere below this input; skip the remaining inputs.
                    return;
                }
            }
        }
    }

    /**
     * Creates a resolver.
     *
     * @param list forwarded to the {@link InputPriorityGraphGenerator} constructor as its first
     *     argument (presumably the root nodes of the graph; confirm against the superclass)
     * @param damBehavior forwarded unchanged to the superclass constructor
     * @param shuffleMode shuffle mode to require on exchanges used to resolve conflicts
     */
    public InputPriorityConflictResolver(List<ExecNode<?, ?>> list, ExecEdge.DamBehavior damBehavior, ShuffleMode shuffleMode) {
        // The second superclass argument is always an empty set for this resolver.
        super(list, Collections.emptySet(), damBehavior);
        this.shuffleMode = shuffleMode;
    }

    /**
     * Entry point: delegates to the inherited {@code createTopologyGraph()}.
     *
     * <p>NOTE(review): conflict resolution presumably happens through callbacks to
     * {@link #resolveInputPriorityConflict} made by the superclass while it builds the graph;
     * that wiring is not visible in this class.
     */
    public void detectAndResolve() {
        createTopologyGraph();
    }

    /**
     * Resolves a conflict between two inputs of {@code execNode} by adjusting the input at
     * index {@code i2}:
     *
     * <ul>
     *   <li>if that input is not a {@link BatchExecExchange}, it is replaced by a newly created
     *       exchange;
     *   <li>if it is an exchange that is also reachable from the input at index {@code i}, it is
     *       replaced by a copy carrying the required shuffle mode, leaving the original exchange
     *       untouched;
     *   <li>otherwise the required shuffle mode is set directly on the existing exchange.
     * </ul>
     *
     * @param execNode node whose inputs are in conflict
     * @param i index of the input that is only inspected (presumably the higher-priority
     *     input; confirm against the superclass contract)
     * @param i2 index of the input that is modified or replaced
     */
    @Override // org.apache.flink.table.planner.plan.processors.utils.InputPriorityGraphGenerator
    protected void resolveInputPriorityConflict(ExecNode<?, ?> execNode, int i, int i2) {
        ExecNode<?, ?> inspectedInput = execNode.getInputNodes().get(i);
        ExecNode<?, ?> adjustedInput = execNode.getInputNodes().get(i2);

        if (adjustedInput instanceof BatchExecExchange) {
            BatchExecExchange existingExchange = (BatchExecExchange) adjustedInput;
            if (isConflictCausedByExchange(inspectedInput, existingExchange)) {
                // The exchange also feeds the other input, so it must not be modified in
                // place: give this input its own copy with the required shuffle mode.
                BatchExecExchange exchangeCopy = existingExchange.copy(
                        existingExchange.getTraitSet(),
                        existingExchange.getInput(),
                        existingExchange.getDistribution());
                exchangeCopy.setRequiredShuffleMode(this.shuffleMode);
                execNode.replaceInputNode(i2, exchangeCopy);
            } else {
                // The exchange is not reachable from the other input: adjust it directly.
                existingExchange.setRequiredShuffleMode(this.shuffleMode);
            }
        } else {
            // No exchange on this input yet: insert one.
            execNode.replaceInputNode(i2, createExchange(execNode, i2));
        }
    }

    /**
     * Tells whether {@code candidate} is {@code start} itself or reachable from it through
     * input nodes.
     */
    private boolean isConflictCausedByExchange(ExecNode<?, ?> start, BatchExecExchange candidate) {
        ConflictCausedByExchangeChecker checker = new ConflictCausedByExchangeChecker(candidate);
        checker.visit(start);
        return checker.found;
    }

    /**
     * Builds a new exchange on top of the input at {@code inputIndex} of {@code node}, with a
     * distribution derived from the required shuffle of the corresponding input edge and with
     * this resolver's shuffle mode.
     *
     * @throws IllegalStateException if the input edge requires a broadcast shuffle
     */
    private BatchExecExchange createExchange(ExecNode<?, ?> node, int inputIndex) {
        // The input node is used as a RelNode below; the cast fails if it is not one.
        RelNode inputRel = (RelNode) node.getInputNodes().get(inputIndex);
        ExecEdge.RequiredShuffle requiredShuffle = node.getInputEdges().get(inputIndex).getRequiredShuffle();
        ExecEdge.ShuffleType shuffleType = requiredShuffle.getType();

        FlinkRelDistribution distribution;
        if (shuffleType == ExecEdge.ShuffleType.HASH) {
            distribution = FlinkRelDistribution.hash(requiredShuffle.getKeys(), true);
        } else if (shuffleType == ExecEdge.ShuffleType.BROADCAST) {
            throw new IllegalStateException("Trying to resolve input priority conflict on broadcast side. This is not expected.");
        } else if (shuffleType == ExecEdge.ShuffleType.SINGLETON) {
            distribution = FlinkRelDistribution.SINGLETON();
        } else {
            // Every remaining shuffle type falls back to the ANY distribution.
            distribution = FlinkRelDistribution.ANY();
        }

        BatchExecExchange newExchange = new BatchExecExchange(
                inputRel.getCluster(),
                inputRel.getTraitSet().replace(distribution),
                inputRel,
                distribution);
        newExchange.setRequiredShuffleMode(this.shuffleMode);
        return newExchange;
    }
}
