package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.HashCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.utils.StreamExchangeModeUtils;
import org.apache.flink.table.runtime.partitioner.BinaryHashPartitioner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:flink-table-store-codegen.jar:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.class */
/**
 * Batch exec node that re-partitions the rows of its single input.
 *
 * <p>The partitioning strategy is taken from the required distribution of the node's only
 * {@link InputProperty}; the node is translated into a {@link PartitionTransformation}.
 */
public class BatchExecExchange extends CommonExecExchange implements BatchExecNode<RowData> {

    /**
     * Exchange mode explicitly requested for this node, or {@code null} when none was requested.
     * It is handed to {@link StreamExchangeModeUtils#getBatchStreamExchangeMode} at translation
     * time.
     */
    @Nullable
    private StreamExchangeMode requiredExchangeMode;

    /**
     * Creates an exchange node with a fresh node id and context.
     *
     * @param readableConfig configuration used to build the persisted config of this node
     * @param inputProperty the property of the single input; its required distribution drives
     *     the partitioning
     * @param rowType the output type of this node
     * @param str the description passed to the parent exec node
     */
    public BatchExecExchange(ReadableConfig readableConfig, InputProperty inputProperty, RowType rowType, String str) {
        super(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(BatchExecExchange.class),
                ExecNodeContext.newPersistedConfig(BatchExecExchange.class, readableConfig),
                Collections.singletonList(inputProperty),
                rowType,
                str);
    }

    /**
     * Sets the exchange mode this node must use.
     *
     * @param streamExchangeMode the required mode, or {@code null} to clear the requirement
     */
    public void setRequiredExchangeMode(@Nullable StreamExchangeMode streamExchangeMode) {
        this.requiredExchangeMode = streamExchangeMode;
    }

    /**
     * Builds the textual description of this exchange, e.g. {@code
     * Exchange(distribution=[hash[a, b]])} or {@code Exchange(distribution=[single],
     * shuffle_mode=[BATCH])}.
     *
     * <p>The distribution name is the lower-cased distribution type, except that a singleton
     * distribution is printed as {@code single} and a strict keep-input-as-is distribution as
     * {@code forward}. A non-strict keep-input-as-is distribution additionally prints the hash
     * keys of its input distribution.
     *
     * @return the description string
     */
    @Override
    public String getDescription() {
        InputProperty.RequiredDistribution distribution = getInputProperties().get(0).getRequiredDistribution();
        boolean keepInputAsIs = distribution instanceof InputProperty.KeepInputAsIsDistribution;
        boolean strictKeepInputAsIs =
                keepInputAsIs && ((InputProperty.KeepInputAsIsDistribution) distribution).isStrict();

        String distributionName;
        if (distribution.getType() == InputProperty.DistributionType.SINGLETON) {
            // Compare the enum constant instead of a lower-cased name so the check cannot be
            // affected by the JVM's default locale.
            distributionName = "single";
        } else if (strictKeepInputAsIs) {
            distributionName = "forward";
        } else {
            // Locale.ROOT keeps the output stable on locales with special casing rules
            // (e.g. Turkish dotless i).
            distributionName = distribution.getType().name().toLowerCase(Locale.ROOT);
        }

        StringBuilder sb = new StringBuilder();
        sb.append("distribution=[").append(distributionName);
        if (distribution instanceof InputProperty.HashDistribution) {
            sb.append(getHashDistributionDescription((InputProperty.HashDistribution) distribution));
        } else if (keepInputAsIs && !strictKeepInputAsIs) {
            // The cast mirrors translateToPlanInternal, which only accepts a HashDistribution
            // as the input distribution of a non-strict keep-input-as-is distribution.
            InputProperty.RequiredDistribution inputDistribution =
                    ((InputProperty.KeepInputAsIsDistribution) distribution).getInputDistribution();
            sb.append("[hash")
                    .append(getHashDistributionDescription((InputProperty.HashDistribution) inputDistribution))
                    .append("]");
        }
        sb.append("]");
        if (this.requiredExchangeMode == StreamExchangeMode.BATCH) {
            sb.append(", shuffle_mode=[BATCH]");
        }
        return String.format("Exchange(%s)", sb);
    }

