/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ImmutableRemoveDanglingDeleteFiles;
import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.actions.BaseSnapshotUpdateSparkAction;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RemoveDanglingDeletesSparkAction
extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
implements RemoveDanglingDeleteFiles {
    private static final Logger LOG = LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class);
    private final Table table;

    protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) {
        super(spark);
        this.table = table;
    }

    @Override
    protected RemoveDanglingDeletesSparkAction self() {
        return this;
    }

    public RemoveDanglingDeleteFiles.Result execute() {
        if (this.table.specs().size() == 1 && this.table.spec().isUnpartitioned()) {
            return ImmutableRemoveDanglingDeleteFiles.Result.builder().removedDeleteFiles(Collections.emptyList()).build();
        }
        String desc = String.format("Removing dangling delete files in %s", this.table.name());
        JobGroupInfo info = this.newJobGroupInfo("REMOVE-DELETES", desc);
        return this.withJobGroupInfo(info, this::doExecute);
    }

    RemoveDanglingDeleteFiles.Result doExecute() {
        RewriteFiles rewriteFiles = this.table.newRewrite();
        List<DeleteFile> danglingDeletes = this.findDanglingDeletes();
        for (DeleteFile deleteFile : danglingDeletes) {
            LOG.debug("Removing dangling delete file {}", (Object)deleteFile.path());
            rewriteFiles.deleteFile(deleteFile);
        }
        if (!danglingDeletes.isEmpty()) {
            this.commit((SnapshotUpdate<?>)rewriteFiles);
        }
        return ImmutableRemoveDanglingDeleteFiles.Result.builder().removedDeleteFiles(danglingDeletes).build();
    }

    private List<DeleteFile> findDanglingDeletes() {
        Dataset minSequenceNumberByPartition = this.loadMetadataTable(this.table, MetadataTableType.ENTRIES).filter("data_file.content == 0 AND status < 2").selectExpr(new String[]{"data_file.partition as partition", "data_file.spec_id as spec_id", "sequence_number"}).groupBy("partition", new String[]{"spec_id"}).agg(functions.min((String)"sequence_number"), new Column[0]).toDF(new String[]{"grouped_partition", "grouped_spec_id", "min_data_sequence_number"});
        Dataset deleteEntries = this.loadMetadataTable(this.table, MetadataTableType.ENTRIES).filter("data_file.content != 0 AND status < 2");
        Column joinOnPartition = deleteEntries.col("data_file.spec_id").equalTo((Object)minSequenceNumberByPartition.col("grouped_spec_id")).and(deleteEntries.col("data_file.partition").equalTo((Object)minSequenceNumberByPartition.col("grouped_partition")));
        Column filterOnDanglingDeletes = functions.col((String)"min_data_sequence_number").isNull().or(functions.col((String)"data_file.content").equalTo((Object)"1").and(functions.col((String)"sequence_number").$less((Object)functions.col((String)"min_data_sequence_number")))).or(functions.col((String)"data_file.content").equalTo((Object)"2").and(functions.col((String)"sequence_number").$less$eq((Object)functions.col((String)"min_data_sequence_number"))));
        Dataset danglingDeletes = deleteEntries.join(minSequenceNumberByPartition, joinOnPartition, "left").filter(filterOnDanglingDeletes).select("data_file.*", new String[0]);
        return danglingDeletes.collectAsList().stream().map(row -> this.deleteFileWrapper(danglingDeletes.schema(), (Row)row)).collect(Collectors.toList());
    }

    private DeleteFile deleteFileWrapper(StructType sparkFileType, Row row) {
        int specId = row.getInt(row.fieldIndex("spec_id"));
        Types.StructType combinedFileType = DataFile.getType((Types.StructType)Partitioning.partitionType((Table)this.table));
        Types.StructType projection = DataFile.getType((Types.StructType)((PartitionSpec)this.table.specs().get(specId)).partitionType());
        return (DeleteFile)new SparkDeleteFile(combinedFileType, projection, sparkFileType).wrap(row);
    }
}

