/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.MetadataRecordsGenerationParams;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableMetadataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
    // Suffix constant exposed publicly; it is not referenced in the portion of the class visible here.
    public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
    // Record key field name handed to the table property builder when the metadata table is created.
    private static final String RECORD_KEY_FIELD_NAME = "key";
    // NOTE(review): presumably the estimated per-record size (bytes) used to size the record index; the same
    // value (48) is passed to estimateFileGroupCount during record index initialization - confirm the unit.
    private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
    // Transient write client; it is not assigned anywhere in the portion of the class visible here.
    private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
    // Write config of the metadata table; only created in the constructor when the metadata table is enabled.
    protected HoodieWriteConfig metadataWriteConfig;
    // Write config of the data table, as passed to the constructor.
    protected HoodieWriteConfig dataWriteConfig;
    // Reader over the metadata table; (re)opened by initMetadataReader().
    protected HoodieBackedTableMetadata metadata;
    // Meta client of the metadata table.
    protected HoodieTableMetaClient metadataMetaClient;
    // Meta client of the data table, built in the constructor from the data table base path.
    protected HoodieTableMetaClient dataMetaClient;
    // Optional metrics sink; the constructor starts it as Option.empty().
    protected Option<HoodieMetadataMetrics> metrics;
    // Serializable wrapper around the Hadoop configuration passed to the constructor.
    protected SerializableConfiguration hadoopConf;
    protected final transient HoodieEngineContext engineContext;
    // Metadata partition types this writer maintains; populated by enablePartitions().
    protected final List<MetadataPartitionType> enabledPartitionTypes;
    // Set from the result of initializeIfNeeded() in the constructor.
    private boolean initialized = false;

    /**
     * Creates the writer and, when the metadata table is enabled in {@code writeConfig}, attempts to
     * initialize it.
     *
     * <p>After construction the writer is either not initialized, or initialized with an open metadata
     * reader; any other combination fails the final argument check.
     *
     * @param hadoopConf                 Hadoop configuration used to build the data table meta client
     * @param writeConfig                write config of the data table
     * @param failedWritesCleaningPolicy policy forwarded when deriving the metadata table write config
     * @param engineContext              engine context used for distributed work
     * @param inflightInstantTimestamp   timestamp of an in-flight data instant that must not block initialization
     */
    protected HoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                              HoodieWriteConfig writeConfig,
                                              HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                              HoodieEngineContext engineContext,
                                              Option<String> inflightInstantTimestamp) {
        this.dataWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.hadoopConf = new SerializableConfiguration(hadoopConf);
        this.metrics = Option.empty();
        this.enabledPartitionTypes = new ArrayList<>(4);
        this.dataMetaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(this.dataWriteConfig.getBasePath())
            .build();

        if (writeConfig.isMetadataTableEnabled()) {
            this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
            try {
                enablePartitions();
                initRegistry();
                this.initialized = initializeIfNeeded(this.dataMetaClient, inflightInstantTimestamp);
            } catch (IOException e) {
                // Initialization failure is logged only; the writer then stays un-initialized.
                LOG.error("Failed to initialize metadata table", e);
            }
        }

        boolean readerOpenWhenInitialized = !this.initialized || this.metadata != null;
        ValidationUtils.checkArgument(readerOpenWhenInitialized, "MDT Reader should have been opened post initialization");
    }

    /**
     * Closes the current metadata table reader (if one is open) and opens a fresh one, refreshing
     * {@code metadataMetaClient} from it.
     *
     * @throws HoodieException if the reader cannot be opened
     */
    private void initMetadataReader() {
        HoodieBackedTableMetadata previousReader = this.metadata;
        if (previousReader != null) {
            previousReader.close();
        }
        try {
            HoodieBackedTableMetadata freshReader = new HoodieBackedTableMetadata(
                this.engineContext,
                this.dataWriteConfig.getMetadataConfig(),
                this.dataWriteConfig.getBasePath(),
                true);
            this.metadata = freshReader;
            this.metadataMetaClient = freshReader.getMetadataMetaClient();
        } catch (Exception e) {
            throw new HoodieException("Could not open MDT for reads", e);
        }
    }

    /**
     * Populates {@code enabledPartitionTypes}: a partition type is enabled when its feature is switched
     * on in the write/metadata config, or when the data table config already reports that partition as
     * available.
     */
    private void enablePartitions() {
        final HoodieMetadataConfig metadataConfig = this.dataWriteConfig.getMetadataConfig();
        final HoodieTableConfig tableConfig = this.dataMetaClient.getTableConfig();

        boolean filesWanted = this.dataWriteConfig.isMetadataTableEnabled()
            || tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.FILES);
        if (filesWanted) {
            this.enabledPartitionTypes.add(MetadataPartitionType.FILES);
        }

        boolean bloomFiltersWanted = metadataConfig.isBloomFilterIndexEnabled()
            || tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS);
        if (bloomFiltersWanted) {
            this.enabledPartitionTypes.add(MetadataPartitionType.BLOOM_FILTERS);
        }

        boolean columnStatsWanted = metadataConfig.isColumnStatsIndexEnabled()
            || tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS);
        if (columnStatsWanted) {
            this.enabledPartitionTypes.add(MetadataPartitionType.COLUMN_STATS);
        }

        boolean recordIndexWanted = this.dataWriteConfig.isRecordIndexEnabled()
            || tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX);
        if (recordIndexWanted) {
            this.enabledPartitionTypes.add(MetadataPartitionType.RECORD_INDEX);
        }
    }

    /**
     * Engine-specific hook invoked from the constructor, after the enabled partition types have been
     * computed and before metadata table initialization is attempted.
     *
     * <p>NOTE(review): presumably sets up the metrics registry (and {@code metrics}) - confirm against
     * the concrete subclasses. Because it runs inside the constructor, implementations must not rely on
     * subclass fields that are initialized later.
     */
    protected abstract void initRegistry();

    /**
     * Returns the write config of the metadata table (not the data table). In the constructor this is
     * only created when the metadata table is enabled.
     */
    public HoodieWriteConfig getWriteConfig() {
        return metadataWriteConfig;
    }

    /**
     * Returns the currently open metadata table reader, or {@code null} if none has been opened yet.
     */
    public HoodieBackedTableMetadata getTableMetadata() {
        return metadata;
    }

    /**
     * Returns the metadata partition types enabled for this writer. The returned list is the writer's
     * own backing list, not a copy.
     */
    public List<MetadataPartitionType> getEnabledPartitionTypes() {
        return enabledPartitionTypes;
    }

    /**
     * Initializes the metadata table partitions that still need initialization.
     *
     * <p>FILES is scheduled when the metadata table does not exist (or had to be deleted for
     * re-initialization). Other enabled partitions are scheduled only when async indexing is disabled
     * and the data table config does not list them as completed. When nothing is left to initialize,
     * only the metadata reader is (re)opened.
     *
     * @param dataMetaClient           meta client of the data table
     * @param inflightInstantTimestamp in-flight data instant that should not block initialization
     * @return {@code true} if the metadata table is usable afterwards, {@code false} if initialization
     *         failed or could not be performed
     */
    protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
        final HoodieTimer timer = HoodieTimer.start();
        final List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length);
        try {
            if (!metadataTableExists(dataMetaClient)) {
                partitionsToInit.add(MetadataPartitionType.FILES);
            }

            if (!this.dataWriteConfig.isMetadataAsyncIndex()) {
                final Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
                LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions);
                for (MetadataPartitionType candidate : this.enabledPartitionTypes) {
                    // FILES is handled separately above, so it is never added from this loop.
                    if (!completedPartitions.contains(candidate.getPartitionPath())
                        && !MetadataPartitionType.FILES.equals(candidate)) {
                        partitionsToInit.add(candidate);
                    }
                }
            }

            if (partitionsToInit.isEmpty()) {
                initMetadataReader();
                return true;
            }

            // Latest completed data instant, or the all-zero timestamp when the data table has none.
            final String initializationTime = dataMetaClient.getActiveTimeline()
                .filterCompletedInstants()
                .lastInstant()
                .map(HoodieInstant::getTimestamp)
                .orElse("00000000000000");

            boolean initializedFromFs = initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp);
            if (!initializedFromFs) {
                LOG.error("Failed to initialize MDT from filesystem");
                return false;
            }
            this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
            return true;
        } catch (IOException e) {
            LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
            return false;
        }
    }

    /**
     * Checks whether a usable metadata table exists for the data table.
     *
     * <p>When the table config reports the metadata table as available, its meta client is loaded
     * (re-creating the table properties if meta fields are populated). If the table then turns out to
     * need a re-bootstrap, it is deleted and reported as not existing.
     *
     * @param dataMetaClient meta client of the data table
     * @return {@code true} only if the metadata table is available and does not need re-initialization
     */
    private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws IOException {
        if (!dataMetaClient.getTableConfig().isMetadataTableAvailable()) {
            return false;
        }

        try {
            this.metadataMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.hadoopConf.get())
                .setBasePath(this.metadataWriteConfig.getBasePath())
                .build();
            if (this.metadataMetaClient.getTableConfig().populateMetaFields()) {
                LOG.info("Re-initiating metadata table properties since populate meta fields have changed");
                this.metadataMetaClient = initializeMetaClient();
            }
        } catch (TableNotFoundException e) {
            // Config claims the table is available but it is not on storage.
            return false;
        }

        final Option<HoodieInstant> latestCompletedInstant =
            this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        if (!isBootstrapNeeded(latestCompletedInstant)) {
            return true;
        }

        this.metrics.ifPresent(m -> m.updateMetrics("rebootstrap", 1L));
        LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
        HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, this.engineContext, false);
        return false;
    }

    /**
     * Decides whether the metadata table must be bootstrapped again.
     *
     * <p>A re-bootstrap is required only when the metadata table has no completed instant at all. The
     * previous implementation additionally tested whether the latest timestamp starts with the all-zero
     * initialization timestamp, but both outcomes of that test returned {@code false}; the dead branch
     * has been removed.
     *
     * @param latestMetadataInstant latest completed instant on the metadata table timeline, if any
     * @return {@code true} if no completed instant exists, {@code false} otherwise
     */
    private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
        if (latestMetadataInstant.isPresent()) {
            return false;
        }
        LOG.warn("Metadata Table will need to be re-initialized as no instants were found");
        return true;
    }

    /**
     * Initializes the given metadata table partitions, one commit per partition.
     *
     * <p>Nothing is done (and {@code false} is returned) while other data table instants are pending.
     * FILES is always initialized first when it is not yet available; otherwise the existing FILES
     * partition is used as the source of the partition/file listing. For each remaining partition the
     * initial records are generated, file groups are created, the records are bulk-committed, the
     * partition is flagged as available in the data table config and the metadata reader is re-opened.
     *
     * @param initializationTime       base instant time from which the per-partition commit times are derived
     * @param partitionsToInit         partitions to initialize; modified in place (FILES moved to the front,
     *                                 already available partitions removed)
     * @param inflightInstantTimestamp in-flight data instant that must not block initialization
     * @return {@code false} if initialization was skipped due to pending data instants, {@code true} otherwise
     * @throws HoodieMetadataException if generating the initial records of a partition fails
     */
    private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
        if (this.anyPendingDataInstant(this.dataMetaClient, inflightInstantTimestamp)) {
            return false;
        }

        final boolean filesPartitionAvailable = this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
        if (!filesPartitionAvailable) {
            // FILES has to be the first partition initialized, whatever position it was requested in.
            partitionsToInit.remove(MetadataPartitionType.FILES);
            partitionsToInit.add(0, MetadataPartitionType.FILES);
            this.metadataMetaClient = this.initializeMetaClient();
        } else {
            // Open the reader so the existing FILES partition can feed the other partitions.
            this.initMetadataReader();
            if (this.metadataMetaClient == null) {
                this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf.get()).setBasePath(this.metadataWriteConfig.getBasePath()).build();
            }
        }

        // Partitions that are already available need no initialization.
        partitionsToInit.removeIf(metadataPartition -> this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(metadataPartition));

        // Listing source: existing FILES partition, else the file system (only when auto-initialize is on).
        final List<DirectoryInfo> partitionInfoList;
        if (filesPartitionAvailable) {
            partitionInfoList = this.listAllPartitionsFromMDT(initializationTime);
        } else if (this.dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
            partitionInfoList = this.listAllPartitionsFromFilesystem(initializationTime);
        } else {
            partitionInfoList = Collections.emptyList();
        }

        final Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> {
            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
            return Pair.of(partitionName, p.getFileNameToSizeMap());
        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        for (MetadataPartitionType partitionType : partitionsToInit) {
            // Each partition is initialized with its own commit instant time.
            final String commitTimeForPartition = this.generateUniqueCommitInstantTime(initializationTime);
            LOG.info("Initializing MDT partition " + partitionType.name() + " at instant " + commitTimeForPartition);

            final Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
            try {
                switch (partitionType) {
                    case FILES: {
                        fileGroupCountAndRecordsPair = this.initializeFilesPartition(partitionInfoList);
                        break;
                    }
                    case BLOOM_FILTERS: {
                        fileGroupCountAndRecordsPair = this.initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
                        break;
                    }
                    case COLUMN_STATS: {
                        fileGroupCountAndRecordsPair = this.initializeColumnStatsPartition(partitionToFilesMap);
                        break;
                    }
                    case RECORD_INDEX: {
                        fileGroupCountAndRecordsPair = this.initializeRecordIndexPartition();
                        break;
                    }
                    default: {
                        throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType);
                    }
                }
            } catch (Exception e) {
                String metricKey = partitionType.getPartitionPath() + "_" + "bootstrap_error";
                this.metrics.ifPresent(m -> m.setMetric(metricKey, 1L));
                LOG.error("Bootstrap on " + partitionType.getPartitionPath() + " partition failed for " + this.metadataMetaClient.getBasePath(), e);
                throw new HoodieMetadataException(partitionType.getPartitionPath() + " bootstrap failed for " + this.metadataMetaClient.getBasePath(), e);
            }

            final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
            final HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();

            // Argument order fixed: "mappings" is the record count, "file groups" is the file group count
            // (the two were previously swapped in this message).
            LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), records.count(), fileGroupCount));

            final HoodieTimer partitionInitTimer = HoodieTimer.start();
            ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType.name() + " should be > 0");
            this.initializeFileGroups(this.dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount);

            this.bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
            this.metadataMetaClient.reloadActiveTimeline();
            this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, partitionType, true);
            // Re-open the reader so the freshly initialized partition becomes readable.
            this.initMetadataReader();

            final long totalInitTime = partitionInitTimer.endTimer();
            LOG.info(String.format("Initializing %s index in metadata table took %d in ms", partitionType.name(), totalInitTime));
        }
        return true;
    }

    /**
     * Derives a commit instant time for initializing one metadata partition.
     *
     * <p>An {@code initializationTime} that is already an indexing commit time is returned unchanged.
     * Otherwise index-init timestamps are generated with increasing offsets (starting at 0) until one
     * is found that is not yet present on the metadata table's commits timeline.
     *
     * @param initializationTime base instant time
     * @return an instant time not yet used on the metadata table commits timeline
     */
    private String generateUniqueCommitInstantTime(String initializationTime) {
        if (HoodieTableMetadataUtil.isIndexingCommit(initializationTime)) {
            return initializationTime;
        }
        for (int offset = 0; ; ++offset) {
            final String candidate = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
            boolean alreadyUsed = this.metadataMetaClient.getCommitsTimeline().containsInstant(candidate);
            if (!alreadyUsed) {
                return candidate;
            }
        }
    }

    /**
     * Builds the initial records for the COLUMN_STATS partition from the given file listing.
     *
     * @param partitionToFilesMap partition identifier to (file name to size) map of the data table
     * @return the configured column stats file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
        // No deleted files at initialization time, hence the empty first map.
        final HoodieData<HoodieRecord> columnStatsRecords = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
            this.engineContext,
            Collections.emptyMap(),
            partitionToFilesMap,
            this.getRecordsGenerationParams());
        final int configuredFileGroupCount = this.dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
        return Pair.of(configuredFileGroupCount, columnStatsRecords);
    }

    /**
     * Builds the initial records for the BLOOM_FILTERS partition from the given file listing.
     *
     * @param createInstantTime   instant time forwarded to the bloom filter record conversion
     * @param partitionToFilesMap partition identifier to (file name to size) map of the data table
     * @return the configured bloom filter file group count paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
        // No deleted files at initialization time, hence the empty first map.
        final HoodieData<HoodieRecord> bloomFilterRecords = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
            this.engineContext,
            Collections.emptyMap(),
            partitionToFilesMap,
            this.getRecordsGenerationParams(),
            createInstantTime);
        final int configuredFileGroupCount = this.dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
        return Pair.of(configuredFileGroupCount, bloomFilterRecords);
    }

    /**
     * Builds the initial records for the RECORD_INDEX partition by reading record keys from the latest
     * base file of every file group in every partition known to the metadata table.
     *
     * <p>The records are persisted and counted so that the number of file groups can be estimated from
     * the record count and the record index sizing settings.
     *
     * @return the estimated file group count paired with the record index records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
        final HoodieMetadataFileSystemView fsView =
            new HoodieMetadataFileSystemView(this.dataMetaClient, this.dataMetaClient.getActiveTimeline(), this.metadata);
        final List<String> partitions = this.metadata.getAllPartitionPaths();
        fsView.loadAllPartitions();

        // Raw list of (partition, base file) pairs; the base file type is not imported in this file.
        final List partitionBaseFilePairs = new ArrayList();
        for (String partition : partitions) {
            fsView.getLatestBaseFiles(partition)
                .forEach(baseFile -> partitionBaseFilePairs.add(Pair.of(partition, baseFile)));
        }
        LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " + partitions.size() + " partitions");

        final HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.readRecordKeysFromBaseFiles(
            this.engineContext,
            partitionBaseFilePairs,
            false,
            this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
            this.dataWriteConfig.getBasePath(),
            this.hadoopConf,
            this.getClass().getSimpleName());
        // Persist before counting: the records are consumed again later by the caller.
        records.persist("MEMORY_AND_DISK_SER");
        final long recordCount = records.count();

        final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(
            MetadataPartitionType.RECORD_INDEX,
            recordCount,
            RECORD_INDEX_AVERAGE_RECORD_SIZE,
            this.dataWriteConfig.getRecordIndexMinFileGroupCount(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupCount(),
            this.dataWriteConfig.getRecordIndexGrowthFactor(),
            this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        LOG.info(String.format("Initializing record index with %d mappings and %d file groups.", recordCount, fileGroupCount));
        return Pair.of(fileGroupCount, records);
    }

    /**
     * Builds the initial records for the FILES partition: one record listing all partitions, plus one
     * file-listing record per partition. The FILES partition always uses a single file group.
     *
     * @param partitionInfoList listing of the data table partitions and their files
     * @return file group count (always 1) paired with the generated records
     */
    private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<DirectoryInfo> partitionInfoList) {
        final int filesPartitionFileGroupCount = 1;

        final List<String> partitionIdentifiers = partitionInfoList.stream()
            .map(info -> HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(info.getRelativePath()))
            .collect(Collectors.toList());
        final int totalDataFilesCount = partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
        LOG.info("Committing total {} partitions and {} files to metadata", partitionIdentifiers.size(), totalDataFilesCount);

        // Single record carrying the list of all partitions.
        final HoodieRecord partitionListRecord = HoodieMetadataPayload.createPartitionListRecord(partitionIdentifiers);
        final HoodieData<HoodieRecord> allPartitionsRecord =
            this.engineContext.parallelize(Collections.singletonList(partitionListRecord), 1);
        if (partitionInfoList.isEmpty()) {
            return Pair.of(filesPartitionFileGroupCount, allPartitionsRecord);
        }

        // One record per partition carrying that partition's file listing.
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition");
        final HoodieData<HoodieRecord> fileListRecords = this.engineContext
            .parallelize(partitionInfoList, partitionInfoList.size())
            .map(partitionInfo -> {
                Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
                return HoodieMetadataPayload.createPartitionFilesRecord(partitionInfo.getRelativePath(), fileNameToSizeMap, Collections.emptyList());
            });
        ValidationUtils.checkState(fileListRecords.count() == (long) partitionIdentifiers.size());
        return Pair.of(filesPartitionFileGroupCount, allPartitionsRecord.union(fileListRecords));
    }

    /**
     * Reports whether the data table has pending instants that block metadata table initialization.
     *
     * <p>An instant counts as blocking when it is not completed, is not the given in-flight instant and
     * is not an {@code indexing} action. When blocking instants exist, the bootstrap error metric is
     * updated and a warning is logged.
     *
     * @param dataMetaClient           meta client of the data table
     * @param inflightInstantTimestamp timestamp of the in-flight instant to ignore, if any
     * @return {@code true} if at least one blocking pending instant exists
     */
    private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) {
        final List<HoodieInstant> blockingInstants = dataMetaClient.getActiveTimeline().getInstantsAsStream()
            .filter(instant -> !instant.isCompleted()
                && !(inflightInstantTimestamp.isPresent() && instant.getTimestamp().equals(inflightInstantTimestamp.get()))
                && !"indexing".equals(instant.getAction()))
            .collect(Collectors.toList());

        if (blockingInstants.isEmpty()) {
            return false;
        }
        this.metrics.ifPresent(m -> m.updateMetrics("bootstrap_error", 1L));
        LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: " + Arrays.toString(blockingInstants.toArray()));
        return true;
    }

    /**
     * Creates (or re-creates the properties of) the metadata table at the metadata base path and returns
     * its meta client. The table is a MERGE_ON_READ table named {@code <data table name>_metadata}, using
     * HFile base files, {@link HoodieMetadataPayload} as payload and no populated meta fields.
     *
     * @return meta client of the initialized metadata table
     */
    private HoodieTableMetaClient initializeMetaClient() throws IOException {
        return HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ)
            .setTableName(this.dataWriteConfig.getTableName() + "_metadata")
            .setArchiveLogFolder((String) HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .setPayloadClassName(HoodieMetadataPayload.class.getName())
            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
            .setPopulateMetaFields(false)
            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
            .initTable(this.hadoopConf.get(), this.metadataWriteConfig.getBasePath());
    }

    /**
     * Discovers all partitions of the data table by walking the file system from the base path.
     *
     * <p>Directories are listed in rounds of at most {@code fileListingParallelism} directories each.
     * A listed directory that is a Hudi partition is added to the result; any other directory
     * contributes its sub-directories to the work queue. Directories whose non-empty relative path
     * matches the configured directory filter regex are skipped entirely.
     *
     * @param initializationTime instant time handed to each {@code DirectoryInfo}
     * @return listing of every discovered partition
     */
    private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime) {
        final List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
        final int fileListingParallelism = this.metadataWriteConfig.getFileListingParallelism();
        final SerializableConfiguration conf = new SerializableConfiguration(this.dataMetaClient.getHadoopConf());
        final String dirFilterRegex = this.dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
        final String datasetBasePath = this.dataMetaClient.getBasePath();
        final SerializablePath serializableBasePath = new SerializablePath(new CachingPath(datasetBasePath));

        // Work queue of directories still to be listed, seeded with the table base path.
        List<SerializablePath> pathsToList = new LinkedList<>();
        pathsToList.add(new SerializablePath(new CachingPath(this.dataWriteConfig.getBasePath())));

        while (!pathsToList.isEmpty()) {
            final int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem");
            final List<DirectoryInfo> processedDirectories = this.engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
                FileSystem fs = path.get().getFileSystem(conf.get());
                String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
                return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime);
            }, numDirsToList);

            // Drop the directories handled in this round from the queue.
            pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));

            for (DirectoryInfo dirInfo : processedDirectories) {
                if (!dirFilterRegex.isEmpty()) {
                    final String relativePath = dirInfo.getRelativePath();
                    if (!relativePath.isEmpty() && relativePath.matches(dirFilterRegex)) {
                        LOG.info("Ignoring directory " + relativePath + " which matches the filter regex " + dirFilterRegex);
                        continue;
                    }
                }

                if (dirInfo.isHoodiePartition()) {
                    partitionsToBootstrap.add(dirInfo);
                } else {
                    // Not a partition: descend into its sub-directories in a later round.
                    pathsToList.addAll(dirInfo.getSubDirectories().stream()
                        .map(subDir -> new SerializablePath(new CachingPath(subDir.toUri())))
                        .collect(Collectors.toList()));
                }
            }
        }
        return partitionsToBootstrap;
    }

    /**
     * Lists all partitions of the data table and their files using the metadata table reader.
     *
     * <p>The reader is queried with absolute partition paths (base path + "/" + partition). Each key of
     * the returned map is converted back to a path relative to the data table base path before it is
     * handed to {@code DirectoryInfo}, so that the result matches what
     * {@code listAllPartitionsFromFilesystem} produces. Previously the absolute key was passed as the
     * relative path.
     *
     * <p>NOTE(review): this assumes every key returned by {@code getAllFilesInPartitions} is a path under
     * the data table base path (they are requested that way here); {@code getRelativePartitionPath} is
     * expected to reject anything else - confirm against the reader implementation.
     *
     * @param initializationTime instant time handed to each {@code DirectoryInfo}
     * @return listing of every partition known to the metadata table
     */
    private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime) throws IOException {
        final String dataBasePath = this.dataWriteConfig.getBasePath();
        final List<String> absolutePartitionPaths = this.metadata.getAllPartitionPaths().stream()
            .map(partitionPath -> dataBasePath + "/" + partitionPath)
            .collect(Collectors.toList());
        final Map<String, FileStatus[]> partitionFileMap = this.metadata.getAllFilesInPartitions(absolutePartitionPaths);

        final Path basePath = new CachingPath(dataBasePath);
        final List<DirectoryInfo> dirInfoList = new LinkedList<>();
        for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) {
            String relativePartitionPath = FSUtils.getRelativePartitionPath(basePath, new Path(entry.getKey()));
            dirInfoList.add(new DirectoryInfo(relativePartitionPath, entry.getValue(), initializationTime));
        }
        return dirInfoList;
    }

    /**
     * Creates the file groups of a metadata table partition.
     *
     * <p>Any existing content of the partition directory is deleted first. Then, for each of the
     * {@code fileGroupCount} file groups, a log file containing a single empty delete block is written
     * at {@code instantTime}.
     *
     * @param dataMetaClient    meta client of the data table; its file system is used by the log writers
     * @param metadataPartition partition whose file groups are created
     * @param instantTime       instant time recorded in the block header and used as base commit
     * @param fileGroupCount    number of file groups to create
     * @throws IOException     if listing or deleting the existing partition directory fails
     * @throws HoodieException if writing a file group's log file is interrupted
     */
    private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount) throws IOException {
        final Path partitionPath = new Path(this.metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath());
        final HoodieWrapperFileSystem fs = this.metadataMetaClient.getFs();
        try {
            final FileStatus[] existingFiles = fs.listStatus(partitionPath);
            if (existingFiles.length > 0) {
                LOG.warn("Deleting all existing files found in MDT partition " + metadataPartition.getPartitionPath());
                fs.delete(partitionPath, true);
                ValidationUtils.checkState(!fs.exists(partitionPath), "Failed to delete MDT partition " + metadataPartition);
            }
        } catch (FileNotFoundException ignored) {
            // The partition directory does not exist yet, so there is nothing to clean up.
        }

        final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime);
        LOG.info(msg);
        final List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount)
            .mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i))
            .collect(Collectors.toList());
        ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);

        this.engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
        this.engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
            final Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = Collections.singletonMap(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
            final HoodieDeleteBlock block = new HoodieDeleteBlock(new DeleteRecord[0], blockHeader);
            // try-with-resources: the writer is closed even when appending the block fails.
            try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(FSUtils.getPartitionPath(this.metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath()))
                    .withFileId(fileGroupFileId)
                    .overBaseCommit(instantTime)
                    .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION.intValue())
                    .withFileSize(0L)
                    .withSizeThreshold(this.metadataWriteConfig.getLogFileMaxSize())
                    .withFs((FileSystem) dataMetaClient.getFs())
                    .withRolloverLogWriteToken("0-0-0")
                    .withLogWriteToken("0-0-0")
                    .withFileExtension(".log")
                    .build()) {
                writer.appendBlock(block);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new HoodieException("Failed to created fileGroup " + fileGroupFileId + " for partition " + metadataPartition.getPartitionPath(), e);
            }
        }, fileGroupFileIds.size());
    }

    /**
     * Drops the given metadata table partitions.
     *
     * <p>For each partition, in this order: the partition is flagged as unavailable in the data table
     * config, its directory under the metadata base path is deleted recursively, and pending indexing
     * instants that target it are removed from the data table timeline. Finally the writer's internal
     * resources are closed.
     *
     * @param metadataPartitions partitions to drop
     */
    @Override
    public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
        for (MetadataPartitionType partitionToDrop : metadataPartitions) {
            final String relativePartitionPath = partitionToDrop.getPartitionPath();

            // Table config is updated first, before any data is removed.
            this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, partitionToDrop, false);

            LOG.warn("Deleting Metadata Table partition: " + relativePartitionPath);
            final Path partitionDirectory = new Path(this.metadataWriteConfig.getBasePath(), relativePartitionPath);
            this.dataMetaClient.getFs().delete(partitionDirectory, true);

            LOG.warn("Deleting pending indexing instant from the timeline for partition: " + relativePartitionPath);
            deletePendingIndexingInstant(this.dataMetaClient, relativePartitionPath);
        }
        this.closeInternal();
    }

    /**
     * Deletes pending indexing instants whose index plan covers the given metadata partition.
     *
     * <p>Only instants in {@code REQUESTED} state on the reloaded pending-index timeline are inspected.
     * For a matching instant both the requested instant file and the corresponding inflight instant
     * file are deleted if they exist. Failures are best-effort: an {@link IOException} for one instant
     * is logged (now including the exception itself) and the remaining instants are still processed.
     *
     * @param metaClient    meta client of the data table
     * @param partitionPath metadata partition path to match against the index plans
     */
    private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
        metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstantsAsStream()
            .filter(instant -> HoodieInstant.State.REQUESTED.equals(instant.getState()))
            .forEach(instant -> {
                try {
                    HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get());
                    boolean coversPartition = indexPlan.getIndexPartitionInfos().stream()
                        .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath));
                    if (coversPartition) {
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
                        metaClient.getActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant(instant.getTimestamp()));
                    }
                } catch (IOException e) {
                    // Keep going with the other instants, but do not lose the failure cause.
                    LOG.error("Failed to delete the instant file corresponding to " + instant, e);
                }
            });
    }

    /**
     * Fails when the number of deltacommits exceeds the configured maximum.
     *
     * <p>The timeline of {@code metaClient} is reloaded. If a completed instant with action
     * {@code "compaction"} exists, only deltacommits after the last such instant are counted;
     * otherwise all deltacommits are counted.
     *
     * @param metaClient                    meta client whose active timeline is inspected
     * @param maxNumDeltaCommitsWhenPending upper bound on the counted deltacommits
     * @throws HoodieMetadataException if the count is strictly greater than the bound
     */
    protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
        HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
        Option lastCompaction = activeTimeline.filterCompletedInstants()
                .filter(s -> s.getAction().equals("compaction"))
                .lastInstant();

        int numDeltaCommits;
        if (lastCompaction.isPresent()) {
            String lastCompactionTime = ((HoodieInstant) lastCompaction.get()).getTimestamp();
            numDeltaCommits = activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompactionTime).countInstants();
        } else {
            numDeltaCommits = activeTimeline.getDeltaCommitTimeline().countInstants();
        }

        if (numDeltaCommits <= maxNumDeltaCommitsWhenPending) {
            return;
        }
        throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: this is likely caused by a pending instant in the data table. Resolve the pending instant or adjust `%s`, then restart the pipeline.", maxNumDeltaCommitsWhenPending, HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
    }

    /**
     * Builds the parameter bundle used when converting data-table metadata into metadata-table
     * records, from the data meta client, the enabled partition types and the data write config.
     *
     * @return a freshly constructed {@code MetadataRecordsGenerationParams}
     */
    private MetadataRecordsGenerationParams getRecordsGenerationParams() {
        return new MetadataRecordsGenerationParams(
                this.dataMetaClient,
                this.enabledPartitionTypes,
                this.dataWriteConfig.getBloomFilterType(),
                this.dataWriteConfig.getMetadataBloomFilterIndexParallelism(),
                this.dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
                this.dataWriteConfig.getColumnStatsIndexParallelism(),
                this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
                this.dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
    }

    /**
     * Converts metadata into per-partition records and commits the ones that belong to
     * partitions which should currently be updated.
     *
     * <p>Nothing is converted or committed unless the writer is initialized and a metadata
     * reader is available. The set of partitions to update is resolved first in either case.
     *
     * @param instantTime             instant time to commit at
     * @param convertMetadataFunction supplier of the per-partition records
     */
    private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
        Set<String> partitionsToUpdate = this.getMetadataPartitionsToUpdate();
        if (!this.initialized || this.metadata == null) {
            return;
        }
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> allPartitionRecords = convertMetadataFunction.convertMetadata();
        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = allPartitionRecords.entrySet().stream()
                .filter(entry -> partitionsToUpdate.contains(entry.getKey().getPartitionPath()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        this.commit(instantTime, partitionRecordsMap);
    }

    /**
     * Resolves the metadata partitions (by partition path) that writes should update.
     *
     * <p>The result is the set of partitions recorded in the table config plus the inflight
     * ones. When that combined set is empty, the enabled partition types are used instead.
     *
     * @return partition paths of the metadata partitions to update
     */
    private Set<String> getMetadataPartitionsToUpdate() {
        Set<String> partitionsToUpdate = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        // Inflight partitions are added to the same set instance returned by the table config.
        partitionsToUpdate.addAll(HoodieTableMetadataUtil.getInflightMetadataPartitions(this.dataMetaClient.getTableConfig()));
        if (partitionsToUpdate.isEmpty()) {
            LOG.debug("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
            return this.getEnabledPartitionTypes().stream()
                    .map(MetadataPartitionType::getPartitionPath)
                    .collect(Collectors.toSet());
        }
        return partitionsToUpdate;
    }

    /**
     * Builds the metadata partitions described by an index plan.
     *
     * <p>Each entry's metadata partition path is upper-cased and resolved to a
     * {@code MetadataPartitionType}; all resolved types must be enabled on this writer. The
     * types are then marked inflight in the table config and initialized from the filesystem,
     * using the "index up to" instant of the first plan entry.
     *
     * @param engineContext       engine context (not used by this implementation)
     * @param indexPartitionInfos plan entries; an empty list is a no-op apart from a warning
     * @throws HoodieIndexException if a requested partition type is not enabled
     * @throws IOException          declared by the interface
     */
    @Override
    public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException {
        if (indexPartitionInfos.isEmpty()) {
            LOG.warn("No partition to index in the plan");
            return;
        }
        String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
        List<MetadataPartitionType> partitionTypes = new ArrayList<MetadataPartitionType>();
        for (HoodieIndexPartitionInfo planEntry : indexPartitionInfos) {
            String relativePartitionPath = planEntry.getMetadataPartitionPath();
            LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s", relativePartitionPath, this.metadataWriteConfig.getBasePath(), indexUptoInstantTime));
            MetadataPartitionType resolvedType = MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
            if (!this.enabledPartitionTypes.contains(resolvedType)) {
                throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", resolvedType));
            }
            partitionTypes.add(resolvedType);
        }
        this.dataMetaClient.getTableConfig().setMetadataPartitionsInflight(this.dataMetaClient, partitionTypes);
        String initializationTime = HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime);
        this.initializeFromFilesystem(initializationTime, partitionTypes, Option.empty());
    }

    /**
     * Applies a data-table commit to the metadata table, deriving the record-index updates from
     * the write statuses.
     *
     * <p>The RECORD_INDEX entry of the generated map is set to the union of the updates derived
     * from {@code writeStatus} and the additional updates required by the commit's operation
     * type. The writer is closed afterwards.
     *
     * @param commitMetadata metadata of the data-table commit
     * @param writeStatus    write statuses of the commit
     * @param instantTime    instant time of the commit
     */
    @Override
    public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
        ConvertMetadataFunction recordsSupplier = () -> {
            Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartition =
                    HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, commitMetadata, instantTime, this.getRecordsGenerationParams());
            HoodieData<HoodieRecord> indexUpdates = this.getRecordIndexUpserts(writeStatus);
            HoodieData<HoodieRecord> extraIndexUpdates = this.getRecordIndexAdditionalUpserts(indexUpdates, commitMetadata);
            recordsByPartition.put(MetadataPartitionType.RECORD_INDEX, indexUpdates.union(extraIndexUpdates));
            return recordsByPartition;
        };
        this.processAndCommit(instantTime, recordsSupplier);
        this.closeInternal();
    }

    /**
     * Applies a data-table commit to the metadata table using caller-supplied record-index
     * records.
     *
     * <p>The RECORD_INDEX entry of the generated map is set to the union of {@code records} and
     * the additional updates required by the commit's operation type. The writer is closed
     * afterwards.
     *
     * @param commitMetadata metadata of the data-table commit
     * @param records        record-index records for this commit
     * @param instantTime    instant time of the commit
     */
    @Override
    public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord> records, String instantTime) {
        ConvertMetadataFunction recordsSupplier = () -> {
            Map<MetadataPartitionType, HoodieData<HoodieRecord>> recordsByPartition =
                    HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, commitMetadata, instantTime, this.getRecordsGenerationParams());
            HoodieData<HoodieRecord> extraIndexUpdates = this.getRecordIndexAdditionalUpserts(records, commitMetadata);
            recordsByPartition.put(MetadataPartitionType.RECORD_INDEX, records.union(extraIndexUpdates));
            return recordsByPartition;
        };
        this.processAndCommit(instantTime, recordsSupplier);
        this.closeInternal();
    }

    /**
     * Applies a data-table clean operation to the metadata table and closes the writer.
     *
     * @param cleanMetadata metadata of the clean operation
     * @param instantTime   instant time of the clean operation
     */
    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        ConvertMetadataFunction recordsSupplier = () ->
                HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, cleanMetadata, this.getRecordsGenerationParams(), instantTime);
        this.processAndCommit(instantTime, recordsSupplier);
        this.closeInternal();
    }

    /**
     * Applies a data-table restore to the metadata table (MDT).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read and deserialize the restore plan of the requested restore instant to obtain the
     *       savepoint timestamp to restore to.</li>
     *   <li>Refuse to continue if any file group of the FILES partition has no file slice whose
     *       base instant time is at or before that timestamp.</li>
     *   <li>List the data table partitions from the filesystem <em>before</em> restoring the
     *       MDT.</li>
     *   <li>Restore the MDT through its write client, re-initialize the metadata reader, and
     *       commit the differences between the filesystem listing and the MDT contents.</li>
     * </ol>
     *
     * @param restoreMetadata metadata of the restore (not read by this implementation)
     * @param instantTime     instant time of the restore in the data table
     * @throws HoodieIOException       if the restore plan cannot be deserialized
     * @throws HoodieMetadataException if the MDT cannot be restored to the target time, or an
     *                                 I/O error occurs while syncing after the restore
     */
    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        this.dataMetaClient.reloadActiveTimeline();
        // Locate the requested restore instant so that its plan can be read from the timeline.
        HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "restore", instantTime);
        HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant((HoodieInstant)restoreInstant);
        HoodieRestorePlan restorePlan = null;
        try {
            restorePlan = (HoodieRestorePlan)TimelineMetadataUtils.deserializeAvroMetadata((byte[])((byte[])this.dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get()), HoodieRestorePlan.class);
        }
        catch (IOException e) {
            throw new HoodieIOException("Deserialization of restore plan failed whose restore instant time is " + instantTime + " in data table", e);
        }
        String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
        LOG.info("Triggering restore to " + restoreToInstantTime + " in metadata table");
        List filesGroups = this.metadata.getMetadataFileSystemView().getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
        // Inner anyMatch: does this file group have a slice at or before the restore time?
        // Outer anyMatch: is there at least one file group for which that is false?
        boolean cannotRestore = filesGroups.stream().map(fileGroup -> fileGroup.getAllFileSlices().map(fileSlice -> fileSlice.getBaseInstantTime()).anyMatch(instantTime1 -> HoodieTimeline.compareTimestamps((String)instantTime1, (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)restoreToInstantTime))).anyMatch(canRestore -> canRestore == false);
        if (cannotRestore) {
            throw new HoodieMetadataException(String.format("Can't restore to %s since there is no base file in MDT lesser than the commit to restore to. Please delete metadata table and retry", restoreToInstantTime));
        }
        // The filesystem listing is taken before the MDT itself is restored below.
        List<DirectoryInfo> dirInfoList = this.listAllPartitionsFromFilesystem(instantTime);
        Map<String, DirectoryInfo> dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, Function.identity()));
        // Only the map is needed from here on.
        dirInfoList.clear();
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        // NOTE(review): the meaning of the boolean flag is not visible here — confirm against restoreToInstant.
        writeClient.restoreToInstant(restoreToInstantTime, false);
        try {
            // Re-create the reader so that the comparison below runs against the restored MDT.
            this.initMetadataReader();
            HashMap<String, Map<String, Long>> partitionFilesToAdd = new HashMap<String, Map<String, Long>>();
            HashMap<String, List<String>> partitionFilesToDelete = new HashMap<String, List<String>>();
            ArrayList<String> partitionsToDelete = new ArrayList<String>();
            this.fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap, partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
            // The sync commit uses a newly generated instant time, not the restore instant time.
            String syncCommitTime = HoodieTableMetadataUtil.createRestoreTimestamp((String)HoodieActiveTimeline.createNewInstantTime());
            this.processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords((HoodieEngineContext)this.engineContext, (List)partitionsToDelete, (Map)partitionFilesToAdd, (Map)partitionFilesToDelete, (String)syncCommitTime));
            this.closeInternal();
        }
        catch (IOException e) {
            throw new HoodieMetadataException("IOException during MDT restore sync", (Exception)e);
        }
    }

    /**
     * Applies a data-table rollback to the metadata table (MDT).
     *
     * <p>Does nothing unless the writer is initialized and a metadata reader is available.
     * Otherwise the rollback is validated against the latest MDT compaction, the rollback
     * records are committed, and, if the rolled back commit exists as a deltacommit since the
     * latest compaction, that deltacommit is rolled back in the MDT as well. The writer is
     * closed afterwards.
     *
     * @param rollbackMetadata metadata of the rollback; its first rolled-back commit is used
     * @param instantTime      instant time of the rollback in the data table
     * @throws HoodieMetadataException if validation fails or the MDT rollback reports failure
     */
    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (!this.initialized || this.metadata == null) {
            return;
        }
        String commitToRollbackInstantTime = (String) rollbackMetadata.getCommitsRollback().get(0);

        Option deltaCommitsInfo = CompactionUtils.getDeltaCommitsSinceLatestCompaction(this.metadataMetaClient.getActiveTimeline());
        Pair timelineAndCompaction = (Pair) deltaCommitsInfo.get();
        HoodieInstant compactionInstant = (HoodieInstant) timelineAndCompaction.getValue();
        HoodieTimeline deltacommitsSinceCompaction = (HoodieTimeline) timelineAndCompaction.getKey();
        HoodieInstant deltaCommitInstant = new HoodieInstant(false, "deltacommit", commitToRollbackInstantTime);

        this.validateRollback(commitToRollbackInstantTime, compactionInstant, deltacommitsSinceCompaction);
        String rollbackInstantTime = HoodieTableMetadataUtil.createRollbackTimestamp(instantTime);
        ConvertMetadataFunction recordsSupplier = () ->
                HoodieTableMetadataUtil.convertMetadataToRecords(this.engineContext, this.dataMetaClient, rollbackMetadata, instantTime);
        this.processAndCommit(instantTime, recordsSupplier);

        if (!deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
            LOG.info(String.format("Ignoring rollback of instant %s at %s. The commit to rollback is not found in MDT", commitToRollbackInstantTime, instantTime));
        } else {
            LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime);
            boolean rolledBack = this.getWriteClient().rollback(commitToRollbackInstantTime, rollbackInstantTime);
            if (!rolledBack) {
                throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime);
            }
        }
        this.closeInternal();
    }

    /**
     * Rejects a rollback of a commit that is at or before the latest compaction.
     *
     * <p>The check only applies when {@code compactionInstant} has action {@code "commit"}.
     *
     * @param commitToRollbackInstantTime  instant time of the commit being rolled back
     * @param compactionInstant            instant to compare against
     * @param deltacommitsSinceCompaction  deltacommits after that instant; used in the message
     * @throws HoodieMetadataException if the commit is not later than the compaction
     */
    protected void validateRollback(String commitToRollbackInstantTime, HoodieInstant compactionInstant, HoodieTimeline deltacommitsSinceCompaction) {
        if (!compactionInstant.getAction().equals("commit")) {
            return;
        }
        String compactionInstantTime = compactionInstant.getTimestamp();
        boolean rollbackNotAfterCompaction = HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime);
        if (rollbackNotAfterCompaction) {
            throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime, deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants()));
        }
    }

    /**
     * Closes the metadata reader and the cached write client, if present.
     *
     * <p>The write client is closed in a {@code finally} block so that a failure while closing
     * the metadata reader does not leave the write client open. After a successful close the
     * cached write client reference is cleared, so the next {@link #getWriteClient()} call
     * creates a new one.
     *
     * @throws Exception if closing the metadata reader or the write client fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.metadata != null) {
                this.metadata.close();
            }
        }
        finally {
            // Always release the write client, even when the reader failed to close.
            if (this.writeClient != null) {
                this.writeClient.close();
                this.writeClient = null;
            }
        }
    }

    /**
     * Commits the given per-partition records to the metadata table.
     *
     * <p>Called by {@code processAndCommit} with the instant time as the first argument and the
     * map of records, keyed by metadata partition type, as the second.
     *
     * @param var1 instant time of the commit
     * @param var2 records to commit, keyed by metadata partition type
     */
    protected abstract void commit(String var1, Map<MetadataPartitionType, HoodieData<HoodieRecord>> var2);

    /**
     * Converts records into the engine-specific input type {@code I}.
     *
     * <p>{@code commitInternal} passes the prepped records here and hands the result to the
     * write client's prepped bulk-insert / upsert calls.
     *
     * @param var1 records to convert
     * @return the same records in the engine-specific representation
     */
    protected abstract I convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> var1);

    /**
     * Writes the given per-partition records to the metadata table (MDT) at {@code instantTime}.
     *
     * <p>Records are first tagged with their target file group and converted to the
     * engine-specific type. If the failed-writes cleaning policy is eager, failed writes are
     * rolled back first. If the instant already exists on the MDT commits timeline it is rolled
     * back before being re-applied. The records are then bulk-inserted (when initializing) or
     * upserted, the MDT timeline is reloaded and size metrics are updated.
     *
     * @param instantTime         instant time to commit at
     * @param partitionRecordsMap records to write, keyed by metadata partition type
     * @param isInitializing      {@code true} to bulk insert, {@code false} to upsert
     * @param bulkInsertPartitioner partitioner forwarded to the bulk insert
     * @throws HoodieMetadataException if rolling back an existing instant fails
     */
    protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        ValidationUtils.checkState(this.metadataMetaClient != null, "Metadata table is not fully initialized yet.");
        HoodieData<HoodieRecord> preppedRecords = this.prepRecords(partitionRecordsMap);
        I preppedRecordInputs = this.convertHoodieDataToEngineSpecificData(preppedRecords);
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();

        // With an eager cleaning policy, reload the meta client only if something was rolled back.
        boolean eagerCleaning = this.dataWriteConfig.getFailedWritesCleanPolicy().isEager();
        if (eagerCleaning && writeClient.rollbackFailedWrites()) {
            this.metadataMetaClient = HoodieTableMetaClient.reload(this.metadataMetaClient);
        }

        boolean instantAlreadyOnTimeline = this.metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime);
        if (instantAlreadyOnTimeline) {
            boolean alreadyCompleted = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants()
                    .filter(entry -> entry.getTimestamp().equals(instantTime))
                    .lastInstant()
                    .isPresent();
            LOG.info(String.format("%s completed commit at %s being applied to MDT.", alreadyCompleted ? "Already" : "Partially", instantTime));
            // The previous attempt is rolled back so that the instant can be written again.
            if (!writeClient.rollback(instantTime)) {
                throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
            }
            this.metadataMetaClient.reloadActiveTimeline();
        } else {
            LOG.info("New commit at " + instantTime + " being applied to MDT.");
        }

        writeClient.startCommitWithTime(instantTime);
        this.preWrite(instantTime);
        String activeModule = this.getClass().getSimpleName();
        if (isInitializing) {
            this.engineContext.setJobStatus(activeModule, String.format("Bulk inserting at %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
            writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
        } else {
            this.engineContext.setJobStatus(activeModule, String.format("Upserting at %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
            writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
        }

        this.metadataMetaClient.reloadActiveTimeline();
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata, this.dataMetaClient.getTableConfig().getMetadataPartitions()));
    }

    /**
     * Hook invoked by {@code commitInternal} after the commit has been started on the write
     * client and before records are written. The default implementation does nothing.
     *
     * @param instantTime instant time of the commit about to be written
     */
    protected void preWrite(String instantTime) {
    }

    /**
     * Engine-specific bulk commit of records into a single metadata partition.
     *
     * <p>NOTE(review): no caller is visible in this part of the file. The arguments are
     * presumably the instant time, the target partition type, the records and a file group
     * count — confirm against the call site and the implementations.
     */
    protected abstract void bulkCommit(String var1, MetadataPartitionType var2, HoodieData<HoodieRecord> var3, int var4);

    /**
     * Tags every record with the metadata-table file group it must be written to and returns
     * the union of all partitions' records.
     *
     * <p>For each partition the latest file slices are looked up, falling back to the lookup
     * that includes inflight slices when none are found. Each record key is mapped to a file
     * group index, and the record's current location is set to that slice's base instant time
     * and file id.
     *
     * @param partitionRecordsMap records keyed by metadata partition type
     * @return all records of all partitions, with their location set
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument},
     *                                  when a partition has no file slices
     */
    protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
        HoodieData<HoodieRecord> taggedRecords = this.engineContext.emptyHoodieData();
        HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView(this.metadataMetaClient);
        for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> partitionEntry : partitionRecordsMap.entrySet()) {
            String partitionName = partitionEntry.getKey().getPartitionPath();
            HoodieData<HoodieRecord> partitionRecords = partitionEntry.getValue();

            List<FileSlice> latestSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, Option.ofNullable(fsView), partitionName);
            // Fall back to inflight slices only when no latest slice was found.
            List<FileSlice> targetSlices = latestSlices.isEmpty()
                    ? HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(this.metadataMetaClient, Option.ofNullable(fsView), partitionName)
                    : latestSlices;

            int fileGroupCount = targetSlices.size();
            ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionName + " should be >0");

            HoodieData<HoodieRecord> taggedPartitionRecords = partitionRecords.map(r -> {
                int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), fileGroupCount);
                FileSlice slice = targetSlices.get(fileGroupIndex);
                r.unseal();
                r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
                r.seal();
                return r;
            });
            taggedRecords = taggedRecords.union(taggedPartitionRecords);
        }
        return taggedRecords;
    }

    /**
     * Runs table services on the metadata table (MDT).
     *
     * <p>Pending compactions and log compactions are executed first. If the MDT has a completed
     * deltacommit, cleaning, compaction (when the timeline allows scheduling it) and archival
     * follow. Duration and status metrics are always reported; the status is {@code 1} when
     * everything succeeded or was skipped and {@code -1} when an exception was raised. Any
     * exception is logged and rethrown.
     *
     * @param inFlightInstantTimestamp forwarded to the compaction scheduling validation
     */
    @Override
    public void performTableServices(Option<String> inFlightInstantTimestamp) {
        HoodieTimer timer = HoodieTimer.start();
        boolean succeededOrSkipped = true;
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        try {
            this.runPendingTableServicesOperations(writeClient);
            Option lastCompletedDeltaCommit = this.metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
            // Without a completed deltacommit there is nothing to clean, compact or archive.
            if (lastCompletedDeltaCommit.isPresent()) {
                String latestDeltacommitTime = ((HoodieInstant) lastCompletedDeltaCommit.get()).getTimestamp();
                LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations.");
                this.cleanIfNecessary(writeClient, latestDeltacommitTime);
                boolean canScheduleCompaction = this.validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime);
                if (canScheduleCompaction) {
                    this.compactIfNecessary(writeClient, latestDeltacommitTime);
                }
                writeClient.archive();
                LOG.info("All the table services operations on MDT completed successfully");
            }
        }
        catch (Exception e) {
            LOG.error("Exception in running table services on metadata table", e);
            succeededOrSkipped = false;
            throw e;
        }
        finally {
            long elapsed = timer.endTimer();
            long statusDelta = succeededOrSkipped ? 1L : -1L;
            this.metrics.ifPresent(m -> {
                m.updateMetrics("table_service_execution_duration", elapsed);
                m.incrementMetric("table_service_execution_status", statusDelta);
            });
        }
    }

    /**
     * Executes table services that are already pending on the metadata table: pending
     * compactions first, then pending log compactions.
     *
     * @param writeClient write client used to run the pending operations
     */
    private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient) {
        writeClient.runAnyPendingCompactions();
        writeClient.runAnyPendingLogCompactions();
    }

    /**
     * Schedules and runs a compaction of the metadata table, falling back to log compaction.
     *
     * <p>Nothing happens if a completed instant with the derived compaction time already exists.
     * If a compaction can be scheduled it is executed. Otherwise, when log compaction is
     * enabled, the same procedure is repeated with the derived log compaction time.
     *
     * @param writeClient           write client used to schedule and run the services
     * @param latestDeltacommitTime latest deltacommit time the instant times are derived from
     */
    protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
        String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
            LOG.info(String.format("Compaction with same %s time is already present in the timeline.", compactionInstantTime));
            return;
        }
        if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
            LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime);
            writeClient.compact(compactionInstantTime);
            return;
        }
        // Regular compaction was not scheduled; try log compaction when it is enabled.
        if (!this.metadataWriteConfig.isLogCompactionEnabled()) {
            return;
        }
        String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
            LOG.info(String.format("Log compaction with same %s time is already present in the timeline.", logCompactionInstantTime));
            return;
        }
        if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
            LOG.info("Log compaction is scheduled for timestamp " + logCompactionInstantTime);
            writeClient.logCompact(logCompactionInstantTime);
        }
    }

    /**
     * Cleans the metadata table unless fewer than three instants have completed since the last
     * completed instant on the commit timeline.
     *
     * <p>When cleaning runs, failed indexing is lazily rolled back afterwards.
     *
     * @param writeClient write client used to clean
     * @param instantTime instant time the clean timestamp is derived from
     */
    protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
        Option lastCompletedCompactionInstant = this.metadataMetaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        if (lastCompletedCompactionInstant.isPresent()) {
            String lastCompactionTime = ((HoodieInstant) lastCompletedCompactionInstant.get()).getTimestamp();
            int completedSinceCompaction = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().findInstantsAfter(lastCompactionTime).countInstants();
            if (completedSinceCompaction < 3) {
                return;
            }
        }
        writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
        writeClient.lazyRollbackFailedIndexing();
    }

    /**
     * Decides whether a compaction of the metadata table (MDT) may be scheduled.
     *
     * <p>Scheduling is refused when the data table has inflight or requested instants at or
     * before the latest MDT deltacommit (in which case the deltacommit count is also checked
     * against its configured maximum), or when the MDT already has a pending compaction or log
     * compaction.
     *
     * @param inFlightInstantTimestamp             not used by this implementation
     * @param latestDeltaCommitTimeInMetadataTable latest deltacommit time in the MDT
     * @return {@code true} if compaction may be scheduled
     */
    protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
        List<HoodieInstant> pendingInstants = this.dataMetaClient.reloadActiveTimeline()
                .filterInflightsAndRequested()
                .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable)
                .getInstants();
        if (!pendingInstants.isEmpty()) {
            checkNumDeltaCommits(this.metadataMetaClient, this.dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
            LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s", pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
            return false;
        }

        Option pendingLogCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
        Option pendingCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
        if (!pendingLogCompactionInstant.isPresent() && !pendingCompactionInstant.isPresent()) {
            return true;
        }
        LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present", pendingCompactionInstant, pendingLogCompactionInstant));
        return false;
    }

    /**
     * Compares the files recorded in the metadata table (MDT) with a filesystem listing and
     * collects the differences into the supplied output collections.
     *
     * <p>For every partition known to the MDT:
     * <ul>
     *   <li>if the partition is absent from {@code dirInfoMap}, it is added to
     *       {@code partitionsToDelete} and all of its MDT files (if any) to
     *       {@code partitionFilesToDelete};</li>
     *   <li>otherwise, files present on the filesystem but missing in the MDT go to
     *       {@code partitionFilesToAdd}, and files present in the MDT but missing on the
     *       filesystem go to {@code partitionFilesToDelete}.</li>
     * </ul>
     * Output maps are keyed by the partition identifier used for the FILES partition.
     *
     * <p>NOTE(review): for partitions present in {@code dirInfoMap} the MDT file array is
     * assumed to be non-null, as in the original code — confirm against
     * {@code getAllFilesInPartition}.
     *
     * @param dirInfoMap             filesystem listing keyed by relative partition path
     * @param partitionFilesToAdd    output: files to add, with sizes, per partition
     * @param partitionFilesToDelete output: file names to delete per partition
     * @param partitionsToDelete     output: partitions that no longer exist on the filesystem
     * @throws IOException if reading from the metadata table fails
     */
    private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd, Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {
        for (String partition : this.metadata.fetchAllPartitionPaths()) {
            // A non-partitioned table keeps its files directly under the base path.
            Path partitionPath = StringUtils.isNullOrEmpty(partition) && !this.dataMetaClient.getTableConfig().isTablePartitioned()
                    ? new Path(this.dataWriteConfig.getBasePath())
                    : new Path(this.dataWriteConfig.getBasePath(), partition);
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
            FileStatus[] metadataFiles = this.metadata.getAllFilesInPartition(partitionPath);

            if (!dirInfoMap.containsKey(partition)) {
                partitionsToDelete.add(partitionId);
                if (metadataFiles != null && metadataFiles.length > 0) {
                    List<String> allMdtFiles = Arrays.stream(metadataFiles).map(f -> f.getPath().getName()).collect(Collectors.toList());
                    partitionFilesToDelete.put(partitionId, allMdtFiles);
                }
                continue;
            }

            Map<String, Long> fsFiles = dirInfoMap.get(partition).getFileNameToSizeMap();
            List<String> mdtFileNames = Arrays.stream(metadataFiles).map(f -> f.getPath().getName()).collect(Collectors.toList());
            // Set-based lookup keeps the "missing in MDT" check linear in the number of files.
            Set<String> mdtFileNameSet = mdtFileNames.stream().collect(Collectors.toSet());

            List<String> filesDeleted = mdtFileNames.stream().filter(name -> !fsFiles.containsKey(name)).collect(Collectors.toList());
            Map<String, Long> filesToAdd = new HashMap<String, Long>();
            fsFiles.forEach((name, size) -> {
                if (!mdtFileNameSet.contains(name)) {
                    filesToAdd.put(name, size);
                }
            });

            if (!filesToAdd.isEmpty()) {
                partitionFilesToAdd.put(partitionId, filesToAdd);
            }
            if (!filesDeleted.isEmpty()) {
                partitionFilesToDelete.put(partitionId, filesDeleted);
            }
        }
    }

    /**
     * Derives record-index records from write statuses.
     *
     * <p>Per written record delegate (skipping errored records and those flagged to ignore
     * index updates):
     * <ul>
     *   <li>no new location: a record-index delete is emitted;</li>
     *   <li>new location and no current location: a record-index update is emitted;</li>
     *   <li>new and current location with the same file id: nothing is emitted;</li>
     *   <li>new and current location with different file ids: an exception is thrown.</li>
     * </ul>
     *
     * @param writeStatuses write statuses of the commit
     * @return record-index records to write
     * @throws HoodieMetadataException if a record's file id changed
     */
    private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
        return writeStatuses.flatMap(writeStatus -> {
            List<HoodieRecord> indexRecords = new LinkedList<HoodieRecord>();
            for (HoodieRecordDelegate delegate : writeStatus.getWrittenRecordDelegates()) {
                if (writeStatus.isErrored(delegate.getHoodieKey()) || delegate.getIgnoreIndexUpdate()) {
                    continue;
                }
                Option<HoodieRecordLocation> newLocation = delegate.getNewLocation();
                if (!newLocation.isPresent()) {
                    indexRecords.add(HoodieMetadataPayload.createRecordIndexDelete(delegate.getRecordKey()));
                    continue;
                }
                if (delegate.getCurrentLocation().isPresent()) {
                    boolean sameFileId = delegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId());
                    if (sameFileId) {
                        // The record stayed in its file group; the index entry is left as is.
                        continue;
                    }
                    String msg = String.format("Detected update in location of record with key %s from %s  to %s. The fileID should not change.", delegate, delegate.getCurrentLocation().get(), newLocation.get());
                    LOG.error(msg);
                    throw new HoodieMetadataException(msg);
                }
                indexRecords.add(HoodieMetadataPayload.createRecordIndexUpdate(
                        delegate.getRecordKey(),
                        delegate.getPartitionPath(),
                        newLocation.get().getFileId(),
                        newLocation.get().getInstantTime(),
                        this.dataWriteConfig.getWritesFileIdEncoding()));
            }
            return indexRecords.iterator();
        });
    }

    /**
     * Reads the record keys of the latest base files in every partition touched by a replace
     * commit.
     *
     * <p>The partitions are the keys of the commit's partition-to-replaced-file-ids map; the
     * base files come from a metadata-backed file system view over the data table's active
     * timeline.
     *
     * @param replaceCommitMetadata metadata of the replace commit
     * @return records produced by {@code readRecordKeysFromBaseFiles} for those base files
     */
    private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
        HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(this.dataMetaClient, (HoodieTimeline) this.dataMetaClient.getActiveTimeline(), (HoodieTableMetadata) this.metadata);
        List partitionBaseFilePairs = replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream()
                .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of((Object) partition, (Object) f)))
                .collect(Collectors.toList());
        return HoodieTableMetadataUtil.readRecordKeysFromBaseFiles(
                this.engineContext,
                partitionBaseFilePairs,
                true,
                this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
                this.dataWriteConfig.getBasePath(),
                this.hadoopConf,
                this.getClass().getSimpleName());
    }

    /**
     * Computes extra record-index records required by replace-style operations.
     *
     * <ul>
     *   <li>INSERT_OVERWRITE: records from the replaced file groups that have no counterpart in
     *       {@code updatesFromWriteStatuses}, matched on {@code getKey()};</li>
     *   <li>INSERT_OVERWRITE_TABLE: the same, matched on {@code getRecordKey()};</li>
     *   <li>DELETE_PARTITION: all records from the replaced file groups;</li>
     *   <li>any other operation: no records.</li>
     * </ul>
     *
     * @param updatesFromWriteStatuses record-index records already derived for this commit
     * @param commitMetadata           commit metadata; cast to replace-commit metadata for the
     *                                 three replace-style operations
     * @return additional record-index records, possibly empty
     */
    private HoodieData<HoodieRecord> getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
        WriteOperationType operationType = commitMetadata.getOperationType();
        if (operationType == WriteOperationType.INSERT_OVERWRITE) {
            // Left join on getKey(), keeping only replaced records without a match.
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata)
                    .mapToPair(r -> Pair.of(r.getKey(), r))
                    .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getKey(), r)))
                    .values()
                    .filter(p -> !p.getRight().isPresent())
                    .map(Pair::getLeft);
        } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
            // Same join, but on getRecordKey().
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata)
                    .mapToPair(r -> Pair.of(r.getRecordKey(), r))
                    .leftOuterJoin(updatesFromWriteStatuses.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
                    .values()
                    .filter(p -> !p.getRight().isPresent())
                    .map(Pair::getLeft);
        } else if (operationType == WriteOperationType.DELETE_PARTITION) {
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata) commitMetadata);
        } else {
            return this.engineContext.emptyHoodieData();
        }
    }

    /**
     * Closes this writer via {@link #close()}, wrapping any failure in an unchecked exception
     * so that callers do not need to declare a checked exception.
     *
     * @throws HoodieException if {@code close()} throws
     */
    protected void closeInternal() {
        try {
            this.close();
        }
        catch (Exception e) {
            throw new HoodieException("Failed to close HoodieMetadata writer ", (Throwable)e);
        }
    }

    /**
     * Returns the value of this writer's {@code initialized} flag.
     *
     * @return {@code true} if the flag is set
     */
    @Override
    public boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Returns the cached write client, creating it through {@link #initializeWriteClient()} on
     * first use (or after the cached reference has been cleared).
     *
     * @return the write client
     */
    protected BaseHoodieWriteClient<?, I, ?, ?> getWriteClient() {
        if (this.writeClient != null) {
            return this.writeClient;
        }
        this.writeClient = this.initializeWriteClient();
        return this.writeClient;
    }

    /**
     * Creates the engine-specific write client. Invoked by {@link #getWriteClient()} when no
     * client is currently cached.
     *
     * @return a new write client
     */
    protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();

    /**
     * Summary of one directory listing: its data files (name to size), its sub-directories and
     * whether it contains a partition metadata file.
     */
    static class DirectoryInfo
    implements Serializable {
        private final String relativePath;
        private final HashMap<String, Long> filenameToSizeMap;
        private final List<Path> subDirectories = new ArrayList<Path>();
        private boolean isHoodiePartition = false;

        /**
         * Classifies the entries of a directory listing.
         *
         * <ul>
         *   <li>Directories other than {@code .hoodie} are recorded as sub-directories.</li>
         *   <li>A file whose name starts with {@code .hoodie_partition_metadata} marks the
         *       directory as a partition.</li>
         *   <li>Data files whose commit time is at or before {@code maxInstantTime} are recorded
         *       with their length; every other file is ignored.</li>
         * </ul>
         *
         * @param relativePath   relative path of the listed directory
         * @param fileStatus     entries of the directory
         * @param maxInstantTime latest commit time of data files to include
         */
        public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime) {
            this.relativePath = relativePath;
            this.filenameToSizeMap = new HashMap<String, Long>(fileStatus.length);
            for (FileStatus entry : fileStatus) {
                if (entry.isDirectory()) {
                    if (!entry.getPath().getName().equals(".hoodie")) {
                        this.subDirectories.add(entry.getPath());
                    }
                } else if (entry.getPath().getName().startsWith(".hoodie_partition_metadata")) {
                    this.isHoodiePartition = true;
                } else if (FSUtils.isDataFile(entry.getPath())) {
                    String dataFileCommitTime = FSUtils.getCommitTime(entry.getPath().getName());
                    boolean withinMaxInstant = HoodieTimeline.compareTimestamps(dataFileCommitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime);
                    if (withinMaxInstant) {
                        this.filenameToSizeMap.put(entry.getPath().getName(), entry.getLen());
                    }
                }
            }
        }

        /** Returns the relative path of the listed directory. */
        String getRelativePath() {
            return this.relativePath;
        }

        /** Returns the number of recorded data files. */
        int getTotalFiles() {
            return this.filenameToSizeMap.size();
        }

        /** Returns whether a partition metadata file was found in the directory. */
        boolean isHoodiePartition() {
            return this.isHoodiePartition;
        }

        /** Returns the recorded sub-directories (the internal list, not a copy). */
        List<Path> getSubDirectories() {
            return this.subDirectories;
        }

        /** Returns the recorded data files as file name to size (the internal map, not a copy). */
        Map<String, Long> getFileNameToSizeMap() {
            return this.filenameToSizeMap;
        }
    }

    private static interface ConvertMetadataFunction {
        public Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadata();
    }
}