    /**
     * Renders the hash keys of the given distribution as the corresponding input field names,
     * e.g. {@code [a, b]}.
     *
     * @param hashDistribution distribution whose key indices are resolved against the input type
     * @return the bracketed, comma-separated field names
     */
    private String getHashDistributionDescription(InputProperty.HashDistribution hashDistribution) {
        RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType();
        // Look the field names up once rather than once per key.
        java.util.List<String> fieldNames = inputRowType.getFieldNames();
        return Arrays.stream(hashDistribution.getKeys())
                .mapToObj(fieldNames::get)
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /**
     * Translates this node into a {@link PartitionTransformation} on top of the translated input.
     *
     * <p>The partitioner and parallelism are chosen from the required distribution type:
     *
     * <ul>
     *   <li>{@code ANY}: no partitioner ({@code null}), parallelism {@code -1}
     *   <li>{@code BROADCAST}: {@link BroadcastPartitioner}, parallelism {@code -1}
     *   <li>{@code SINGLETON}: {@link GlobalPartitioner}, parallelism {@code 1}
     *   <li>{@code HASH}: {@link BinaryHashPartitioner}, parallelism {@code -1}
     *   <li>{@code KEEP_INPUT_AS_IS}: {@link ForwardPartitioner} when strict, otherwise a
     *       {@link ForwardForConsecutiveHashPartitioner} wrapping a hash partitioner; the
     *       parallelism is copied from the input transformation
     * </ul>
     *
     * @param plannerBase planner used to translate the input edge
     * @param execNodeConfig configuration of this node
     * @return the partition transformation producing {@link RowData}
     * @throws TableException if the distribution type is not one of the types listed above
     * @throws IllegalArgumentException if a non-strict keep-input-as-is distribution wraps
     *     anything other than a hash distribution
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        final ExecEdge inputEdge = getInputEdges().get(0);
        // The edge API is typed with a wildcard; the original code relied on the same
        // assumption through raw types.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(plannerBase);
        final RowType inputType = (RowType) inputEdge.getOutputType();

        final InputProperty.RequiredDistribution requiredDistribution =
                getInputProperties().get(0).getRequiredDistribution();
        final InputProperty.DistributionType distributionType = requiredDistribution.getType();

        // Only the strict keep-input-as-is (forward) case bypasses the configured exchange mode.
        boolean requireUndefinedExchangeMode = false;
        final StreamPartitioner<RowData> partitioner;
        final int parallelism;
        switch (distributionType) {
            case ANY:
                partitioner = null;
                parallelism = -1;
                break;
            case BROADCAST:
                partitioner = new BroadcastPartitioner<>();
                parallelism = -1;
                break;
            case SINGLETON:
                partitioner = new GlobalPartitioner<>();
                parallelism = 1;
                break;
            case HASH:
                partitioner =
                        createHashPartitioner(
                                (InputProperty.HashDistribution) requiredDistribution, inputType, execNodeConfig);
                parallelism = -1;
                break;
            case KEEP_INPUT_AS_IS:
                InputProperty.KeepInputAsIsDistribution keepInputAsIs =
                        (InputProperty.KeepInputAsIsDistribution) requiredDistribution;
                if (keepInputAsIs.isStrict()) {
                    partitioner = new ForwardPartitioner<>();
                    requireUndefinedExchangeMode = true;
                } else {
                    InputProperty.RequiredDistribution inputDistribution = keepInputAsIs.getInputDistribution();
                    Preconditions.checkArgument(
                            inputDistribution instanceof InputProperty.HashDistribution,
                            "Only HashDistribution is supported now");
                    partitioner =
                            new ForwardForConsecutiveHashPartitioner<>(
                                    createHashPartitioner(
                                            (InputProperty.HashDistribution) inputDistribution,
                                            inputType,
                                            execNodeConfig));
                }
                parallelism = inputTransform.getParallelism();
                break;
            default:
                throw new TableException(distributionType + " is not supported now!");
        }

        final StreamExchangeMode exchangeMode =
                requireUndefinedExchangeMode
                        ? StreamExchangeMode.UNDEFINED
                        : StreamExchangeModeUtils.getBatchStreamExchangeMode(
                                execNodeConfig, this.requiredExchangeMode);
        final Transformation<RowData> transformation =
                new PartitionTransformation<>(inputTransform, partitioner, exchangeMode);
        transformation.setParallelism(parallelism);
        transformation.setOutputType(InternalTypeInfo.of(getOutputType()));
        return transformation;
    }

    /**
     * Creates a hash partitioner over the key fields of the given distribution, using a
     * generated row hash function.
     *
     * @param hashDistribution distribution providing the key field indices
     * @param rowType row type the key indices refer to
     * @param execNodeConfig configuration supplying the table config for code generation
     * @return the partitioner, constructed with the names of the key fields
     */
    private BinaryHashPartitioner createHashPartitioner(InputProperty.HashDistribution hashDistribution, RowType rowType, ExecNodeConfig execNodeConfig) {
        final int[] keys = hashDistribution.getKeys();
        // Look the field names up once rather than once per key.
        final java.util.List<String> fieldNames = rowType.getFieldNames();
        final String[] keyFieldNames = Arrays.stream(keys).mapToObj(fieldNames::get).toArray(String[]::new);
        return new BinaryHashPartitioner(
                HashCodeGenerator.generateRowHash(
                        new CodeGeneratorContext(execNodeConfig.getTableConfig()),
                        rowType,
                        "HashPartitioner",
                        keys),
                keyFieldNames);
    }

    /**
     * Returns the exchange mode set through {@link #setRequiredExchangeMode}.
     *
     * @return the required exchange mode, or an empty optional if none is set
     */
    @VisibleForTesting
    public Optional<StreamExchangeMode> getRequiredExchangeMode() {
        return Optional.ofNullable(this.requiredExchangeMode);
    }
}
