/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseCommitActionExecutor<T, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
    private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
    protected final Option<Map<String, String>> extraMetadata;
    protected final WriteOperationType operationType;
    protected final TaskContextSupplier taskContextSupplier;
    protected final TransactionManager txnManager;
    protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxn;
    protected Set<String> pendingInflightAndRequestedInstants;

    public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, WriteOperationType operationType, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime);
        this.operationType = operationType;
        this.extraMetadata = extraMetadata;
        this.taskContextSupplier = context.getTaskContextSupplier();
        this.txnManager = new TransactionManager(config, (FileSystem)table.getMetaClient().getFs());
        this.lastCompletedTxn = this.txnManager.isOptimisticConcurrencyControlEnabled() ? TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()) : Option.empty();
        this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
        this.pendingInflightAndRequestedInstants.remove(instantTime);
        if (!table.getStorageLayout().writeOperationSupported(operationType)) {
            throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName() + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName());
        }
    }

    public abstract HoodieWriteMetadata<O> execute(I var1);

    void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime) throws HoodieCommitException {
        try {
            HoodieCommitMetadata metadata = new HoodieCommitMetadata();
            profile.getOutputPartitionPaths().forEach(path -> {
                WorkloadStat partitionStat = profile.getOutputWorkloadStat((String)path);
                HoodieWriteStat insertStat = new HoodieWriteStat();
                insertStat.setNumInserts(partitionStat.getNumInserts());
                insertStat.setFileId("");
                insertStat.setPrevCommit("null");
                metadata.addWriteStat(path, insertStat);
                HashMap<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();
                HashMap<String, Pair<String, Long>> insertLocationMap = partitionStat.getInsertLocationToCount();
                Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream()).distinct().forEach(fileId -> {
                    HoodieWriteStat writeStat = new HoodieWriteStat();
                    writeStat.setFileId(fileId);
                    Pair updateLocation = (Pair)updateLocationMap.get(fileId);
                    Pair insertLocation = (Pair)insertLocationMap.get(fileId);
                    writeStat.setPrevCommit(updateLocation != null ? (String)updateLocation.getKey() : (String)insertLocation.getKey());
                    if (updateLocation != null) {
                        writeStat.setNumUpdateWrites(((Long)updateLocation.getValue()).longValue());
                    }
                    if (insertLocation != null) {
                        writeStat.setNumInserts(((Long)insertLocation.getValue()).longValue());
                    }
                    metadata.addWriteStat(path, writeStat);
                });
            });
            metadata.setOperationType(this.operationType);
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            String commitActionType = this.getCommitActionType();
            HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitActionType, instantTime);
            activeTimeline.transitionRequestedToInflight(requested, Option.of((Object)metadata.toJsonString().getBytes(StandardCharsets.UTF_8)), this.config.shouldAllowMultiWriteOnSameInstant());
        }
        catch (IOException io) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
        }
    }

    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
        if (StringUtils.isNullOrEmpty((String)this.config.getPreCommitValidators())) {
            return;
        }
        throw new HoodieIOException("Precommit validation not implemented for all engines yet");
    }

    protected void commitOnAutoCommit(HoodieWriteMetadata result) {
        this.runPrecommitValidators(result);
        if (this.config.shouldAutoCommit().booleanValue()) {
            LOG.info((Object)("Auto commit enabled: Committing " + this.instantTime));
            this.autoCommit(this.extraMetadata, result);
        } else {
            LOG.info((Object)("Auto commit disabled for " + this.instantTime));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
        Option inflightInstant = Option.of((Object)new HoodieInstant(HoodieInstant.State.INFLIGHT, this.getCommitActionType(), this.instantTime));
        this.txnManager.beginTransaction((Option<HoodieInstant>)inflightInstant, (Option<HoodieInstant>)(this.lastCompletedTxn.isPresent() ? Option.of((Object)((Pair)this.lastCompletedTxn.get()).getLeft()) : Option.empty()));
        try {
            this.setCommitMetadata(result);
            TransactionUtils.resolveWriteConflictIfAny(this.table, this.txnManager.getCurrentTransactionOwner(), result.getCommitMetadata(), this.config, this.txnManager.getLastCompletedTransactionOwner(), true, this.pendingInflightAndRequestedInstants);
            this.commit(extraMetadata, result);
        }
        finally {
            this.txnManager.endTransaction((Option<HoodieInstant>)inflightInstant);
        }
    }

    protected abstract void setCommitMetadata(HoodieWriteMetadata<O> var1);

    protected abstract void commit(Option<Map<String, String>> var1, HoodieWriteMetadata<O> var2);

    protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, HoodieWriteMetadata result) {
        try {
            Instant start = Instant.now();
            this.table.finalizeWrite(this.context, instantTime, stats);
            result.setFinalizeDuration(Duration.between(start, Instant.now()));
        }
        catch (HoodieIOException ioe) {
            throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
        }
    }

    protected String getSchemaToStoreInCommit() {
        return this.config.getSchema();
    }

    protected boolean isWorkloadProfileNeeded() {
        return true;
    }

    protected abstract Iterator<List<WriteStatus>> handleInsert(String var1, Iterator<HoodieRecord<T>> var2) throws Exception;

    protected abstract Iterator<List<WriteStatus>> handleUpdate(String var1, String var2, Iterator<HoodieRecord<T>> var3) throws IOException;

    protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
        HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant((String)this.instantTime);
        this.table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
        this.table.getMetaClient().reloadActiveTimeline();
        this.config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
        Schema schema = HoodieAvroUtils.addMetadataFields((Schema)new Schema.Parser().parse(this.config.getSchema()));
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy)ReflectionUtils.loadClass((String)this.config.getClusteringExecutionStrategyClass(), (Class[])new Class[]{HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, (Object[])new Object[]{this.table, this.context, this.config})).performClustering(clusteringPlan, schema, this.instantTime);
        HoodieData writeStatusList = (HoodieData)writeMetadata.getWriteStatuses();
        HoodieData<WriteStatus> statuses = this.updateIndex((HoodieData<WriteStatus>)writeStatusList, writeMetadata);
        writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
        writeMetadata.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
        this.commitOnAutoCommit(writeMetadata);
        if (!writeMetadata.getCommitMetadata().isPresent()) {
            HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata((List)((List)writeMetadata.getWriteStats().get()), writeMetadata.getPartitionToReplaceFileIds(), this.extraMetadata, (WriteOperationType)this.operationType, (String)this.getSchemaToStoreInCommit(), (String)this.getCommitActionType());
            writeMetadata.setCommitMetadata((Option<HoodieCommitMetadata>)Option.of((Object)commitMetadata));
        }
        return writeMetadata;
    }

    private HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        Instant indexStartTime = Instant.now();
        HoodieData<WriteStatus> statuses = this.table.getIndex().updateLocation(writeStatuses, this.context, this.table, this.instantTime);
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses(statuses);
        return statuses;
    }

    private Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        Set newFilesWritten = ((List)writeMetadata.getWriteStats().get()).stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
        return ClusteringUtils.getFileGroupsFromClusteringPlan((HoodieClusteringPlan)clusteringPlan).filter(fg -> "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy".equals(this.config.getClusteringExecutionStrategyClass()) || !newFilesWritten.contains(fg)).collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
    }

    private void validateWriteResult(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        if (writeMetadata.getWriteStatuses().isEmpty()) {
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + this.instantTime + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least " + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + " write statuses");
        }
    }
}

