/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieTableServiceManagerClient;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.RunsTableService;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient
implements RunsTableService {
    private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
    protected transient Timer.Context compactionTimer;
    protected transient Timer.Context clusteringTimer;
    protected transient Timer.Context logCompactionTimer;
    protected transient AsyncCleanerService asyncCleanerService;
    protected transient AsyncArchiveService asyncArchiveService;
    protected Set<String> pendingInflightAndRequestedInstants;

    protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
        super(context, clientConfig, timelineService);
    }

    protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
        if (this.asyncCleanerService == null) {
            this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
        } else {
            this.asyncCleanerService.start(null);
        }
    }

    protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
        if (this.asyncArchiveService == null) {
            this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
        } else {
            this.asyncArchiveService.start(null);
        }
    }

    protected void asyncClean() {
        AsyncCleanerService.waitForCompletion(this.asyncCleanerService);
    }

    protected void asyncArchive() {
        AsyncArchiveService.waitForCompletion(this.asyncArchiveService);
    }

    protected void setTableServiceTimer(WriteOperationType operationType) {
        switch (operationType) {
            case CLUSTER: {
                this.clusteringTimer = this.metrics.getClusteringCtx();
                break;
            }
            case COMPACT: {
                this.compactionTimer = this.metrics.getCompactionCtx();
                break;
            }
            case LOG_COMPACT: {
                this.logCompactionTimer = this.metrics.getLogCompactionCtx();
                break;
            }
        }
    }

    protected void setPendingInflightAndRequestedInstants(Set<String> pendingInflightAndRequestedInstants) {
        this.pendingInflightAndRequestedInstants = pendingInflightAndRequestedInstants;
    }

    protected void preCommit(HoodieCommitMetadata metadata) {
    }

    protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
        Option<String> compactionInstantTimeOpt = this.inlineScheduleCompaction(extraMetadata);
        compactionInstantTimeOpt.ifPresent(compactInstantTime -> this.compact((String)compactInstantTime, true));
        return compactionInstantTimeOpt;
    }

    private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
        if (this.shouldDelegateToTableServiceManager(this.config, ActionType.compaction)) {
            this.scheduleCompaction(extraMetadata);
        } else {
            this.runAnyPendingCompactions(table);
            this.inlineCompaction(extraMetadata);
        }
    }

    protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
        throw new UnsupportedOperationException("Log compaction is not supported yet.");
    }

    protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
        Option<String> logCompactionInstantTimeOpt = this.scheduleLogCompaction(extraMetadata);
        logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> this.logCompact((String)logCompactInstantTime, true));
        return logCompactionInstantTimeOpt;
    }

    protected void runAnyPendingCompactions(HoodieTable table) {
        table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants().forEach(instant -> {
            LOG.info((Object)("Running previously failed inflight compaction at instant " + instant));
            this.compact(instant.getTimestamp(), true);
        });
    }

    protected void runAnyPendingLogCompactions(HoodieTable table) {
        table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream().forEach(instant -> {
            LOG.info((Object)("Running previously failed inflight log compaction at instant " + instant));
            this.logCompact(instant.getTimestamp(), true);
        });
    }

    protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
        return this.scheduleCompaction(extraMetadata);
    }

    public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        return this.scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of((Object)instantTime) : Option.empty();
    }

    protected abstract HoodieWriteMetadata<O> compact(String var1, boolean var2);

    public abstract void commitCompaction(String var1, HoodieCommitMetadata var2, Option<Map<String, String>> var3);

    protected abstract void completeCompaction(HoodieCommitMetadata var1, HoodieTable var2, String var3);

    public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        return this.scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of((Object)instantTime) : Option.empty();
    }

    public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return this.scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
    }

    public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
        return this.logCompact(logCompactionInstantTime, this.config.shouldAutoCommit());
    }

    protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
        throw new UnsupportedOperationException("Log compaction is not supported yet.");
    }

    public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return this.scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
    }

    public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String instantTime = HoodieActiveTimeline.createNewInstantTime();
        return this.scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of((Object)instantTime) : Option.empty();
    }

    public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return this.scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
    }

    public abstract HoodieWriteMetadata<O> cluster(String var1, boolean var2);

    protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        if (!this.tableServicesEnabled(this.config)) {
            return;
        }
        if (!this.config.areAnyTableServicesExecutedInline().booleanValue() && !this.config.areAnyTableServicesScheduledInline().booleanValue()) {
            return;
        }
        if (this.config.isMetadataTableEnabled()) {
            table.getHoodieView().sync();
        }
        if (this.config.inlineCompactionEnabled()) {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
            this.inlineCompaction(table, extraMetadata);
        } else {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
        }
        if (!this.config.inlineCompactionEnabled() && this.config.scheduleInlineCompaction() && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
            metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
            this.inlineScheduleCompaction(extraMetadata);
        }
        if (this.config.inlineLogCompactionEnabled()) {
            this.runAnyPendingLogCompactions(table);
            metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
            this.inlineLogCompact(extraMetadata);
        } else {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
        }
        if (this.config.inlineClusteringEnabled()) {
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
            this.inlineClustering(table, extraMetadata);
        } else {
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
        }
        if (!this.config.inlineClusteringEnabled() && this.config.scheduleInlineClustering() && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
            metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
            this.inlineScheduleClustering(extraMetadata);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        Option inflightInstant = Option.of((Object)new HoodieInstant(HoodieInstant.State.REQUESTED, tableServiceType.getAction(), instantTime));
        try {
            this.txnManager.beginTransaction((Option<HoodieInstant>)inflightInstant, (Option<HoodieInstant>)Option.empty());
            LOG.info((Object)("Scheduling table service " + tableServiceType));
            Option<String> option = this.scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
            return option;
        }
        finally {
            this.txnManager.endTransaction((Option<HoodieInstant>)inflightInstant);
        }
    }

    protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        if (!this.tableServicesEnabled(this.config)) {
            return Option.empty();
        }
        Option option = Option.empty();
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        switch (tableServiceType) {
            case ARCHIVE: {
                LOG.info((Object)"Scheduling archiving is not supported. Skipping.");
                break;
            }
            case CLUSTER: {
                LOG.info((Object)("Scheduling clustering at instant time :" + instantTime));
                Option<HoodieClusteringPlan> clusteringPlan = table.scheduleClustering(this.context, instantTime, extraMetadata);
                option = clusteringPlan.isPresent() ? Option.of((Object)instantTime) : Option.empty();
                break;
            }
            case COMPACT: {
                LOG.info((Object)("Scheduling compaction at instant time :" + instantTime));
                Option<HoodieCompactionPlan> compactionPlan = table.scheduleCompaction(this.context, instantTime, extraMetadata);
                option = compactionPlan.isPresent() ? Option.of((Object)instantTime) : Option.empty();
                break;
            }
            case LOG_COMPACT: {
                LOG.info((Object)("Scheduling log compaction at instant time :" + instantTime));
                Option<HoodieCompactionPlan> logCompactionPlan = table.scheduleLogCompaction(this.context, instantTime, extraMetadata);
                option = logCompactionPlan.isPresent() ? Option.of((Object)instantTime) : Option.empty();
                break;
            }
            case CLEAN: {
                LOG.info((Object)("Scheduling cleaning at instant time :" + instantTime));
                Option<HoodieCleanerPlan> cleanerPlan = table.scheduleCleaning(this.context, instantTime, extraMetadata);
                option = cleanerPlan.isPresent() ? Option.of((Object)instantTime) : Option.empty();
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
            }
        }
        Option<String> instantRange = this.delegateToTableServiceManager(tableServiceType, table);
        if (instantRange.isPresent()) {
            LOG.info((Object)("Delegate instant [" + (String)instantRange.get() + "] to table service manager"));
        }
        return option;
    }

    protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig var1, Configuration var2);

    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
        Option<String> clusteringInstantOpt = this.inlineScheduleClustering(extraMetadata);
        clusteringInstantOpt.ifPresent(clusteringInstant -> this.cluster((String)clusteringInstant, true));
        return clusteringInstantOpt;
    }

    private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
        if (this.shouldDelegateToTableServiceManager(this.config, ActionType.replacecommit)) {
            this.scheduleClustering(extraMetadata);
        } else {
            this.runAnyPendingClustering(table);
            this.inlineClustering(extraMetadata);
        }
    }

    protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
        return this.scheduleClustering(extraMetadata);
    }

    protected void runAnyPendingClustering(HoodieTable table) {
        table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
            Option instantPlan = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)table.getMetaClient(), (HoodieInstant)instant);
            if (instantPlan.isPresent()) {
                LOG.info((Object)("Running pending clustering at instant " + ((Pair)instantPlan.get()).getLeft()));
                this.cluster(instant.getTimestamp(), true);
            }
        });
    }

    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        ValidationUtils.checkArgument((boolean)table.isTableServiceAction(actionType, instantTime), (String)String.format("Unsupported action: %s.%s is not table service.", actionType, instantTime));
        this.context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + this.config.getTableName());
        table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, true));
    }

    @Nullable
    @Deprecated
    public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
        return this.clean(cleanInstantTime, scheduleInline);
    }

    @Nullable
    public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
        if (!this.tableServicesEnabled(this.config)) {
            return null;
        }
        Timer.Context timerContext = this.metrics.getCleanCtx();
        CleanerUtils.rollbackFailedWrites((HoodieFailedWritesCleaningPolicy)this.config.getFailedWritesCleanPolicy(), (String)"clean", (Functions.Function0 & Serializable)() -> this.rollbackFailedWrites());
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        if (this.config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
            LOG.info((Object)"Cleaner started");
            if (scheduleInline) {
                this.scheduleTableServiceInternal(cleanInstantTime, (Option<Map<String, String>>)Option.empty(), TableServiceType.CLEAN);
                table.getMetaClient().reloadActiveTimeline();
            }
            if (this.shouldDelegateToTableServiceManager(this.config, ActionType.clean)) {
                LOG.warn((Object)"Cleaning is not yet supported with Table Service Manager.");
                return null;
            }
        }
        HoodieCleanMetadata metadata = table.clean(this.context, cleanInstantTime);
        if (timerContext != null && metadata != null) {
            long durationMs = this.metrics.getDurationInMs(timerContext.stop());
            this.metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
            LOG.info((Object)("Cleaned " + metadata.getTotalFilesDeleted() + " files Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElapsedMs" + durationMs));
        }
        return metadata;
    }

    protected void archive(HoodieTable table) {
        if (!this.tableServicesEnabled(this.config)) {
            return;
        }
        try {
            HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(this.config, table);
            archiver.archiveIfRequired(this.context, true);
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Failed to archive", ioe);
        }
    }

    private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
        HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
        HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
            if (instant.getAction().equals("replacecommit")) {
                Option instantPlan = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)metaClient, (HoodieInstant)instant);
                return !instantPlan.isPresent();
            }
            return true;
        });
        return inflightTimelineExcludeClusteringCommit;
    }

    protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
        return this.getPendingRollbackInfo(metaClient, commitToRollback, true);
    }

    public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
        return this.getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, (Option<HoodiePendingRollbackInfo>)Option.empty());
    }

    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
        return this.getPendingRollbackInfos(metaClient, true);
    }

    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
        List instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
        HashMap<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<String, Option<HoodiePendingRollbackInfo>>();
        for (HoodieInstant rollbackInstant : instants) {
            HoodieRollbackPlan rollbackPlan;
            try {
                rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
            }
            catch (Exception e) {
                if (rollbackInstant.isRequested()) {
                    LOG.warn((Object)("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state"), (Throwable)e);
                    try {
                        metaClient.getActiveTimeline().deletePending(rollbackInstant);
                    }
                    catch (HoodieIOException he) {
                        LOG.warn((Object)("Cannot delete " + rollbackInstant), (Throwable)he);
                    }
                    continue;
                }
                LOG.warn((Object)("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan"), (Throwable)e);
                continue;
            }
            try {
                String action = rollbackPlan.getInstantToRollback().getAction();
                if (ignoreCompactionAndClusteringInstants) {
                    boolean isClustering;
                    if ("compaction".equals(action) || (isClustering = "replacecommit".equals(action) && ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)metaClient, (HoodieInstant)new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(), rollbackPlan.getInstantToRollback().getCommitTime())).isPresent())) continue;
                    String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
                    infoMap.putIfAbsent(instantToRollback, (Option<HoodiePendingRollbackInfo>)Option.of((Object)new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
                    continue;
                }
                infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), (Option<HoodiePendingRollbackInfo>)Option.of((Object)new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
            }
            catch (Exception e) {
                LOG.warn((Object)("Processing rollback plan failed for " + rollbackInstant + ", skip the plan"), (Throwable)e);
            }
        }
        return infoMap;
    }

    protected boolean rollbackFailedIndexingCommits() {
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        List<String> instantsToRollback = this.getFailedIndexingCommitsToRollback(table.getMetaClient());
        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = this.getPendingRollbackInfos(table.getMetaClient());
        instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent((String)entry, (Option<HoodiePendingRollbackInfo>)Option.empty()));
        this.rollbackFailedWrites(pendingRollbacks);
        return !pendingRollbacks.isEmpty();
    }

    protected List<String> getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
        Stream inflightInstantsStream = metaClient.getCommitsTimeline().filter(instant -> !instant.isCompleted() && HoodieTableMetadataUtil.isIndexingCommit((String)instant.getTimestamp())).getInstantsAsStream();
        return inflightInstantsStream.filter(instant -> {
            try {
                return this.heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
            }
            catch (IOException io) {
                throw new HoodieException("Failed to check heartbeat for instant " + instant, (Throwable)io);
            }
        }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
    }

    protected Boolean rollbackFailedWrites() {
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        List<String> instantsToRollback = this.getInstantsToRollback(table.getMetaClient(), this.config.getFailedWritesCleanPolicy(), (Option<String>)Option.empty());
        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = this.getPendingRollbackInfos(table.getMetaClient());
        instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent((String)entry, (Option<HoodiePendingRollbackInfo>)Option.empty()));
        this.rollbackFailedWrites(pendingRollbacks);
        return !pendingRollbacks.isEmpty();
    }

    protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback) {
        this.rollbackFailedWrites(instantsToRollback, false);
    }

    protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
        LinkedHashMap reverseSortedRollbackInstants = instantsToRollback.entrySet().stream().sorted((i1, i2) -> ((String)i2.getKey()).compareTo((String)i1.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        for (Map.Entry entry : reverseSortedRollbackInstants.entrySet()) {
            if (HoodieTimeline.compareTimestamps((String)((String)entry.getKey()), (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)"00000000000002")) {
                this.rollbackFailedBootstrap();
                HeartbeatUtils.deleteHeartbeatFile(this.fs, this.basePath, (String)entry.getKey(), this.config);
                break;
            }
            this.rollback((String)entry.getKey(), (Option<HoodiePendingRollbackInfo>)((Option)entry.getValue()), skipLocking);
            HeartbeatUtils.deleteHeartbeatFile(this.fs, this.basePath, (String)entry.getKey(), this.config);
        }
    }

    protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
        Stream inflightInstantsStream = this.getInflightTimelineExcludeCompactionAndClustering(metaClient).getReverseOrderedInstants();
        if (cleaningPolicy.isEager()) {
            if (HoodieTableMetadata.isMetadataTable((String)metaClient.getBasePathV2().toString())) {
                return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
                    if (curInstantTime.isPresent()) {
                        return !entry.equals(curInstantTime.get());
                    }
                    return !HoodieTableMetadataUtil.isIndexingCommit((String)entry);
                }).collect(Collectors.toList());
            }
            return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
                if (curInstantTime.isPresent()) {
                    return !entry.equals(curInstantTime.get());
                }
                return true;
            }).collect(Collectors.toList());
        }
        if (cleaningPolicy.isLazy()) {
            return inflightInstantsStream.filter(instant -> {
                try {
                    return this.heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
                }
                catch (IOException io) {
                    throw new HoodieException("Failed to check heartbeat for instant " + instant, (Throwable)io);
                }
            }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        }
        if (cleaningPolicy.isNever()) {
            return Collections.emptyList();
        }
        throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + this.config.getFailedWritesCleanPolicy());
    }

    @Deprecated
    public boolean rollback(String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
        LOG.info((Object)("Begin rollback of instant " + commitInstantTime));
        String rollbackInstantTime = (String)pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse((Object)HoodieActiveTimeline.createNewInstantTime());
        Timer.Context timerContext = this.metrics.getRollbackCtx();
        try {
            HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
            Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream().filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)).findFirst());
            if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
                LOG.info((Object)String.format("Scheduling Rollback at instant time : %s (exists in active timeline: %s), with rollback plan: %s", rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
                Option rollbackPlanOption = (Option)pendingRollbackInfo.map(entry -> Option.of((Object)entry.getRollbackPlan())).orElseGet(() -> table.scheduleRollback(this.context, rollbackInstantTime, (HoodieInstant)commitInstantOpt.get(), false, this.config.shouldRollbackUsingMarkers()));
                if (rollbackPlanOption.isPresent()) {
                    HoodieRollbackMetadata rollbackMetadata;
                    HoodieRollbackMetadata hoodieRollbackMetadata = rollbackMetadata = commitInstantOpt.isPresent() ? table.rollback(this.context, rollbackInstantTime, (HoodieInstant)commitInstantOpt.get(), true, skipLocking) : table.rollback(this.context, rollbackInstantTime, new HoodieInstant(true, ((HoodieRollbackPlan)rollbackPlanOption.get()).getInstantToRollback().getAction(), commitInstantTime), false, skipLocking);
                    if (timerContext != null) {
                        long durationInMs = this.metrics.getDurationInMs(timerContext.stop());
                        this.metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted().intValue());
                    }
                    return true;
                }
                throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime);
            }
            LOG.warn((Object)("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"));
            return false;
        }
        catch (Exception e) {
            throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime, e);
        }
    }

    public void rollbackFailedBootstrap() {
        LOG.info((Object)"Rolling back pending bootstrap if present");
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
        Option instant = Option.fromJavaOptional(inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
        if (instant.isPresent() && HoodieTimeline.compareTimestamps((String)((String)instant.get()), (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)"00000000000002")) {
            LOG.info((Object)"Found pending bootstrap instants. Rolling them back");
            table.rollbackBootstrap(this.context, HoodieActiveTimeline.createNewInstantTime());
            LOG.info((Object)"Finished rolling back pending bootstrap");
        }
    }

    private Option<String> delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) {
        if (!this.config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) {
            return Option.empty();
        }
        HoodieTableServiceManagerClient tableServiceManagerClient = new HoodieTableServiceManagerClient(table.getMetaClient(), this.config.getTableServiceManagerConfig());
        switch (tableServiceType) {
            case COMPACT: {
                return tableServiceManagerClient.executeCompaction();
            }
            case CLUSTER: {
                return tableServiceManagerClient.executeClustering();
            }
            case CLEAN: {
                return tableServiceManagerClient.executeClean();
            }
        }
        LOG.info((Object)("Not supported delegate to table service manager, tableServiceType : " + tableServiceType.getAction()));
        return Option.empty();
    }

    @Override
    public void close() {
        AsyncArchiveService.forceShutdown(this.asyncArchiveService);
        this.asyncArchiveService = null;
        AsyncCleanerService.forceShutdown(this.asyncCleanerService);
        this.asyncCleanerService = null;
        super.close();
    }
}

