/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.clean.PartitionCleanStat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action executor that carries out clean operations against a {@link HoodieTable}.
 *
 * <p>{@link #execute()} walks every pending (requested or inflight) clean instant on the table's
 * clean timeline, deletes the files and partitions listed in each instant's
 * {@link HoodieCleanerPlan}, and transitions the instant to complete.
 *
 * @param <T> first type parameter forwarded to {@link HoodieTable} and {@link BaseActionExecutor}
 * @param <I> second type parameter forwarded to {@link HoodieTable} and {@link BaseActionExecutor}
 * @param <K> third type parameter forwarded to {@link HoodieTable} and {@link BaseActionExecutor}
 * @param <O> fourth type parameter forwarded to {@link HoodieTable} and {@link BaseActionExecutor}
 */
public class CleanActionExecutor<T, I, K, O>
extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CleanActionExecutor.class);
    private final TransactionManager txnManager;
    private final boolean skipLocking;

    /**
     * Creates an executor that wraps the final metadata write and timeline transition in a
     * transaction (i.e. {@code skipLocking} is {@code false}).
     *
     * @param context     engine context used to run the deletions
     * @param config      write config; supplies cleaner parallelism, cleaner policy and table name
     * @param table       table to clean
     * @param instantTime instant time handed to the base executor
     */
    public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
        this(context, config, table, instantTime, false);
    }

    /**
     * Creates an executor.
     *
     * @param context     engine context used to run the deletions
     * @param config      write config; supplies cleaner parallelism, cleaner policy and table name
     * @param table       table to clean
     * @param instantTime instant time handed to the base executor
     * @param skipLocking when {@code true}, this executor never begins or ends a transaction on its
     *                    {@link TransactionManager}
     */
    public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, boolean skipLocking) {
        super(context, config, table, instantTime);
        this.txnManager = new TransactionManager(config, table.getStorage());
        this.skipLocking = skipLocking;
    }

    /**
     * Deletes a single path (recursively when it is a directory) and reports whether this call
     * removed it.
     *
     * @param fs            file system to delete from
     * @param deletePathStr path to delete
     * @return {@code true} if the delete call reported success; {@code false} if the path was
     *         already absent
     * @throws HoodieIOException if the delete reported failure while the path still exists
     * @throws IOException       if the file system fails for any reason other than a missing file
     */
    private static boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
        Path deletePath = new Path(deletePathStr);
        LOG.debug("Working on delete path :{}", deletePath);
        try {
            boolean isDirectory = fs.isDirectory(deletePath);
            boolean deleteResult = fs.delete(deletePath, isDirectory);
            if (deleteResult) {
                LOG.debug("Cleaned file at path :{}", deletePath);
            } else {
                // A false result is only acceptable when the path is already gone.
                if (fs.exists(deletePath)) {
                    throw new HoodieIOException("Failed to delete path during clean execution " + deletePath);
                }
                LOG.debug("Already cleaned up file at path :{}", deletePath);
            }
            return deleteResult;
        }
        catch (FileNotFoundException ignored) {
            // Deliberately not an error: a missing path is reported as "not deleted by this call".
            return false;
        }
    }

    /**
     * Deletes every file yielded by the iterator and accumulates the outcome per partition.
     *
     * <p>An {@link IOException} from a delete is logged and recorded as a failed delete for that
     * file; it does not stop the remaining deletions.
     *
     * @param cleanFileInfo pairs of (partition path, file to delete)
     * @param table         table whose storage provides the {@link FileSystem}
     * @return one (partition path, stats) pair per distinct partition seen in the iterator
     */
    private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable<?, ?, ?, ?> table) {
        Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
        FileSystem fs = (FileSystem) table.getStorage().getFileSystem();
        cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
            String partitionPath = partitionDelFileTuple.getLeft();
            CleanFileInfo fileInfo = partitionDelFileTuple.getRight();
            Path deletePath = new Path(fileInfo.getFilePath());
            String deletePathStr = deletePath.toString();
            boolean deletedFileResult = false;
            try {
                deletedFileResult = CleanActionExecutor.deleteFileAndGetResult(fs, deletePathStr);
            }
            catch (IOException e) {
                LOG.error("Delete file failed: {}", deletePathStr, e);
            }
            PartitionCleanStat partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, PartitionCleanStat::new);
            if (fileInfo.isBootstrapBaseFile()) {
                // Bootstrap base files are recorded by their full path.
                partitionCleanStat.addDeleteFilePatterns(deletePathStr, true);
                partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult, true);
            } else {
                // All other files are recorded by file name only.
                partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
                partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
            }
        });
        return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
    }

    /**
     * Executes a cleaner plan: deletes the listed files through the engine context, then deletes
     * the listed partitions, and finally builds one {@link HoodieCleanStat} per partition that has
     * an entry in the plan's file map.
     *
     * @param context     engine context used to distribute the file deletions
     * @param cleanerPlan plan naming the files and partitions to delete
     * @return clean stats, one per key of {@code cleanerPlan.getFilePathsToBeDeletedPerPartition()}
     */
    List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
        int totalFilesToDelete = cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum();
        int cleanerParallelism = Math.min(totalFilesToDelete, this.config.getCleanerParallelism());
        LOG.info("Using cleanerParallelism: {}", cleanerParallelism);
        context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + this.config.getTableName());

        // Flatten the plan into (partition path, file info) pairs.
        Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
        Stream<? extends Pair<String, PartitionCleanStat>> partitionCleanStats = context.mapPartitionsToPairAndReduceByKey(
            filesToBeDeletedPerPartition,
            iterator -> CleanActionExecutor.deleteFilesFunc(iterator, this.table),
            PartitionCleanStat::merge,
            cleanerParallelism);
        Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        // Whole-partition deletes are only honoured for partitioned tables.
        List<String> partitionsToBeDeleted = this.table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null
            ? cleanerPlan.getPartitionsToBeDeleted()
            : new ArrayList<>();
        partitionsToBeDeleted.forEach(entry -> {
            try {
                if (!StringUtils.isNullOrEmpty(entry)) {
                    CleanActionExecutor.deleteFileAndGetResult((FileSystem) this.table.getStorage().getFileSystem(), this.table.getMetaClient().getBasePath() + "/" + entry);
                }
            }
            catch (IOException e) {
                // Best effort: keep going, but retain the cause in the log.
                LOG.warn("Partition deletion failed {}", entry, e);
            }
        });

        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
            // Partitions for which no stats came back get an empty stat object.
            PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
                ? partitionCleanStatsMap.get(partitionPath)
                : new PartitionCleanStat(partitionPath);
            HoodieInstant earliestRetained = actionInstant != null
                ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), actionInstant.getAction(), actionInstant.getTimestamp())
                : null;
            return HoodieCleanStat.newBuilder()
                .withPolicy(this.config.getCleanerPolicy())
                .withPartitionPath(partitionPath)
                .withEarliestCommitRetained(Option.ofNullable(earliestRetained))
                .withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
                .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
                .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
                .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
                .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
                .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
                .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
                .isPartitionDeleted(partitionsToBeDeleted.contains(partitionPath))
                .build();
        }).collect(Collectors.toList());
    }

    /**
     * Loads the cleaner plan stored for a pending clean instant and runs it.
     *
     * @param table        table to clean
     * @param cleanInstant requested or inflight clean instant
     * @return metadata describing the executed clean
     * @throws HoodieIOException if the cleaner plan cannot be read
     */
    HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
        try {
            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
            return this.runClean(table, cleanInstant, cleanerPlan);
        }
        catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /**
     * Runs one clean instant end to end: moves a requested instant to inflight, performs the
     * deletions, writes table metadata and marks the instant complete.
     *
     * <p>Unless {@code skipLocking} is set, the metadata write and the final timeline transition
     * happen inside a transaction. The transaction is only ended if this method actually attempted
     * to begin it, so the early return on empty stats and failures before the begin call never
     * issue an unmatched {@code endTransaction}.
     *
     * @param table        table to clean
     * @param cleanInstant instant to run; must be in REQUESTED or INFLIGHT state
     * @param cleanerPlan  plan belonging to {@code cleanInstant}
     * @return the clean metadata, or an empty metadata object when no clean stats were produced
     * @throws HoodieIOException if an {@link IOException} occurs while cleaning
     */
    private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
        ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
            || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
        HoodieInstant inflightInstant = null;
        boolean transactionAttempted = false;
        try {
            HoodieTimer timer = HoodieTimer.start();
            if (cleanInstant.isRequested()) {
                inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
            } else {
                inflightInstant = cleanInstant;
            }
            List<HoodieCleanStat> cleanStats = this.clean(this.context, cleanerPlan);
            if (cleanStats.isEmpty()) {
                return HoodieCleanMetadata.newBuilder().build();
            }
            table.getMetaClient().reloadActiveTimeline();
            HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(inflightInstant.getTimestamp(), Option.of(timer.endTimer()), cleanStats, cleanerPlan.getExtraMetadata());
            if (!this.skipLocking) {
                // Set before the call so a failure inside beginTransaction still triggers the
                // matching endTransaction in the finally block, as it did before.
                transactionAttempted = true;
                this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
            }
            this.writeTableMetadata(metadata, inflightInstant.getTimestamp());
            table.getActiveTimeline().transitionCleanInflightToComplete(false, inflightInstant, TimelineMetadataUtils.serializeCleanMetadata(metadata));
            LOG.info("Marked clean started on {} as complete", inflightInstant.getTimestamp());
            return metadata;
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to clean up after commit", e);
        }
        finally {
            if (transactionAttempted) {
                this.txnManager.endTransaction(Option.ofNullable(inflightInstant));
            }
        }
    }

    /**
     * Finishes every pending clean instant on the table's clean timeline.
     *
     * <p>For each requested or inflight instant: an empty instant is deleted; otherwise the clean
     * is run. A {@link HoodieIOException} from the run is tolerated only if the instant shows up as
     * completed after reloading the timeline. After each instant the active timeline is reloaded
     * and, when the metadata table is available, the table's file-system view is synced.
     *
     * @return metadata of the last clean that was run, or {@code null} if none was run
     */
    @Override
    public HoodieCleanMetadata execute() {
        List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
        List<HoodieInstant> pendingCleanInstants = this.table.getCleanTimeline().filterInflightsAndRequested().getInstants();
        if (!pendingCleanInstants.isEmpty()) {
            try {
                FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(this.table.getMetaClient());
                fss.cleanOldFiles(pendingCleanInstants.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
            }
            catch (Exception e) {
                // Best effort: schema-history cleanup must not block the clean, but keep the cause.
                LOG.warn("failed to clean old history schema", e);
            }
            for (HoodieInstant hoodieInstant : pendingCleanInstants) {
                if (this.table.getCleanTimeline().isEmpty(hoodieInstant)) {
                    this.table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
                } else {
                    LOG.info("Finishing previously unfinished cleaner instant={}", hoodieInstant);
                    try {
                        cleanMetadataList.add(this.runPendingClean(this.table, hoodieInstant));
                    }
                    catch (HoodieIOException e) {
                        this.checkIfOtherWriterCommitted(hoodieInstant, e);
                    }
                    catch (Exception e) {
                        LOG.error("Failed to perform previous clean operation, instant: {}", hoodieInstant, e);
                        throw e;
                    }
                }
                this.table.getMetaClient().reloadActiveTimeline();
                if (this.table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
                    this.table.getHoodieView().sync();
                }
            }
        }
        return cleanMetadataList.isEmpty() ? null : cleanMetadataList.get(cleanMetadataList.size() - 1);
    }

    /**
     * Decides whether a failed clean can be ignored because the instant is already completed on
     * the reloaded timeline.
     *
     * @param hoodieInstant clean instant whose execution failed
     * @param e             failure raised while running the instant
     * @throws HoodieIOException {@code e} itself, when the instant is not among the completed clean
     *                           instants after the reload
     */
    private void checkIfOtherWriterCommitted(HoodieInstant hoodieInstant, HoodieIOException e) {
        this.table.getMetaClient().reloadActiveTimeline();
        if (!this.table.getCleanTimeline().filterCompletedInstants().containsInstant(hoodieInstant.getTimestamp())) {
            LOG.error("Failed to perform previous clean operation, instant: {}", hoodieInstant, e);
            throw e;
        }
        LOG.warn("Clean operation was completed by another writer for instant: {}", hoodieInstant);
    }
}

