package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.HashCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.runtime.partitioner.BinaryHashPartitioner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.class */
public class BatchExecExchange extends CommonExecExchange implements BatchExecNode<RowData> {

    @Nullable
    private ShuffleMode requiredShuffleMode;

    public BatchExecExchange(InputProperty inputProperty, RowType rowType, String str) {
        super(getNewNodeId(), Collections.singletonList(inputProperty), rowType, str);
    }

    public void setRequiredShuffleMode(@Nullable ShuffleMode shuffleMode) {
        this.requiredShuffleMode = shuffleMode;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase, org.apache.flink.table.planner.plan.nodes.exec.ExecNode
    public String getDescription() {
        InputProperty.RequiredDistribution requiredDistribution = getInputProperties().get(0).getRequiredDistribution();
        StringBuilder sb = new StringBuilder();
        String lowerCase = requiredDistribution.getType().name().toLowerCase();
        if (lowerCase.equals("singleton")) {
            lowerCase = "single";
        }
        sb.append("distribution=[").append(lowerCase);
        if (requiredDistribution.getType() == InputProperty.DistributionType.HASH) {
            RowType rowType = (RowType) getInputEdges().get(0).getOutputType();
            sb.append("[").append(String.join(", ", (String[]) Arrays.stream(((InputProperty.HashDistribution) requiredDistribution).getKeys()).mapToObj(i -> {
                return rowType.getFieldNames().get(i);
            }).toArray(i2 -> {
                return new String[i2];
            }))).append("]");
        }
        sb.append("]");
        if (this.requiredShuffleMode == ShuffleMode.BATCH) {
            sb.append(", shuffle_mode=[BATCH]");
        }
        return String.format("Exchange(%s)", sb.toString());
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase) {
        StreamPartitioner binaryHashPartitioner;
        int i;
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        InputProperty inputProperty = getInputProperties().get(0);
        InputProperty.DistributionType type = inputProperty.getRequiredDistribution().getType();
        switch (type) {
            case ANY:
                binaryHashPartitioner = null;
                i = -1;
                break;
            case BROADCAST:
                binaryHashPartitioner = new BroadcastPartitioner();
                i = -1;
                break;
            case SINGLETON:
                binaryHashPartitioner = new GlobalPartitioner();
                i = 1;
                break;
            case HASH:
                int[] keys = ((InputProperty.HashDistribution) inputProperty.getRequiredDistribution()).getKeys();
                RowType rowType = (RowType) execEdge.getOutputType();
                binaryHashPartitioner = new BinaryHashPartitioner(HashCodeGenerator.generateRowHash(new CodeGeneratorContext(plannerBase.getTableConfig()), execEdge.getOutputType(), "HashPartitioner", keys), (String[]) Arrays.stream(keys).mapToObj(i2 -> {
                    return rowType.getFieldNames().get(i2);
                }).toArray(i3 -> {
                    return new String[i3];
                }));
                i = -1;
                break;
            default:
                throw new TableException(type + "is not supported now!");
        }
        PartitionTransformation partitionTransformation = new PartitionTransformation(translateToPlan, binaryHashPartitioner, getShuffleMode(plannerBase.getTableConfig().getConfiguration(), this.requiredShuffleMode));
        partitionTransformation.setParallelism(i);
        partitionTransformation.setOutputType(InternalTypeInfo.of(getOutputType()));
        return partitionTransformation;
    }

    public static ShuffleMode getShuffleMode(Configuration configuration, @Nullable ShuffleMode shuffleMode) {
        if (shuffleMode != ShuffleMode.BATCH && !configuration.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE).equalsIgnoreCase(GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString())) {
            return ShuffleMode.UNDEFINED;
        }
        return ShuffleMode.BATCH;
    }

    @VisibleForTesting
    public Optional<ShuffleMode> getRequiredShuffleMode() {
        return Optional.ofNullable(this.requiredShuffleMode);
    }
}
