package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.class */
/**
 * Spark action that rewrites the data manifests of a table's current snapshot.
 *
 * <p>Candidate manifests are the current snapshot's data manifests that were written with the
 * selected partition spec (the table's current spec unless {@link #specId(int)} is called) and that
 * pass the {@link #rewriteIf(Predicate)} predicate. Their entries are read through the ENTRIES
 * metadata table and redistributed across a target number of manifests. That number is derived from
 * {@link TableProperties#MANIFEST_TARGET_SIZE_BYTES}. The new manifests are written under the staging
 * location and swapped in with a single {@code RewriteManifests} commit.
 *
 * <p>For partitioned specs, entries are range-partitioned and sorted by {@code data_file.partition}.
 * Each new manifest therefore covers a contiguous range of partition values. The {@value #USE_CACHING}
 * option controls whether that intermediate dataset is cached while it is reused.
 */
public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {
    /** Action option: cache the manifest entry dataset while rewriting partitioned manifests. */
    public static final String USE_CACHING = "use-caching";
    /** Default value of {@link #USE_CACHING}. */
    public static final boolean USE_CACHING_DEFAULT = true;

    private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);

    private final Encoder<ManifestFile> manifestEncoder;
    private final Table table;
    private final int formatVersion;
    private final long targetManifestSizeBytes;

    // spec whose manifests are rewritten; set to the table's current spec in the constructor
    private PartitionSpec spec;
    // extra filter on candidate manifests; accepts every manifest by default
    private Predicate<ManifestFile> predicate = manifest -> true;
    // directory the new manifests are written to; defaulted in the constructor
    private String stagingLocation;

    /**
     * Creates a rewrite action for {@code table}.
     *
     * <p>Defaults are the table's current spec, every manifest of that spec, the target manifest
     * size from table properties, and a staging location equal to the parent directory of a
     * metadata file location.
     *
     * @param sparkSession the Spark session used to run the rewrite
     * @param table the table to rewrite; it is cast to {@link HasTableOperations}
     */
    public RewriteManifestsSparkAction(SparkSession sparkSession, Table table) {
        super(sparkSession);
        this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
        this.table = table;
        this.spec = table.spec();
        this.targetManifestSizeBytes = PropertyUtil.propertyAsLong(
            table.properties(),
            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);

        // stage new manifests next to the table metadata unless stagingLocation(...) overrides it
        TableOperations ops = ((HasTableOperations) table).operations();
        Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
        this.stagingLocation = metadataFilePath.getParent().toString();
        this.formatVersion = ops.current().formatVersion();
    }

    @Override
    public RewriteManifestsSparkAction self() {
        return this;
    }

    /**
     * Selects the partition spec whose manifests will be rewritten.
     *
     * @param specId id of a spec known to the table
     * @throws IllegalArgumentException if the table has no spec with that id
     */
    @Override
    public RewriteManifestsSparkAction specId(int specId) {
        Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
        this.spec = table.specs().get(specId);
        return this;
    }

    /** Restricts the rewrite to manifests accepted by {@code newPredicate}. */
    @Override
    public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicate) {
        this.predicate = newPredicate;
        return this;
    }

    /** Sets the directory where new manifests are written before the commit. */
    @Override
    public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
        this.stagingLocation = newStagingLocation;
        return this;
    }

    /** Runs the rewrite under a descriptive Spark job group. */
    @Override
    public RewriteManifests.Result execute() {
        String desc = String.format(
            "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
        return withJobGroupInfo(newJobGroupInfo("REWRITE-MANIFESTS", desc), this::doExecute);
    }

    /**
     * Selects manifests, sizes the output, writes new manifests and commits the replacement.
     *
     * @throws ValidationException if a matching manifest lacks added/existing/deleted file counts
     */
    private RewriteManifests.Result doExecute() {
        List<ManifestFile> matchingManifests = findMatchingManifests();
        if (matchingManifests.isEmpty()) {
            return emptyResult();
        }

        long totalSizeBytes = 0L;
        int numEntries = 0;
        for (ManifestFile manifest : matchingManifests) {
            ValidationException.check(hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());
            totalSizeBytes += manifest.length();
            numEntries += manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
        }

        int targetNumManifests = targetNumManifests(totalSizeBytes);
        int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);

        // rewriting a single manifest into a single manifest would change nothing
        if (targetNumManifests == 1 && matchingManifests.size() == 1) {
            return emptyResult();
        }

        Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);

        List<ManifestFile> newManifests;
        if (spec.fields().size() < 1) {
            newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
        } else {
            newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests, targetNumManifestEntries);
        }

        replaceManifests(matchingManifests, newManifests);

        return ImmutableRewriteManifests.Result.builder()
            .addedManifests(newManifests)
            .rewrittenManifests(matchingManifests)
            .build();
    }

    /** Result reported when nothing was rewritten. */
    private static RewriteManifests.Result emptyResult() {
        return ImmutableRewriteManifests.Result.builder()
            .addedManifests(ImmutableList.of())
            .rewrittenManifests(ImmutableList.of())
            .build();
    }

    /**
     * Loads the entries of {@code manifests} from the ENTRIES metadata table.
     *
     * <p>Entries with {@code status >= 2} are filtered out; presumably that drops DELETED entries
     * (TODO confirm against the Iceberg status codes). The result has the columns
     * (snapshot_id, sequence_number, file_sequence_number, data_file) in that order, which
     * {@code writeManifest} reads by position.
     */
    private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
        Dataset<Row> manifestDF = spark()
            .createDataset(Lists.transform(manifests, ManifestFile::path), Encoders.STRING())
            .toDF("manifest");

        Dataset<Row> manifestEntryDF = loadMetadataTable(table, MetadataTableType.ENTRIES)
            .filter("status < 2")
            .selectExpr(
                "input_file_name() as manifest",
                "snapshot_id",
                "sequence_number",
                "file_sequence_number",
                "data_file");

        // keep only entries read from one of the selected manifests
        Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
        return manifestEntryDF
            .join(manifestDF, joinCond, "left_semi")
            .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
    }

    /**
     * Writes entries of an unpartitioned spec into {@code numManifests} manifests.
     *
     * <p>Uses round-robin repartitioning with no per-manifest entry cap, so each Spark partition
     * becomes exactly one manifest.
     */
    private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
        Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
        StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
        Types.StructType combinedPartitionType = Partitioning.partitionType(table);

        return manifestEntryDF
            .repartition(numManifests)
            .mapPartitions(
                toManifests(tableBroadcast, Long.MAX_VALUE, stagingLocation, formatVersion, combinedPartitionType, spec, sparkType),
                manifestEncoder)
            .collectAsList();
    }

    /**
     * Writes entries of a partitioned spec into roughly {@code numManifests} manifests.
     *
     * <p>Entries are range-partitioned and sorted by partition. A Spark partition holding more than
     * 110% of {@code targetNumManifestEntries} rows is split into two manifests.
     */
    private List<ManifestFile> writeManifestsForPartitionedTable(Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
        Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
        StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
        Types.StructType combinedPartitionType = Partitioning.partitionType(table);

        // allow 10% slack before a range is split, since range partitioning is approximate
        long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);

        return withReusableDS(manifestEntryDF, df -> {
            Column partitionColumn = df.col("data_file.partition");
            return df
                .repartitionByRange(numManifests, partitionColumn)
                .sortWithinPartitions(partitionColumn)
                .mapPartitions(
                    toManifests(tableBroadcast, maxNumManifestEntries, stagingLocation, formatVersion, combinedPartitionType, spec, sparkType),
                    manifestEncoder)
                .collectAsList();
        });
    }

    /**
     * Applies {@code func} to a version of {@code ds} meant to be evaluated more than once.
     *
     * <p>If the {@value #USE_CACHING} option is enabled (the default), {@code ds} is cached and then
     * unpersisted afterwards, even when {@code func} throws.
     */
    private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
        boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);

        Dataset<T> reusableDS;
        if (useCaching) {
            reusableDS = ds.cache();
        } else {
            // NOTE(review): presumably the shuffle introduced here lets Spark reuse its output across
            // the jobs func triggers instead of re-reading every manifest; confirm. The explicit
            // MapFunction cast resolves the Java/Scala overload ambiguity of Dataset.map.
            int parallelism = SQLConf.get().numShufflePartitions();
            reusableDS = ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
        }

        try {
            return func.apply(reusableDS);
        } finally {
            if (useCaching) {
                reusableDS.unpersist(false);
            }
        }
    }

    /** Returns the current snapshot's data manifests for the selected spec that pass the predicate. */
    private List<ManifestFile> findMatchingManifests() {
        Snapshot currentSnapshot = table.currentSnapshot();
        if (currentSnapshot == null) {
            return ImmutableList.of();
        }

        return currentSnapshot.dataManifests(table.io()).stream()
            .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
            .collect(Collectors.toList());
    }

    /** Ceiling of {@code totalSizeBytes / targetManifestSizeBytes}. */
    private int targetNumManifests(long totalSizeBytes) {
        return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
    }

    /** Ceiling of {@code numEntries / numManifests}. */
    private int targetNumManifestEntries(int numEntries, int numManifests) {
        return (numEntries + numManifests - 1) / numManifests;
    }

    /** True when the manifest records added, existing and deleted file counts. */
    private boolean hasFileCounts(ManifestFile manifest) {
        return manifest.addedFilesCount() != null
            && manifest.existingFilesCount() != null
            && manifest.deletedFilesCount() != null;
    }

    /**
     * Commits a {@code RewriteManifests} operation that swaps {@code deletedManifests} for
     * {@code addedManifests}.
     *
     * <p>On a failed commit, the staged manifests are deleted and the exception is rethrown. A
     * {@link CommitStateUnknownException} is rethrown without cleanup.
     */
    private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
        try {
            boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
                table.properties(),
                TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
                TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

            org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
            deletedManifests.forEach(rewriteManifests::deleteManifest);
            addedManifests.forEach(rewriteManifests::addManifest);
            commit(rewriteManifests);

            if (!snapshotIdInheritanceEnabled) {
                // NOTE(review): presumably the commit rewrote these manifests with explicit snapshot
                // ids, leaving the staged copies unreferenced; confirm
                deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
            }
        } catch (CommitStateUnknownException commitStateUnknownException) {
            // the commit may have succeeded, so the new manifests could already be referenced
            throw commitStateUnknownException;
        } catch (Exception e) {
            // the rewrite failed: remove every staged manifest
            deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
            throw e;
        }
    }

    /** Best-effort parallel delete; failures are logged and suppressed. */
    private void deleteFiles(Iterable<String> locations) {
        Tasks.foreach(locations)
            .executeWith(ThreadPools.getWorkerPool())
            .noRetry()
            .suppressFailureWhenFinished()
            .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
            .run(location -> table.io().deleteFile(location));
    }

    /**
     * Writes {@code rows[startIndex, endIndex)} as EXISTING entries of a new Avro manifest.
     *
     * <p>The writer is closed exactly once, after all rows are written and even if a write fails.
     * This must happen before {@code toManifestFile()} is called.
     *
     * @param rows entry rows shaped (snapshot_id, sequence_number, file_sequence_number, data_file)
     * @throws IOException if writing or closing the manifest fails
     */
    private static ManifestFile writeManifest(List<Row> rows, int startIndex, int endIndex, Broadcast<Table> tableBroadcast, String location, int formatVersion, Types.StructType combinedPartitionType, PartitionSpec spec, StructType sparkType) throws IOException {
        String manifestName = "optimized-m-" + UUID.randomUUID();
        Path manifestPath = new Path(location, manifestName);
        OutputFile outputFile = tableBroadcast.value().io().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));

        Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
        Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
        SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);

        ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec, outputFile, null);

        try {
            for (int index = startIndex; index < endIndex; index++) {
                Row row = rows.get(index);
                long snapshotId = row.getLong(0);
                long sequenceNumber = row.getLong(1);
                Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
                Row file = row.getStruct(3);
                writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
            }
        } finally {
            writer.close();
        }

        return writer.toManifestFile();
    }

    /**
     * Builds the per-partition function that turns entry rows into manifests.
     *
     * <p>An empty Spark partition yields no manifest. A partition with at most
     * {@code maxNumManifestEntries} rows yields one manifest. Otherwise the rows are split in half
     * into two manifests. The lambda captures only its (serializable) arguments.
     */
    private static MapPartitionsFunction<Row, ManifestFile> toManifests(Broadcast<Table> tableBroadcast, long maxNumManifestEntries, String location, int formatVersion, Types.StructType combinedPartitionType, PartitionSpec spec, StructType sparkType) {
        return rows -> {
            List<Row> rowsAsList = Lists.newArrayList(rows);
            if (rowsAsList.isEmpty()) {
                return Collections.emptyIterator();
            }

            List<ManifestFile> manifests = Lists.newArrayList();
            if (rowsAsList.size() <= maxNumManifestEntries) {
                manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), tableBroadcast, location, formatVersion, combinedPartitionType, spec, sparkType));
            } else {
                int midIndex = rowsAsList.size() / 2;
                manifests.add(writeManifest(rowsAsList, 0, midIndex, tableBroadcast, location, formatVersion, combinedPartitionType, spec, sparkType));
                manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), tableBroadcast, location, formatVersion, combinedPartitionType, spec, sparkType));
            }

            return manifests.iterator();
        };
    }
}
