/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFunctionCatalog;
import org.apache.iceberg.spark.actions.SparkSizeBasedDataRewriter;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
import scala.Option;

/**
 * Base class for size-based data rewriters that shuffle the rows of a file group into a
 * required ordering before writing them back to the table.
 *
 * <p>Subclasses supply the sort order ({@link #sortOrder()}) and decide how the sort function
 * built by this class is applied to the scanned data ({@link #sortedDF(Dataset, Function)}).
 *
 * <p>In addition to the options accepted by the parent class, this rewriter understands
 * {@link #COMPRESSION_FACTOR} and {@link #SHUFFLE_PARTITIONS_PER_FILE}.
 */
abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {

    /**
     * Option holding the multiplier applied to the input size of a file group before the
     * number of output files (and therefore shuffle partitions) is estimated. Must be &gt; 0.
     */
    public static final String COMPRESSION_FACTOR = "compression-factor";

    /** Value used for {@link #COMPRESSION_FACTOR} when the option is not set. */
    public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;

    /**
     * Option holding the number of shuffle partitions requested per estimated output file.
     * Must be &gt; 0; any value other than 1 requires the Iceberg Spark session extensions.
     */
    public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

    /** Value used for {@link #SHUFFLE_PARTITIONS_PER_FILE} when the option is not set. */
    public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;

    // Both fields are populated by init(Map) and read while planning a rewrite.
    private double compressionFactor;
    private int numShufflePartitionsPerFile;

    protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
        super(spark, table);
    }

    /** Returns the Iceberg sort order the rewritten data must follow. */
    protected abstract SortOrder sortOrder();

    /**
     * Returns the schema against which the sort order is built when partition fields have to
     * be added to it. Defaults to the table schema.
     */
    protected Schema sortSchema() {
        return table().schema();
    }

    /**
     * Produces the dataset that is written back for a file group.
     *
     * @param df the dataset read from the file group being rewritten
     * @param sortFunc function that rewrites a dataset's logical plan so that it is shuffled
     *     and ordered as required by this rewriter
     * @return the dataset to write
     */
    protected abstract Dataset<Row> sortedDF(
            Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);

    /** Returns the parent's valid options plus the two options declared by this class. */
    @Override
    public Set<String> validOptions() {
        // The explicit <String> witness keeps the builder typed; without it the builder is
        // inferred as ImmutableSet.Builder<Object> and the result is not a Set<String>.
        return ImmutableSet.<String>builder()
                .addAll(super.validOptions())
                .add(COMPRESSION_FACTOR)
                .add(SHUFFLE_PARTITIONS_PER_FILE)
                .build();
    }

    /**
     * Initializes the parent class and then parses and validates the options owned by this
     * class.
     *
     * @param options rewriter options
     * @throws IllegalArgumentException if one of the options has an invalid value
     */
    @Override
    public void init(Map<String, String> options) {
        super.init(options);
        this.compressionFactor = compressionFactor(options);
        this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
    }

    /**
     * Reads the scan task set registered under {@code groupId}, sorts it and appends the
     * result through the Iceberg writer, using {@code groupId} both as the load/save path and
     * as the task-set identifier options.
     *
     * @param groupId identifier of the file group being rewritten
     * @param group the file scan tasks of that group
     */
    @Override
    public void doRewrite(String groupId, List<FileScanTask> group) {
        Dataset<Row> scanDF =
                spark().read()
                        .format("iceberg")
                        .option("scan-task-set-id", groupId)
                        .load(groupId);

        Dataset<Row> sortedDF = sortedDF(scanDF, sortFunction(group));

        sortedDF.write()
                .format("iceberg")
                .option("rewritten-file-scan-task-set-id", groupId)
                .option("target-file-size-bytes", writeMaxFileSize())
                // The plan is already shuffled and ordered by sortFunction, so the writer is
                // told not to apply the table's own distribution and ordering.
                .option("use-table-distribution-and-ordering", "false")
                .option("output-spec-id", outputSpecId())
                .mode("append")
                .save(groupId);
    }

    /**
     * Builds the function handed to {@link #sortedDF(Dataset, Function)}. The ordering and
     * the shuffle partition count are computed once per group, outside the returned lambda.
     */
    private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> group) {
        org.apache.spark.sql.connector.expressions.SortOrder[] ordering =
                Spark3Util.toOrdering(outputSortOrder(group));
        int numShufflePartitions = numShufflePartitions(group);
        return df -> transformPlan(df, plan -> sortPlan(plan, ordering, numShufflePartitions));
    }

    /**
     * Wraps {@code plan} with the distribution and ordering required by an
     * {@link OrderedWrite}. When more than one shuffle partition is used per file, the result
     * is additionally wrapped in an {@link OrderAwareCoalesce} node.
     */
    private LogicalPlan sortPlan(
            LogicalPlan plan,
            org.apache.spark.sql.connector.expressions.SortOrder[] ordering,
            int numShufflePartitions) {
        SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
        OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
        LogicalPlan sortPlan =
                DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));

        if (numShufflePartitionsPerFile == 1) {
            return sortPlan;
        }

        OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
        int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
        return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
    }

    /** Returns a new dataset whose logical plan is {@code func} applied to the plan of {@code df}. */
    private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
        return new Dataset<>(spark(), func.apply(df.logicalPlan()), df.encoder());
    }

    /**
     * Chooses the sort order for the output of {@code group}. The spec of the first task is
     * compared with the output spec, so {@code group} must not be empty.
     */
    private SortOrder outputSortOrder(List<FileScanTask> group) {
        PartitionSpec spec = outputSpec();
        boolean requiresRepartitioning = !group.get(0).spec().equals(spec);
        if (requiresRepartitioning) {
            // The group's spec differs from the output spec, so the sort order is rebuilt
            // from the output spec together with the requested sort order.
            return SortOrderUtil.buildSortOrder(sortSchema(), spec, sortOrder());
        }
        return sortOrder();
    }

    /**
     * Computes the number of shuffle partitions for {@code group}: the estimated number of
     * output files for the input size scaled by the compression factor, multiplied by the
     * configured partitions per file, and never less than 1.
     */
    private int numShufflePartitions(List<FileScanTask> group) {
        long expectedSize = (long) ((double) inputSize(group) * compressionFactor);
        int numOutputFiles = (int) numOutputFiles(expectedSize);
        return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
    }

    /**
     * Parses {@link #COMPRESSION_FACTOR}, falling back to {@link #COMPRESSION_FACTOR_DEFAULT}.
     *
     * @throws IllegalArgumentException if the value is not &gt; 0
     */
    private double compressionFactor(Map<String, String> options) {
        double value =
                PropertyUtil.propertyAsDouble(options, COMPRESSION_FACTOR, COMPRESSION_FACTOR_DEFAULT);
        Preconditions.checkArgument(
                value > 0, "'%s' is set to %s but must be > 0", COMPRESSION_FACTOR, value);
        return value;
    }

    /**
     * Parses {@link #SHUFFLE_PARTITIONS_PER_FILE}, falling back to
     * {@link #SHUFFLE_PARTITIONS_PER_FILE_DEFAULT}.
     *
     * @throws IllegalArgumentException if the value is not &gt; 0, or if it differs from 1
     *     while the Iceberg Spark session extensions are not enabled
     */
    private int numShufflePartitionsPerFile(Map<String, String> options) {
        int value =
                PropertyUtil.propertyAsInt(
                        options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
        Preconditions.checkArgument(
                value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
        Preconditions.checkArgument(
                value == 1 || Spark3Util.extensionsEnabled(spark()),
                "Using '%s' requires enabling Iceberg Spark session extensions",
                SHUFFLE_PARTITIONS_PER_FILE);
        return value;
    }

    /**
     * Write descriptor that only carries distribution and ordering requirements: an ordered
     * distribution over the given sort order, a fixed number of partitions, and the same
     * sort order as the required ordering.
     */
    private static class OrderedWrite implements RequiresDistributionAndOrdering {
        private final OrderedDistribution distribution;
        private final org.apache.spark.sql.connector.expressions.SortOrder[] ordering;
        private final int numShufflePartitions;

        OrderedWrite(
                org.apache.spark.sql.connector.expressions.SortOrder[] ordering,
                int numShufflePartitions) {
            this.distribution = Distributions.ordered(ordering);
            this.ordering = ordering;
            this.numShufflePartitions = numShufflePartitions;
        }

        @Override
        public Distribution requiredDistribution() {
            return distribution;
        }

        @Override
        public boolean distributionStrictlyRequired() {
            return true;
        }

        @Override
        public int requiredNumPartitions() {
            return numShufflePartitions;
        }

        @Override
        public org.apache.spark.sql.connector.expressions.SortOrder[] requiredOrdering() {
            return ordering;
        }
    }
}

