/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableMetadataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
    private static final String RECORD_KEY_FIELD_NAME = "key";
    private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
    private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
    protected HoodieWriteConfig metadataWriteConfig;
    protected HoodieWriteConfig dataWriteConfig;
    protected HoodieBackedTableMetadata metadata;
    protected HoodieTableMetaClient metadataMetaClient;
    protected HoodieTableMetaClient dataMetaClient;
    protected Option<HoodieMetadataMetrics> metrics;
    protected StorageConfiguration<?> storageConf;
    protected final transient HoodieEngineContext engineContext;
    protected final List<MetadataPartitionType> enabledPartitionTypes;
    private boolean initialized = false;

    protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inflightInstantTimestamp) {
        this.dataWriteConfig = writeConfig;
        this.engineContext = engineContext;
        this.storageConf = storageConf;
        this.metrics = Option.empty();
        this.dataMetaClient = HoodieTableMetaClient.builder().setConf(storageConf.newInstance()).setBasePath(this.dataWriteConfig.getBasePath()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
        this.enabledPartitionTypes = MetadataPartitionType.getEnabledPartitions((TypedProperties)this.dataWriteConfig.getProps(), (HoodieTableMetaClient)this.dataMetaClient);
        if (writeConfig.isMetadataTableEnabled()) {
            this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
            try {
                this.initRegistry();
                this.initialized = this.initializeIfNeeded(this.dataMetaClient, inflightInstantTimestamp);
            }
            catch (IOException e) {
                LOG.error("Failed to initialize metadata table", (Throwable)e);
            }
        }
        ValidationUtils.checkArgument((!this.initialized || this.metadata != null ? 1 : 0) != 0, (String)"MDT Reader should have been opened post initialization");
    }

    protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
        return null;
    }

    private void initMetadataReader() {
        if (this.metadata != null) {
            this.metadata.close();
        }
        try {
            this.metadata = new HoodieBackedTableMetadata(this.engineContext, this.dataMetaClient.getStorage(), this.dataWriteConfig.getMetadataConfig(), this.dataWriteConfig.getBasePath(), true);
            this.metadataMetaClient = this.metadata.getMetadataMetaClient();
        }
        catch (Exception e) {
            throw new HoodieException("Could not open MDT for reads", (Throwable)e);
        }
    }

    protected abstract void initRegistry();

    public HoodieWriteConfig getWriteConfig() {
        return this.metadataWriteConfig;
    }

    public HoodieBackedTableMetadata getTableMetadata() {
        return this.metadata;
    }

    public List<MetadataPartitionType> getEnabledPartitionTypes() {
        return this.enabledPartitionTypes;
    }

    protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) throws IOException {
        HoodieTimer timer = HoodieTimer.start();
        ArrayList<MetadataPartitionType> partitionsToInit = new ArrayList<MetadataPartitionType>(MetadataPartitionType.values().length);
        try {
            boolean exists = this.metadataTableExists(dataMetaClient);
            if (!exists) {
                partitionsToInit.add(MetadataPartitionType.FILES);
            }
            if (!this.dataWriteConfig.isMetadataAsyncIndex()) {
                Set completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
                LOG.info("Async metadata indexing disabled and following partitions already initialized: {}", (Object)completedPartitions);
                this.enabledPartitionTypes.stream().filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)).forEach(partitionsToInit::add);
            }
            if (partitionsToInit.isEmpty()) {
                this.initMetadataReader();
                return true;
            }
            String initializationTime = (String)dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse((Object)"00000000000000");
            if (!this.initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) {
                LOG.error("Failed to initialize MDT from filesystem");
                return false;
            }
            this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
            return true;
        }
        catch (IOException e) {
            LOG.error("Failed to initialize metadata table. Disabling the writer.", (Throwable)e);
            return false;
        }
    }

    private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws IOException {
        boolean exists = dataMetaClient.getTableConfig().isMetadataTableAvailable();
        boolean reInitialize = false;
        if (exists) {
            try {
                this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.storageConf.newInstance()).setBasePath(this.metadataWriteConfig.getBasePath()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
                if (this.metadataMetaClient.getTableConfig().populateMetaFields()) {
                    LOG.info("Re-initiating metadata table properties since populate meta fields have changed");
                    this.metadataMetaClient = this.initializeMetaClient();
                }
            }
            catch (TableNotFoundException e) {
                return false;
            }
            Option latestMetadataInstant = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
            reInitialize = this.isBootstrapNeeded((Option<HoodieInstant>)latestMetadataInstant);
        }
        if (reInitialize) {
            this.metrics.ifPresent(m -> m.incrementMetric("rebootstrap_count", 1L));
            LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
            HoodieTableMetadataUtil.deleteMetadataTable((HoodieTableMetaClient)dataMetaClient, (HoodieEngineContext)this.engineContext, (boolean)false);
            exists = false;
        }
        return exists;
    }

    private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
        if (!latestMetadataInstant.isPresent()) {
            LOG.warn("Metadata Table will need to be re-initialized as no instants were found");
            return true;
        }
        return false;
    }

    private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
        Set<String> pendingDataInstants = this.getPendingDataInstants(this.dataMetaClient);
        boolean filesPartitionAvailable = this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES);
        if (!filesPartitionAvailable) {
            partitionsToInit.remove(MetadataPartitionType.FILES);
            partitionsToInit.add(0, MetadataPartitionType.FILES);
            this.metadataMetaClient = this.initializeMetaClient();
        } else {
            this.initMetadataReader();
            if (this.metadataMetaClient == null) {
                this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.storageConf.newInstance()).setBasePath(this.metadataWriteConfig.getBasePath()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
            }
        }
        partitionsToInit.removeIf(metadataPartition -> this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(metadataPartition));
        List<Object> partitionInfoList = filesPartitionAvailable ? this.listAllPartitionsFromMDT(initializationTime, pendingDataInstants) : (this.dataWriteConfig.getMetadataConfig().shouldAutoInitialize() ? this.listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants) : Collections.emptyList());
        Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> {
            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition((String)p.getRelativePath());
            return Pair.of((Object)partitionName, (Object)p.getFileNameToSizeMap());
        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        block11: for (MetadataPartitionType partitionType : partitionsToInit) {
            Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
            String commitTimeForPartition = this.generateUniqueCommitInstantTime(initializationTime);
            String partitionTypeName = partitionType.name();
            LOG.info("Initializing MDT partition {} at instant {}", (Object)partitionTypeName, (Object)commitTimeForPartition);
            try {
                switch (partitionType) {
                    case FILES: {
                        fileGroupCountAndRecordsPair = this.initializeFilesPartition(partitionInfoList);
                        break;
                    }
                    case BLOOM_FILTERS: {
                        fileGroupCountAndRecordsPair = this.initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
                        break;
                    }
                    case COLUMN_STATS: {
                        fileGroupCountAndRecordsPair = this.initializeColumnStatsPartition(partitionToFilesMap);
                        break;
                    }
                    case RECORD_INDEX: {
                        fileGroupCountAndRecordsPair = this.initializeRecordIndexPartition();
                        break;
                    }
                    case FUNCTIONAL_INDEX: {
                        Set<String> functionalIndexPartitionsToInit = this.getFunctionalIndexPartitionsToInit();
                        if (functionalIndexPartitionsToInit.isEmpty()) continue block11;
                        ValidationUtils.checkState((functionalIndexPartitionsToInit.size() == 1 ? 1 : 0) != 0, (String)"Only one functional index at a time is supported for now");
                        fileGroupCountAndRecordsPair = this.initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
                        break;
                    }
                    case PARTITION_STATS: {
                        if (this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
                            LOG.warn("Skipping partition stats index initialization as target columns are not set");
                            continue block11;
                        }
                        fileGroupCountAndRecordsPair = this.initializePartitionStatsIndex(partitionInfoList);
                        break;
                    }
                    case SECONDARY_INDEX: {
                        Set<String> secondaryIndexPartitionsToInit = this.getSecondaryIndexPartitionsToInit();
                        if (secondaryIndexPartitionsToInit.size() != 1) {
                            LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
                            continue block11;
                        }
                        fileGroupCountAndRecordsPair = this.initializeSecondaryIndexPartition(secondaryIndexPartitionsToInit.iterator().next());
                        break;
                    }
                    default: {
                        throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType));
                    }
                }
            }
            catch (Exception e) {
                String metricKey = partitionType.getPartitionPath() + "_" + "bootstrap_error";
                this.metrics.ifPresent(m -> m.setMetric(metricKey, 1L));
                String errMsg = String.format("Bootstrap on %s partition failed for %s", partitionType.getPartitionPath(), this.metadataMetaClient.getBasePath());
                LOG.error(errMsg, (Throwable)e);
                throw new HoodieMetadataException(errMsg, e);
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Initializing {} index with {} mappings and {} file groups.", new Object[]{partitionTypeName, fileGroupCountAndRecordsPair.getKey(), ((HoodieData)fileGroupCountAndRecordsPair.getValue()).count()});
            }
            HoodieTimer partitionInitTimer = HoodieTimer.start();
            int fileGroupCount = (Integer)fileGroupCountAndRecordsPair.getKey();
            ValidationUtils.checkArgument((fileGroupCount > 0 ? 1 : 0) != 0, (String)("FileGroup count for MDT partition " + partitionTypeName + " should be > 0"));
            this.initializeFileGroups(this.dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount);
            HoodieData records = (HoodieData)fileGroupCountAndRecordsPair.getValue();
            this.bulkCommit(commitTimeForPartition, partitionType, (HoodieData<HoodieRecord>)records, fileGroupCount);
            this.metadataMetaClient.reloadActiveTimeline();
            String partitionPath = partitionType == MetadataPartitionType.FUNCTIONAL_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX ? this.dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
            this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, partitionPath, true);
            this.initMetadataReader();
            long totalInitTime = partitionInitTimer.endTimer();
            LOG.info("Initializing {} index in metadata table took {} in ms", (Object)partitionTypeName, (Object)totalInitTime);
        }
        return true;
    }

    private String generateUniqueCommitInstantTime(String initializationTime) {
        HoodieTimeline dataIndexTimeline = this.dataMetaClient.getActiveTimeline().filter(instant -> instant.getAction().equals("indexing"));
        if (HoodieTableMetadataUtil.isIndexingCommit((HoodieTimeline)dataIndexTimeline, (String)initializationTime)) {
            return initializationTime;
        }
        int offset = 0;
        while (true) {
            String commitInstantTime = HoodieInstantTimeGenerator.instantTimePlusMillis((String)"00000000000000", (long)offset);
            if (!this.metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
                return commitInstantTime;
            }
            ++offset;
        }
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList) {
        HoodieData records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords((HoodieEngineContext)this.engineContext, partitionInfoList, (HoodieMetadataConfig)this.dataWriteConfig.getMetadataConfig(), (HoodieTableMetaClient)this.dataMetaClient);
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
        return Pair.of((Object)fileGroupCount, (Object)records);
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
        HoodieData records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords((HoodieEngineContext)this.engineContext, Collections.emptyMap(), partitionToFilesMap, (HoodieTableMetaClient)this.dataMetaClient, (boolean)this.dataWriteConfig.isMetadataColumnStatsIndexEnabled(), (int)this.dataWriteConfig.getColumnStatsIndexParallelism(), this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
        return Pair.of((Object)fileGroupCount, (Object)records);
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) {
        HoodieData records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords((HoodieEngineContext)this.engineContext, Collections.emptyMap(), partitionToFilesMap, (String)createInstantTime, (HoodieTableMetaClient)this.dataMetaClient, (int)this.dataWriteConfig.getBloomIndexParallelism(), (String)this.dataWriteConfig.getBloomFilterType());
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
        return Pair.of((Object)fileGroupCount, (Object)records);
    }

    protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, FileSlice>> var1, HoodieIndexDefinition var2, HoodieTableMetaClient var3, int var4, Schema var5, StorageConfiguration<?> var6);

    protected abstract EngineType getEngineType();

    public abstract HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext var1, Map<String, String> var2, HoodieIndexDefinition var3);

    private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartition(String indexName) throws Exception {
        HoodieIndexDefinition indexDefinition = this.getFunctionalIndexDefinition(indexName);
        ValidationUtils.checkState((indexDefinition != null ? 1 : 0) != 0, (String)("Functional Index definition is not present for index " + indexName));
        List<Pair<String, FileSlice>> partitionFileSlicePairs = this.getPartitionFileSlicePairs();
        int fileGroupCount = this.dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
        int parallelism = Math.min(partitionFileSlicePairs.size(), this.dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
        Schema readerSchema = HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex((HoodieIndexDefinition)indexDefinition, (HoodieTableMetaClient)this.dataMetaClient);
        return Pair.of((Object)fileGroupCount, this.getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, this.dataMetaClient, parallelism, readerSchema, this.storageConf));
    }

    private Set<String> getFunctionalIndexPartitionsToInit() {
        Set<String> functionalIndexPartitions = ((HoodieIndexMetadata)this.dataMetaClient.getIndexMetadata().get()).getIndexDefinitions().keySet();
        Set completedMetadataPartitions = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        functionalIndexPartitions.removeAll(completedMetadataPartitions);
        return functionalIndexPartitions;
    }

    private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) {
        Option functionalIndexMetadata = this.dataMetaClient.getIndexMetadata();
        if (functionalIndexMetadata.isPresent()) {
            return (HoodieIndexDefinition)((HoodieIndexMetadata)functionalIndexMetadata.get()).getIndexDefinitions().get(indexName);
        }
        throw new HoodieIndexException("Functional Index definition is not present");
    }

    private Set<String> getSecondaryIndexPartitionsToInit() {
        Set<String> secondaryIndexPartitions = ((HoodieIndexMetadata)this.dataMetaClient.getIndexMetadata().get()).getIndexDefinitions().values().stream().map(HoodieIndexDefinition::getIndexName).filter(indexName -> indexName.startsWith("secondary_index_")).collect(Collectors.toSet());
        Set completedMetadataPartitions = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        secondaryIndexPartitions.removeAll(completedMetadataPartitions);
        return secondaryIndexPartitions;
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition(String indexName) throws IOException {
        HoodieIndexDefinition indexDefinition = this.getFunctionalIndexDefinition(indexName);
        ValidationUtils.checkState((indexDefinition != null ? 1 : 0) != 0, (String)("Secondary Index definition is not present for index " + indexName));
        List<Pair<String, FileSlice>> partitionFileSlicePairs = this.getPartitionFileSlicePairs();
        int parallelism = Math.min(partitionFileSlicePairs.size(), this.dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
        HoodieData records = HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices((HoodieEngineContext)this.engineContext, partitionFileSlicePairs, (int)parallelism, (String)this.getClass().getSimpleName(), (HoodieTableMetaClient)this.dataMetaClient, (EngineType)this.getEngineType(), (HoodieIndexDefinition)indexDefinition);
        int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount((MetadataPartitionType)MetadataPartitionType.RECORD_INDEX, (long)records.count(), (int)48, (int)this.dataWriteConfig.getRecordIndexMinFileGroupCount(), (int)this.dataWriteConfig.getRecordIndexMaxFileGroupCount(), (float)this.dataWriteConfig.getRecordIndexGrowthFactor(), (int)this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        return Pair.of((Object)fileGroupCount, (Object)records);
    }

    private List<Pair<String, FileSlice>> getPartitionFileSlicePairs() throws IOException {
        HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(this.dataMetaClient, (HoodieTimeline)this.dataMetaClient.getActiveTimeline(), (HoodieTableMetadata)this.metadata);
        List partitions = this.metadata.getAllPartitionPaths();
        fsView.loadAllPartitions();
        ArrayList<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<Pair<String, FileSlice>>();
        partitions.forEach(partition -> fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of((Object)partition, (Object)fs))));
        return partitionFileSlicePairs;
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
        HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(this.dataMetaClient, (HoodieTimeline)this.dataMetaClient.getActiveTimeline(), (HoodieTableMetadata)this.metadata);
        HoodieTable hoodieTable = this.getHoodieTable(this.dataWriteConfig, this.dataMetaClient);
        List partitions = this.metadata.getAllPartitionPaths();
        fsView.loadAllPartitions();
        HoodieData records = null;
        if (this.dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
            ArrayList partitionBaseFilePairs = new ArrayList();
            for (String partition : partitions) {
                partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition).map(basefile -> Pair.of((Object)partition, (Object)basefile)).collect(Collectors.toList()));
            }
            LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " + partitions.size() + " partitions");
            records = HoodieTableMetadataUtil.readRecordKeysFromBaseFiles((HoodieEngineContext)this.engineContext, (HoodieConfig)this.dataWriteConfig, partitionBaseFilePairs, (boolean)false, (int)this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(), (StoragePath)this.dataMetaClient.getBasePath(), this.storageConf, (String)this.getClass().getSimpleName());
        } else {
            ArrayList<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<Pair<String, FileSlice>>();
            for (String partition : partitions) {
                fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of((Object)partition, (Object)fs)));
            }
            LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in " + partitions.size() + " partitions");
            records = HoodieBackedTableMetadataWriter.readRecordKeysFromFileSliceSnapshot(this.engineContext, partitionFileSlicePairs, this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(), this.getClass().getSimpleName(), this.dataMetaClient, this.dataWriteConfig, hoodieTable);
        }
        records.persist("MEMORY_AND_DISK_SER");
        long recordCount = records.count();
        int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount((MetadataPartitionType)MetadataPartitionType.RECORD_INDEX, (long)recordCount, (int)48, (int)this.dataWriteConfig.getRecordIndexMinFileGroupCount(), (int)this.dataWriteConfig.getRecordIndexMaxFileGroupCount(), (float)this.dataWriteConfig.getRecordIndexGrowthFactor(), (int)this.dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
        LOG.info("Initializing record index with {} mappings and {} file groups.", (Object)recordCount, (Object)fileGroupCount);
        return Pair.of((Object)fileGroupCount, (Object)records);
    }

    private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionFileSlicePairs, int recordIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, HoodieWriteConfig dataWriteConfig, HoodieTable hoodieTable) {
        if (partitionFileSlicePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        Option instantTime = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
        engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
        int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
        return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap((SerializableFunction & Serializable)partitionAndFileSlice -> {
            String partition = (String)partitionAndFileSlice.getKey();
            FileSlice fileSlice = (FileSlice)partitionAndFileSlice.getValue();
            String fileId = fileSlice.getFileId();
            return new HoodieMergedReadHandle(dataWriteConfig, (Option<String>)instantTime, hoodieTable, (Pair<String, String>)Pair.of((Object)partition, (Object)fileSlice.getFileId()), (Option<FileSlice>)Option.of((Object)fileSlice)).getMergedRecords().stream().map(record -> {
                HoodieRecord record1 = (HoodieRecord)record;
                return HoodieMetadataPayload.createRecordIndexUpdate((String)record1.getRecordKey(), (String)partition, (String)fileId, (String)record1.getCurrentLocation().getInstantTime(), (int)0);
            }).iterator();
        });
    }

    private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList) {
        boolean fileGroupCount = true;
        List partitions = partitionInfoList.stream().map(p -> HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition((String)p.getRelativePath())).collect(Collectors.toList());
        int totalDataFilesCount = partitionInfoList.stream().mapToInt(HoodieTableMetadataUtil.DirectoryInfo::getTotalFiles).sum();
        LOG.info("Committing total {} partitions and {} files to metadata", (Object)partitions.size(), (Object)totalDataFilesCount);
        HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions);
        HoodieData allPartitionsRecord = this.engineContext.parallelize(Collections.singletonList(record), 1);
        if (partitionInfoList.isEmpty()) {
            return Pair.of((Object)1, (Object)allPartitionsRecord);
        }
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition");
        HoodieData fileListRecords = this.engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map((SerializableFunction & Serializable)partitionInfo -> {
            Map fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
            return HoodieMetadataPayload.createPartitionFilesRecord((String)partitionInfo.getRelativePath(), (Map)fileNameToSizeMap, Collections.emptyList());
        });
        ValidationUtils.checkState((fileListRecords.count() == (long)partitions.size() ? 1 : 0) != 0);
        return Pair.of((Object)1, (Object)allPartitionsRecord.union(fileListRecords));
    }

    private Set<String> getPendingDataInstants(HoodieTableMetaClient dataMetaClient) {
        return dataMetaClient.getActiveTimeline().getInstantsAsStream().filter(i -> !i.isCompleted()).filter(i -> !"indexing".equals(i.getAction())).map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
    }

    private HoodieTableMetaClient initializeMetaClient() throws IOException {
        HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.MERGE_ON_READ).setTableName(this.dataWriteConfig.getTableName() + "_metadata").setArchiveLogFolder((String)HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()).setPayloadClassName(HoodieMetadataPayload.class.getName()).setBaseFileFormat(HoodieFileFormat.HFILE.toString()).setRecordKeyFields(RECORD_KEY_FIELD_NAME).setPopulateMetaFields(false).setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()).initTable(this.storageConf.newInstance(), this.metadataWriteConfig.getBasePath());
        return HoodieTableMetaClient.builder().setBasePath(this.metadataWriteConfig.getBasePath()).setConf(this.storageConf.newInstance()).setTimeGeneratorConfig(this.dataWriteConfig.getTimeGeneratorConfig()).build();
    }

    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) {
        if (this.dataMetaClient.getActiveTimeline().countInstants() == 0) {
            return Collections.emptyList();
        }
        ArrayDeque<StoragePath> pathsToList = new ArrayDeque<StoragePath>();
        pathsToList.add(new StoragePath(this.dataWriteConfig.getBasePath()));
        LinkedList<HoodieTableMetadataUtil.DirectoryInfo> partitionsToBootstrap = new LinkedList<HoodieTableMetadataUtil.DirectoryInfo>();
        int fileListingParallelism = this.metadataWriteConfig.getFileListingParallelism();
        StorageConfiguration storageConf = this.dataMetaClient.getStorageConf();
        String dirFilterRegex = this.dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
        StoragePath storageBasePath = this.dataMetaClient.getBasePath();
        while (!pathsToList.isEmpty()) {
            int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
            ArrayList pathsToProcess = new ArrayList(numDirsToList);
            for (int i = 0; i < numDirsToList; ++i) {
                pathsToProcess.add(pathsToList.poll());
            }
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem");
            List processedDirectories = this.engineContext.map(pathsToProcess, (SerializableFunction & Serializable)path -> {
                HoodieHadoopStorage storage = new HoodieHadoopStorage(path, storageConf);
                String relativeDirPath = FSUtils.getRelativePartitionPath((StoragePath)storageBasePath, (StoragePath)path);
                return new HoodieTableMetadataUtil.DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
            }, numDirsToList);
            for (HoodieTableMetadataUtil.DirectoryInfo dirInfo : processedDirectories) {
                String relativePath;
                if (!dirFilterRegex.isEmpty() && !(relativePath = dirInfo.getRelativePath()).isEmpty() && relativePath.matches(dirFilterRegex)) {
                    LOG.info("Ignoring directory {} which matches the filter regex {}", (Object)relativePath, (Object)dirFilterRegex);
                    continue;
                }
                if (dirInfo.isHoodiePartition()) {
                    partitionsToBootstrap.add(dirInfo);
                    continue;
                }
                pathsToList.addAll(dirInfo.getSubDirectories());
            }
        }
        return partitionsToBootstrap;
    }

    private List<HoodieTableMetadataUtil.DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
        List allPartitionPaths = this.metadata.getAllPartitionPaths().stream().map(partitionPath -> this.dataWriteConfig.getBasePath() + '/' + partitionPath).collect(Collectors.toList());
        Map partitionFileMap = this.metadata.getAllFilesInPartitions(allPartitionPaths);
        ArrayList<HoodieTableMetadataUtil.DirectoryInfo> dirinfoList = new ArrayList<HoodieTableMetadataUtil.DirectoryInfo>(partitionFileMap.size());
        for (Map.Entry entry : partitionFileMap.entrySet()) {
            dirinfoList.add(new HoodieTableMetadataUtil.DirectoryInfo((String)entry.getKey(), (List)entry.getValue(), initializationTime, pendingDataInstants));
        }
        return dirinfoList;
    }

    private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount) throws IOException {
        String partitionName = HoodieIndexUtils.getPartitionNameFromPartitionType(metadataPartition, dataMetaClient, this.dataWriteConfig.getIndexingConfig().getIndexName());
        StoragePath partitionPath = new StoragePath(this.metadataWriteConfig.getBasePath(), partitionName);
        HoodieStorage storage = this.metadataMetaClient.getStorage();
        try {
            List existingFiles = storage.listDirectEntries(partitionPath);
            if (existingFiles.size() > 0) {
                LOG.warn("Deleting all existing files found in MDT partition {}", (Object)partitionName);
                storage.deleteDirectory(partitionPath);
                ValidationUtils.checkState((!storage.exists(partitionPath) ? 1 : 0) != 0, (String)("Failed to delete MDT partition " + partitionName));
            }
        }
        catch (FileNotFoundException existingFiles) {
            // empty catch block
        }
        String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, partitionName, metadataPartition.getFileIdPrefix(), instantTime);
        LOG.info(msg);
        List fileGroupFileIds = IntStream.range(0, fileGroupCount).mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup((MetadataPartitionType)metadataPartition, (int)i)).collect(Collectors.toList());
        ValidationUtils.checkArgument((fileGroupFileIds.size() == fileGroupCount ? 1 : 0) != 0);
        this.engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
        this.engineContext.foreach(fileGroupFileIds, (SerializableConsumer & Serializable)fileGroupFileId -> {
            try {
                Map<HoodieLogBlock.HeaderMetadataType, String> blockHeader = Collections.singletonMap(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
                HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
                try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.constructAbsolutePath((String)this.metadataWriteConfig.getBasePath(), (String)partitionName)).withFileId(fileGroupFileId).withDeltaCommit(instantTime).withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION.intValue()).withFileSize(0L).withSizeThreshold(this.metadataWriteConfig.getLogFileMaxSize()).withStorage(dataMetaClient.getStorage()).withRolloverLogWriteToken("0-0-0").withLogWriteToken("0-0-0").withFileExtension(".log").build();){
                    writer.appendBlock((HoodieLogBlock)block);
                }
            }
            catch (InterruptedException e) {
                throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, partitionName), (Throwable)e);
            }
        }, fileGroupFileIds.size());
    }

    @Override
    public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
        for (MetadataPartitionType partitionType : metadataPartitions) {
            String partitionPath = partitionType.getPartitionPath();
            this.dataMetaClient.getTableConfig().setMetadataPartitionState(this.dataMetaClient, partitionPath, false);
            LOG.warn("Deleting Metadata Table partition: {}", (Object)partitionPath);
            this.dataMetaClient.getStorage().deleteDirectory(new StoragePath(this.metadataWriteConfig.getBasePath(), partitionPath));
            LOG.warn("Deleting pending indexing instant from the timeline for partition: {}", (Object)partitionPath);
            HoodieBackedTableMetadataWriter.deletePendingIndexingInstant(this.dataMetaClient, partitionPath);
        }
        this.closeInternal();
    }

    private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
        metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstantsAsStream().filter(instant -> HoodieInstant.State.REQUESTED.equals((Object)instant.getState())).forEach(instant -> {
            try {
                HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan((byte[])((byte[])metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get()));
                if (indexPlan.getIndexPartitionInfos().stream().anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath))) {
                    metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
                    metaClient.getActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant((String)instant.getTimestamp()));
                }
            }
            catch (IOException e) {
                LOG.error("Failed to delete the instant file corresponding to {}", instant);
            }
        });
    }

    private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
        Set<String> partitionsToUpdate = this.getMetadataPartitionsToUpdate();
        if (this.initialized && this.metadata != null) {
            Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata().entrySet().stream().filter(entry -> partitionsToUpdate.contains(((MetadataPartitionType)entry.getKey()).getPartitionPath())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            this.commit(instantTime, partitionRecordsMap);
        }
    }

    private Set<String> getMetadataPartitionsToUpdate() {
        Set partitionsToUpdate = this.dataMetaClient.getTableConfig().getMetadataPartitions();
        partitionsToUpdate.addAll(HoodieTableMetadataUtil.getInflightMetadataPartitions((HoodieTableConfig)this.dataMetaClient.getTableConfig()));
        if (!partitionsToUpdate.isEmpty()) {
            return partitionsToUpdate;
        }
        LOG.debug("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
        return this.getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    }

    @Override
    public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws IOException {
        if (indexPartitionInfos.isEmpty()) {
            LOG.warn("No partition to index in the plan");
            return;
        }
        String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
        ArrayList partitionPaths = new ArrayList();
        ArrayList<MetadataPartitionType> partitionTypes = new ArrayList<MetadataPartitionType>();
        indexPartitionInfos.forEach(indexPartitionInfo -> {
            String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
            LOG.info("Creating a new metadata index for partition '{}' under path {} upto instant {}", new Object[]{relativePartitionPath, this.metadataWriteConfig.getBasePath(), indexUptoInstantTime});
            MetadataPartitionType partitionType = MetadataPartitionType.fromPartitionPath((String)relativePartitionPath);
            if (!this.enabledPartitionTypes.contains(partitionType)) {
                throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
            }
            partitionTypes.add(partitionType);
            partitionPaths.add(relativePartitionPath);
        });
        this.dataMetaClient.getTableConfig().setMetadataPartitionsInflight(this.dataMetaClient, partitionPaths);
        this.initializeFromFilesystem(instantTime, partitionTypes, (Option<String>)Option.empty());
    }

    @Override
    public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
        this.processAndCommit(instantTime, () -> {
            Map partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords((HoodieEngineContext)this.engineContext, (HoodieConfig)this.dataWriteConfig, (HoodieCommitMetadata)commitMetadata, (String)instantTime, (HoodieTableMetaClient)this.dataMetaClient, this.enabledPartitionTypes, (String)this.dataWriteConfig.getBloomFilterType(), (int)this.dataWriteConfig.getBloomIndexParallelism(), (boolean)this.dataWriteConfig.isMetadataColumnStatsIndexEnabled(), (int)this.dataWriteConfig.getColumnStatsIndexParallelism(), this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), (HoodieMetadataConfig)this.dataWriteConfig.getMetadataConfig());
            HoodieData<HoodieRecord> updatesFromWriteStatuses = this.getRecordIndexUpserts(writeStatus);
            HoodieData<HoodieRecord> additionalUpdates = this.getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
            partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, updatesFromWriteStatuses.union(additionalUpdates));
            this.updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
            this.updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
            return partitionToRecordMap;
        });
        this.closeInternal();
    }

    @Override
    public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord> records, String instantTime) {
        this.processAndCommit(instantTime, () -> {
            Map partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords((HoodieEngineContext)this.engineContext, (HoodieConfig)this.dataWriteConfig, (HoodieCommitMetadata)commitMetadata, (String)instantTime, (HoodieTableMetaClient)this.dataMetaClient, this.enabledPartitionTypes, (String)this.dataWriteConfig.getBloomFilterType(), (int)this.dataWriteConfig.getBloomIndexParallelism(), (boolean)this.dataWriteConfig.isMetadataColumnStatsIndexEnabled(), (int)this.dataWriteConfig.getColumnStatsIndexParallelism(), this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), (HoodieMetadataConfig)this.dataWriteConfig.getMetadataConfig());
            HoodieData<HoodieRecord> additionalUpdates = this.getRecordIndexAdditionalUpserts(records, commitMetadata);
            partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, records.union(additionalUpdates));
            this.updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
            return partitionToRecordMap;
        });
        this.closeInternal();
    }

    private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap) {
        this.dataMetaClient.getTableConfig().getMetadataPartitions().stream().filter(partition -> partition.startsWith("func_index_")).forEach(partition -> {
            HoodieData<HoodieRecord> functionalIndexRecords;
            try {
                functionalIndexRecords = this.getFunctionalIndexUpdates(commitMetadata, (String)partition, instantTime);
            }
            catch (Exception e) {
                throw new HoodieMetadataException(String.format("Failed to get functional index updates for partition %s", partition), e);
            }
            partitionToRecordMap.put(MetadataPartitionType.FUNCTIONAL_INDEX, functionalIndexRecords);
        });
    }

    private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
        HoodieIndexDefinition indexDefinition = this.getFunctionalIndexDefinition(indexPartition);
        ArrayList<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<Pair<String, FileSlice>>();
        HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView((HoodieTableMetaClient)this.dataMetaClient);
        commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value) -> {
            List fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight((HoodieTableMetaClient)this.dataMetaClient, (Option)Option.ofNullable((Object)fsView), (String)dataPartition);
            fileSlices.forEach(fileSlice -> {
                List logFilesForInstant = fileSlice.getLogFiles().filter(logFile -> logFile.getDeltaCommitTime().equals(instantTime)).collect(Collectors.toList());
                partitionFileSlicePairs.add(Pair.of((Object)dataPartition, (Object)new FileSlice(fileSlice.getFileGroupId(), fileSlice.getBaseInstantTime(), (HoodieBaseFile)fileSlice.getBaseFile().orElse(null), logFilesForInstant)));
            });
        });
        int parallelism = Math.min(partitionFileSlicePairs.size(), this.dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
        Schema readerSchema = HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex((HoodieIndexDefinition)indexDefinition, (HoodieTableMetaClient)this.dataMetaClient);
        return this.getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, this.dataMetaClient, parallelism, readerSchema, this.storageConf);
    }

    private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
        this.dataMetaClient.getTableConfig().getMetadataPartitions().stream().filter(partition -> partition.startsWith("secondary_index_")).forEach(partition -> {
            HoodieData<HoodieRecord> secondaryIndexRecords;
            try {
                secondaryIndexRecords = this.getSecondaryIndexUpdates(commitMetadata, (String)partition, writeStatus);
            }
            catch (Exception e) {
                throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
            }
            partitionToRecordMap.put(MetadataPartitionType.SECONDARY_INDEX, secondaryIndexRecords);
        });
    }

    private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
        List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = HoodieBackedTableMetadataWriter.getPartitionFilePairs(commitMetadata);
        ArrayList keysToRemove = new ArrayList();
        writeStatus.collectAsList().forEach(status -> status.getWrittenRecordDelegates().forEach(recordDelegate -> {
            if (!recordDelegate.getNewLocation().isPresent() || recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent()) {
                keysToRemove.add(recordDelegate.getRecordKey());
            }
        }));
        HoodieIndexDefinition indexDefinition = this.getFunctionalIndexDefinition(indexPartition);
        Map recordKeySecondaryKeyMap = this.metadata.getSecondaryKeys(keysToRemove);
        HoodieData<HoodieRecord> deletedRecords = this.getDeletedSecondaryRecordMapping(this.engineContext, recordKeySecondaryKeyMap, indexDefinition);
        int parallelism = Math.min(partitionFilePairs.size(), this.dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
        return deletedRecords.union(HoodieTableMetadataUtil.readSecondaryKeysFromBaseFiles((HoodieEngineContext)this.engineContext, partitionFilePairs, (int)parallelism, (String)this.getClass().getSimpleName(), (HoodieTableMetaClient)this.dataMetaClient, (EngineType)this.getEngineType(), (HoodieIndexDefinition)indexDefinition));
    }

    private static List<Pair<String, Pair<String, List<String>>>> getPartitionFilePairs(HoodieCommitMetadata commitMetadata) {
        ArrayList<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<Pair<String, Pair<String, List<String>>>>();
        commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> {
            if (writeStat instanceof HoodieDeltaWriteStat) {
                partitionFilePairs.add(Pair.of((Object)dataPartition, (Object)Pair.of((Object)((HoodieDeltaWriteStat)writeStat).getBaseFile(), (Object)((HoodieDeltaWriteStat)writeStat).getLogFiles())));
            } else {
                partitionFilePairs.add(Pair.of((Object)dataPartition, (Object)Pair.of((Object)writeStat.getPath(), Collections.emptyList())));
            }
        }));
        return partitionFilePairs;
    }

    @Override
    public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
        this.processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords((HoodieEngineContext)this.engineContext, (HoodieCleanMetadata)cleanMetadata, (String)instantTime, (HoodieTableMetaClient)this.dataMetaClient, this.enabledPartitionTypes, (int)this.dataWriteConfig.getBloomIndexParallelism(), (boolean)this.dataWriteConfig.isMetadataColumnStatsIndexEnabled(), (int)this.dataWriteConfig.getColumnStatsIndexParallelism(), this.dataWriteConfig.getColumnsEnabledForColumnStatsIndex()));
        this.closeInternal();
    }

    @Override
    public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
        this.dataMetaClient.reloadActiveTimeline();
        HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "restore", instantTime);
        HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant((HoodieInstant)restoreInstant);
        HoodieRestorePlan restorePlan = null;
        try {
            restorePlan = (HoodieRestorePlan)TimelineMetadataUtils.deserializeAvroMetadata((byte[])((byte[])this.dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get()), HoodieRestorePlan.class);
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Deserialization of restore plan failed whose restore instant time is %s in data table", instantTime), e);
        }
        String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
        LOG.info("Triggering restore to {} in metadata table", (Object)restoreToInstantTime);
        List filesGroups = this.metadata.getMetadataFileSystemView().getAllFileGroups(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
        boolean cannotRestore = filesGroups.stream().map(fileGroup -> fileGroup.getAllFileSlices().map(FileSlice::getBaseInstantTime).anyMatch(instantTime1 -> HoodieTimeline.compareTimestamps((String)instantTime1, (BiPredicate)HoodieTimeline.LESSER_THAN_OR_EQUALS, (String)restoreToInstantTime))).anyMatch(canRestore -> canRestore == false);
        if (cannotRestore) {
            throw new HoodieMetadataException(String.format("Can't restore to %s since there is no base file in MDT lesser than the commit to restore to. Please delete metadata table and retry", restoreToInstantTime));
        }
        List<HoodieTableMetadataUtil.DirectoryInfo> dirInfoList = this.listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
        Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap = dirInfoList.stream().collect(Collectors.toMap(HoodieTableMetadataUtil.DirectoryInfo::getRelativePath, Function.identity()));
        dirInfoList.clear();
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        writeClient.restoreToInstant(restoreToInstantTime, false);
        try {
            this.initMetadataReader();
            HashMap<String, Map<String, Long>> partitionFilesToAdd = new HashMap<String, Map<String, Long>>();
            HashMap<String, List<String>> partitionFilesToDelete = new HashMap<String, List<String>>();
            ArrayList<String> partitionsToDelete = new ArrayList<String>();
            this.fetchOutofSyncFilesRecordsFromMetadataTable(dirInfoMap, partitionFilesToAdd, partitionFilesToDelete, partitionsToDelete);
            String syncCommitTime = writeClient.createNewInstantTime(false);
            this.processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords((HoodieEngineContext)this.engineContext, (List)partitionsToDelete, (Map)partitionFilesToAdd, (Map)partitionFilesToDelete, (String)syncCommitTime));
            this.closeInternal();
        }
        catch (IOException e) {
            throw new HoodieMetadataException("IOException during MDT restore sync", (Exception)e);
        }
    }

    @Override
    public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
        if (this.initialized && this.metadata != null) {
            String commitToRollbackInstantTime = (String)rollbackMetadata.getCommitsRollback().get(0);
            HoodieInstant deltaCommitInstant = new HoodieInstant(false, "deltacommit", commitToRollbackInstantTime);
            if (this.metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().containsInstant(deltaCommitInstant)) {
                this.validateRollback(commitToRollbackInstantTime);
                LOG.info("Rolling back MDT deltacommit {}", (Object)commitToRollbackInstantTime);
                if (!this.getWriteClient().rollback(commitToRollbackInstantTime, instantTime)) {
                    throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s", commitToRollbackInstantTime));
                }
            } else {
                LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT", (Object)commitToRollbackInstantTime, (Object)instantTime);
            }
            this.closeInternal();
        }
    }

    private void validateRollback(String commitToRollbackInstantTime) {
        Option deltaCommitsInfo = CompactionUtils.getDeltaCommitsSinceLatestCompaction((HoodieActiveTimeline)this.metadataMetaClient.getActiveTimeline());
        HoodieInstant compactionInstant = (HoodieInstant)((Pair)deltaCommitsInfo.get()).getValue();
        HoodieTimeline deltacommitsSinceCompaction = (HoodieTimeline)((Pair)deltaCommitsInfo.get()).getKey();
        if (compactionInstant.getAction().equals("commit")) {
            String compactionInstantTime = compactionInstant.getTimestamp();
            if (commitToRollbackInstantTime.length() == compactionInstantTime.length() && HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) {
                throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime, deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants()));
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (this.metadata != null) {
            this.metadata.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
            this.writeClient = null;
        }
    }

    protected abstract void commit(String var1, Map<MetadataPartitionType, HoodieData<HoodieRecord>> var2);

    protected abstract I convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> var1);

    protected void commitInternal(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        ValidationUtils.checkState((this.metadataMetaClient != null ? 1 : 0) != 0, (String)"Metadata table is not fully initialized yet.");
        HoodieData<HoodieRecord> preppedRecords = this.prepRecords(partitionRecordsMap);
        I preppedRecordInputs = this.convertHoodieDataToEngineSpecificData(preppedRecords);
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        if (this.dataWriteConfig.getFailedWritesCleanPolicy().isEager() && writeClient.rollbackFailedWrites()) {
            this.metadataMetaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)this.metadataMetaClient);
        }
        if (!this.metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
            LOG.info("New commit at {} being applied to MDT.", (Object)instantTime);
        } else {
            Option alreadyCompletedInstant = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
            LOG.info("{} completed commit at {} being applied to MDT.", (Object)(alreadyCompletedInstant.isPresent() ? "Already" : "Partially"), (Object)instantTime);
            if (!writeClient.rollback(instantTime)) {
                throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
            }
            this.metadataMetaClient.reloadActiveTimeline();
        }
        writeClient.startCommitWithTime(instantTime);
        this.preWrite(instantTime);
        if (isInitializing) {
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Bulk inserting at %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
            writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
        } else {
            this.engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
            writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
        }
        this.metadataMetaClient.reloadActiveTimeline();
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata, this.dataMetaClient.getTableConfig().getMetadataPartitions()));
    }

    protected void preWrite(String instantTime) {
    }

    protected abstract void bulkCommit(String var1, MetadataPartitionType var2, HoodieData<HoodieRecord> var3, int var4);

    protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
        HoodieData allPartitionRecords = this.engineContext.emptyHoodieData();
        HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView((HoodieTableMetaClient)this.metadataMetaClient);
        for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
            int fileGroupCount;
            String partitionName = HoodieIndexUtils.getPartitionNameFromPartitionType(entry.getKey(), this.dataMetaClient, this.dataWriteConfig.getIndexingConfig().getIndexName());
            HoodieData<HoodieRecord> records = entry.getValue();
            List fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices((HoodieTableMetaClient)this.metadataMetaClient, (Option)Option.ofNullable((Object)fsView), (String)partitionName);
            if (fileSlices.isEmpty()) {
                fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight((HoodieTableMetaClient)this.metadataMetaClient, (Option)Option.ofNullable((Object)fsView), (String)partitionName);
            }
            ValidationUtils.checkArgument(((fileGroupCount = fileSlices.size()) > 0 ? 1 : 0) != 0, (String)String.format("FileGroup count for MDT partition %s should be >0", partitionName));
            List finalFileSlices = fileSlices;
            HoodieData rddSinglePartitionRecords = records.map((SerializableFunction & Serializable)r -> {
                FileSlice slice = (FileSlice)finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex((String)r.getRecordKey(), (int)fileGroupCount));
                r.unseal();
                r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
                r.seal();
                return r;
            });
            allPartitionRecords = allPartitionRecords.union(rddSinglePartitionRecords);
        }
        return allPartitionRecords;
    }

    @Override
    public void performTableServices(Option<String> inFlightInstantTimestamp) {
        HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
        boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
        BaseHoodieWriteClient<?, I, ?, ?> writeClient = this.getWriteClient();
        try {
            this.runPendingTableServicesOperations(writeClient);
            Option lastInstant = this.metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
            if (!lastInstant.isPresent()) {
                return;
            }
            this.cleanIfNecessary(writeClient);
            if (this.validateCompactionScheduling()) {
                String latestDeltacommitTime = ((HoodieInstant)lastInstant.get()).getTimestamp();
                LOG.info("Latest deltacommit time found is {}, running compaction operations.", (Object)latestDeltacommitTime);
                this.compactIfNecessary(writeClient);
            }
            writeClient.archive();
            LOG.info("All the table services operations on MDT completed successfully");
        }
        catch (Exception e) {
            LOG.error("Exception in running table services on metadata table", (Throwable)e);
            allTableServicesExecutedSuccessfullyOrSkipped = false;
            throw e;
        }
        finally {
            long timeSpent = metadataTableServicesTimer.endTimer();
            this.metrics.ifPresent(m -> m.setMetric("table_service_execution_duration", timeSpent));
            if (allTableServicesExecutedSuccessfullyOrSkipped) {
                this.metrics.ifPresent(m -> m.setMetric("table_service_execution_status", 1L));
            } else {
                this.metrics.ifPresent(m -> m.setMetric("table_service_execution_status", -1L));
            }
        }
    }

    private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient) {
        writeClient.runAnyPendingCompactions();
        writeClient.runAnyPendingLogCompactions();
    }

    protected void compactIfNecessary(BaseHoodieWriteClient writeClient) {
        HoodieTimeline metadataCompletedTimeline = this.metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        String compactionInstantTime = (String)this.dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested().filter(instant -> metadataCompletedTimeline.containsInstant(instant.getTimestamp())).firstInstant().map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis((String)instant.getTimestamp(), (long)1L)).orElse((Object)writeClient.createNewInstantTime(false));
        if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
            LOG.info("Compaction with same {} time is already present in the timeline.", (Object)compactionInstantTime);
        } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, (Option<Map<String, String>>)Option.empty())) {
            LOG.info("Compaction is scheduled for timestamp {}", (Object)compactionInstantTime);
            writeClient.compact(compactionInstantTime);
        } else if (this.metadataWriteConfig.isLogCompactionEnabled()) {
            String logCompactionInstantTime = this.metadataMetaClient.createNewInstantTime(false);
            if (this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
                LOG.info("Log compaction with same {} time is already present in the timeline.", (Object)logCompactionInstantTime);
            } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, (Option<Map<String, String>>)Option.empty())) {
                LOG.info("Log compaction is scheduled for timestamp {}", (Object)logCompactionInstantTime);
                writeClient.logCompact(logCompactionInstantTime);
            }
        }
    }

    protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) {
        Option lastCompletedCompactionInstant = this.metadataMetaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
        if (lastCompletedCompactionInstant.isPresent() && this.metadataMetaClient.getActiveTimeline().filterCompletedInstants().findInstantsAfter(((HoodieInstant)lastCompletedCompactionInstant.get()).getTimestamp()).countInstants() < 3) {
            return;
        }
        writeClient.clean(this.metadataMetaClient.createNewInstantTime(false));
        writeClient.lazyRollbackFailedIndexing();
    }

    protected boolean validateCompactionScheduling() {
        if (this.metadataWriteConfig.isLogCompactionEnabled()) {
            Option pendingLogCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
            Option pendingCompactionInstant = this.metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
            if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
                LOG.warn("Not scheduling compaction or logCompaction, since a pending compaction instant {} or logCompaction {} instant is present", (Object)pendingCompactionInstant, (Object)pendingLogCompactionInstant);
                return false;
            }
        }
        return true;
    }

    private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, HoodieTableMetadataUtil.DirectoryInfo> dirInfoMap, Map<String, Map<String, Long>> partitionFilesToAdd, Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {
        for (String partition : this.metadata.fetchAllPartitionPaths()) {
            StoragePath partitionPath = null;
            partitionPath = StringUtils.isNullOrEmpty((String)partition) && !this.dataMetaClient.getTableConfig().isTablePartitioned() ? new StoragePath(this.dataWriteConfig.getBasePath()) : new StoragePath(this.dataWriteConfig.getBasePath(), partition);
            String partitionId = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition((String)partition);
            List metadataFiles = this.metadata.getAllFilesInPartition(partitionPath);
            if (!dirInfoMap.containsKey(partition)) {
                partitionsToDelete.add(partitionId);
                if (metadataFiles == null || metadataFiles.size() <= 0) continue;
                partitionFilesToDelete.put(partitionId, metadataFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList()));
                continue;
            }
            Map fsFiles = dirInfoMap.get(partition).getFileNameToSizeMap();
            List mdtFiles = metadataFiles.stream().map(mdtFile -> mdtFile.getPath().getName()).collect(Collectors.toList());
            List filesDeleted = metadataFiles.stream().map(f -> f.getPath().getName()).filter(n -> !fsFiles.containsKey(n)).collect(Collectors.toList());
            HashMap filesToAdd = new HashMap();
            dirInfoMap.get(partition).getFileNameToSizeMap().forEach((k, v) -> {
                if (!mdtFiles.contains(k)) {
                    filesToAdd.put(k, v);
                }
            });
            if (!filesToAdd.isEmpty()) {
                partitionFilesToAdd.put(partitionId, filesToAdd);
            }
            if (filesDeleted.isEmpty()) continue;
            partitionFilesToDelete.put(partitionId, filesDeleted);
        }
    }

    private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
        return writeStatuses.flatMap((SerializableFunction & Serializable)writeStatus -> {
            LinkedList<HoodieRecord> recordList = new LinkedList<HoodieRecord>();
            for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
                HoodieRecord hoodieRecord;
                if (writeStatus.isErrored(recordDelegate.getHoodieKey()) || recordDelegate.getIgnoreIndexUpdate()) continue;
                Option newLocation = recordDelegate.getNewLocation();
                if (newLocation.isPresent()) {
                    if (recordDelegate.getCurrentLocation().isPresent()) {
                        if (((HoodieRecordLocation)recordDelegate.getCurrentLocation().get()).getFileId().equals(((HoodieRecordLocation)newLocation.get()).getFileId())) continue;
                        String msg = String.format("Detected update in location of record with key %s from %s to %s. The fileID should not change.", recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get());
                        LOG.error(msg);
                        throw new HoodieMetadataException(msg);
                    }
                    hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate((String)recordDelegate.getRecordKey(), (String)recordDelegate.getPartitionPath(), (String)((HoodieRecordLocation)newLocation.get()).getFileId(), (String)((HoodieRecordLocation)newLocation.get()).getInstantTime(), (int)this.dataWriteConfig.getWritesFileIdEncoding());
                    recordList.add(hoodieRecord);
                    continue;
                }
                hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete((String)recordDelegate.getRecordKey());
                recordList.add(hoodieRecord);
            }
            return recordList.iterator();
        });
    }

    private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
        HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(this.dataMetaClient, (HoodieTimeline)this.dataMetaClient.getActiveTimeline(), (HoodieTableMetadata)this.metadata);
        List partitionBaseFilePairs = replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of((Object)partition, (Object)f))).collect(Collectors.toList());
        return HoodieTableMetadataUtil.readRecordKeysFromBaseFiles((HoodieEngineContext)this.engineContext, (HoodieConfig)this.dataWriteConfig, partitionBaseFilePairs, (boolean)true, (int)this.dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(), (StoragePath)this.dataMetaClient.getBasePath(), this.storageConf, (String)this.getClass().getSimpleName());
    }

    private HoodieData<HoodieRecord> getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
        WriteOperationType operationType = commitMetadata.getOperationType();
        if (operationType == WriteOperationType.INSERT_OVERWRITE) {
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)commitMetadata).mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getKey(), (Object)r)).leftOuterJoin(updatesFromWriteStatuses.mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getKey(), (Object)r))).values().filter((SerializableFunction & Serializable)p -> !((Option)p.getRight()).isPresent()).map(Pair::getLeft);
        }
        if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)commitMetadata).mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getRecordKey(), (Object)r)).leftOuterJoin(updatesFromWriteStatuses.mapToPair((SerializablePairFunction & Serializable)r -> Pair.of((Object)r.getRecordKey(), (Object)r))).values().filter((SerializableFunction & Serializable)p -> !((Option)p.getRight()).isPresent()).map(Pair::getLeft);
        }
        if (operationType == WriteOperationType.DELETE_PARTITION) {
            return this.getRecordIndexReplacedRecords((HoodieReplaceCommitMetadata)commitMetadata);
        }
        return this.engineContext.emptyHoodieData();
    }

    protected void closeInternal() {
        try {
            this.close();
        }
        catch (Exception e) {
            throw new HoodieException("Failed to close HoodieMetadata writer ", (Throwable)e);
        }
    }

    @Override
    public boolean isInitialized() {
        return this.initialized;
    }

    protected BaseHoodieWriteClient<?, I, ?, ?> getWriteClient() {
        if (this.writeClient == null) {
            this.writeClient = this.initializeWriteClient();
        }
        return this.writeClient;
    }

    protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();

    private static interface ConvertMetadataFunction {
        public Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadata();
    }
}

