/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.index;

import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
    private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
    private static final Integer INDEX_COMMIT_METADATA_VERSION_1;
    private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION;
    private static final int MAX_CONCURRENT_INDEXING = 1;
    private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;
    private volatile String currentCaughtupInstant;
    private final TransactionManager txnManager;

    public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
        super(context, config, table, instantTime);
        this.txnManager = new TransactionManager(config, (FileSystem)table.getMetaClient().getFs());
    }

    @Override
    public Option<HoodieIndexCommitMetadata> execute() {
        HoodieIndexPlan indexPlan;
        HoodieTimer indexTimer = new HoodieTimer();
        indexTimer.startTimer();
        HoodieInstant indexInstant = this.validateAndGetIndexInstant();
        try {
            indexPlan = TimelineMetadataUtils.deserializeIndexPlan((byte[])((byte[])this.table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get()));
        }
        catch (IOException e) {
            throw new HoodieIndexException("Failed to read the index plan for instant: " + indexInstant);
        }
        List indexPartitionInfos = indexPlan.getIndexPartitionInfos();
        try {
            if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
                throw new HoodieIndexException(String.format("No partitions to index for instant: %s", this.instantTime));
            }
            Set indexesInflightOrCompleted = HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions((HoodieTableConfig)this.table.getMetaClient().getTableConfig());
            Set requestedPartitions = indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
            requestedPartitions.retainAll(indexesInflightOrCompleted);
            if (!requestedPartitions.isEmpty()) {
                throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
            }
            this.table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
            HoodieTableMetadataWriter metadataWriter = (HoodieTableMetadataWriter)this.table.getMetadataWriter(this.instantTime).orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", this.instantTime)));
            String indexUptoInstant = ((HoodieIndexPartitionInfo)indexPartitionInfos.get(0)).getIndexUptoInstant();
            LOG.info((Object)("Starting Index Building with base instant: " + indexUptoInstant));
            metadataWriter.buildMetadataPartitions(this.context, indexPartitionInfos);
            List<HoodieInstant> instantsToCatchup = this.getInstantsToCatchup(indexUptoInstant);
            LOG.info((Object)("Total remaining instants to index: " + instantsToCatchup.size()));
            String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.table.getMetaClient().getBasePath());
            HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(metadataBasePath).build();
            Set<String> metadataCompletedTimestamps = RunIndexActionExecutor.getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
            this.currentCaughtupInstant = indexUptoInstant;
            this.catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
            List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream().map(info -> new HoodieIndexPartitionInfo(info.getVersion(), info.getMetadataPartitionPath(), this.currentCaughtupInstant)).collect(Collectors.toList());
            HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder().setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
            this.updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
            return Option.of((Object)indexCommitMetadata);
        }
        catch (IOException e) {
            this.abort(indexInstant, indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
            throw new HoodieIndexException(String.format("Unable to index instant: %s", indexInstant));
        }
    }

    private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
        Set inflightPartitions = HoodieTableMetadataUtil.getInflightMetadataPartitions((HoodieTableConfig)this.table.getMetaClient().getTableConfig());
        Set completedPartitions = HoodieTableMetadataUtil.getCompletedMetadataPartitions((HoodieTableConfig)this.table.getMetaClient().getTableConfig());
        requestedPartitions.forEach(partition -> {
            inflightPartitions.remove(partition);
            completedPartitions.remove(partition);
        });
        this.table.getMetaClient().getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join((CharSequence)",", inflightPartitions));
        this.table.getMetaClient().getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join((CharSequence)",", completedPartitions));
        HoodieTableConfig.update((FileSystem)this.table.getMetaClient().getFs(), (Path)new Path(this.table.getMetaClient().getMetaPath()), (Properties)this.table.getMetaClient().getTableConfig().getProps());
        requestedPartitions.forEach(partition -> {
            MetadataPartitionType partitionType = MetadataPartitionType.valueOf((String)partition.toUpperCase(Locale.ROOT));
            if (HoodieTableMetadataUtil.metadataPartitionExists((String)this.table.getMetaClient().getBasePath(), (HoodieEngineContext)this.context, (MetadataPartitionType)partitionType)) {
                HoodieTableMetadataUtil.deleteMetadataPartition((String)this.table.getMetaClient().getBasePath(), (HoodieEngineContext)this.context, (MetadataPartitionType)partitionType);
            }
        });
        this.table.getMetaClient().reloadActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant((String)indexInstant.getTimestamp()));
    }

    private List<HoodieInstant> getInstantsToCatchup(String indexUptoInstant) {
        Set validActions = CollectionUtils.createSet((Object[])new String[]{"clean", "restore", "rollback"});
        Option catchupStartInstant = this.table.getMetaClient().reloadActiveTimeline().getTimelineOfActions(validActions).filterInflightsAndRequested().findInstantsBefore(indexUptoInstant).firstInstant();
        List<HoodieInstant> instantsToIndex = catchupStartInstant.isPresent() ? RunIndexActionExecutor.getRemainingArchivedAndActiveInstantsSince(((HoodieInstant)catchupStartInstant.get()).getTimestamp(), this.table.getMetaClient()) : RunIndexActionExecutor.getRemainingArchivedAndActiveInstantsSince(indexUptoInstant, this.table.getMetaClient());
        return instantsToIndex;
    }

    private HoodieInstant validateAndGetIndexInstant() {
        if (!this.config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty((String)this.config.getLockProviderClass())) {
            throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
        }
        return (HoodieInstant)this.table.getActiveTimeline().filterPendingIndexTimeline().filter(instant -> instant.getTimestamp().equals(this.instantTime) && HoodieInstant.State.REQUESTED.equals((Object)instant.getState())).lastInstant().orElseThrow(() -> new HoodieIndexException(String.format("No requested index instant found: %s", this.instantTime)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTableConfigAndTimeline(HoodieInstant indexInstant, List<HoodieIndexPartitionInfo> finalIndexPartitionInfos, HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
        try {
            this.txnManager.beginTransaction((Option<HoodieInstant>)Option.of((Object)indexInstant), (Option<HoodieInstant>)Option.empty());
            this.updateMetadataPartitionsTableConfig(this.table.getMetaClient(), finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
            this.table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, "indexing", indexInstant.getTimestamp()), TimelineMetadataUtils.serializeIndexCommitMetadata((HoodieIndexCommitMetadata)indexCommitMetadata));
        }
        finally {
            this.txnManager.endTransaction((Option<HoodieInstant>)Option.of((Object)indexInstant));
        }
    }

    private void catchupWithInflightWriters(HoodieTableMetadataWriter metadataWriter, List<HoodieInstant> instantsToIndex, HoodieTableMetaClient metadataMetaClient, Set<String> metadataCompletedTimestamps) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<?> indexingCatchupTaskFuture = executorService.submit(new IndexingCatchupTask(metadataWriter, instantsToIndex, metadataCompletedTimestamps, this.table.getMetaClient(), metadataMetaClient));
        try {
            LOG.info((Object)"Starting index catchup task");
            indexingCatchupTaskFuture.get(this.config.getIndexingCheckTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            indexingCatchupTaskFuture.cancel(true);
            throw new HoodieIndexException(String.format("Index catchup failed. Current indexed instant = %s. Aborting!", this.currentCaughtupInstant), (Throwable)e);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    private static List<HoodieInstant> getRemainingArchivedAndActiveInstantsSince(String instant, HoodieTableMetaClient metaClient) {
        List<HoodieInstant> remainingInstantsToIndex = metaClient.getArchivedTimeline().getInstants().filter(i -> HoodieTimeline.compareTimestamps((String)i.getTimestamp(), (BiPredicate)HoodieTimeline.GREATER_THAN_OR_EQUALS, (String)instant)).filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList());
        remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().findInstantsAfter(instant).getInstants().filter(i -> HoodieTimeline.compareTimestamps((String)i.getTimestamp(), (BiPredicate)HoodieTimeline.GREATER_THAN_OR_EQUALS, (String)instant)).filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList()));
        return remainingInstantsToIndex;
    }

    private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(String instant, HoodieTableMetaClient metaClient) {
        List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstants().filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList());
        completedInstants.addAll(metaClient.reloadActiveTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstants().filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList()));
        return completedInstants;
    }

    private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
        Set inflightPartitions = HoodieTableMetadataUtil.getInflightMetadataPartitions((HoodieTableConfig)metaClient.getTableConfig());
        Set completedPartitions = HoodieTableMetadataUtil.getCompletedMetadataPartitions((HoodieTableConfig)metaClient.getTableConfig());
        inflightPartitions.removeAll(metadataPartitions);
        completedPartitions.addAll(metadataPartitions);
        metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join((CharSequence)",", inflightPartitions));
        metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join((CharSequence)",", completedPartitions));
        HoodieTableConfig.update((FileSystem)metaClient.getFs(), (Path)new Path(metaClient.getMetaPath()), (Properties)metaClient.getTableConfig().getProps());
    }

    static /* synthetic */ String access$002(RunIndexActionExecutor x0, String x1) {
        x0.currentCaughtupInstant = x1;
        return x0.currentCaughtupInstant;
    }

    static /* synthetic */ Logger access$100() {
        return LOG;
    }

    static /* synthetic */ TransactionManager access$200(RunIndexActionExecutor x0) {
        return x0.txnManager;
    }

    static /* synthetic */ HoodieTable access$300(RunIndexActionExecutor x0) {
        return x0.table;
    }

    static /* synthetic */ HoodieTable access$400(RunIndexActionExecutor x0) {
        return x0.table;
    }

    static /* synthetic */ HoodieTable access$500(RunIndexActionExecutor x0) {
        return x0.table;
    }

    static /* synthetic */ HoodieTable access$600(RunIndexActionExecutor x0) {
        return x0.table;
    }

    static {
        LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1 = Integer.valueOf(1);
    }

    class IndexingCatchupTask
    implements Runnable {
        private final HoodieTableMetadataWriter metadataWriter;
        private final List<HoodieInstant> instantsToIndex;
        private final Set<String> metadataCompletedInstants;
        private final HoodieTableMetaClient metaClient;
        private final HoodieTableMetaClient metadataMetaClient;

        IndexingCatchupTask(HoodieTableMetadataWriter metadataWriter, List<HoodieInstant> instantsToIndex, Set<String> metadataCompletedInstants, HoodieTableMetaClient metaClient, HoodieTableMetaClient metadataMetaClient) {
            this.metadataWriter = metadataWriter;
            this.instantsToIndex = instantsToIndex;
            this.metadataCompletedInstants = metadataCompletedInstants;
            this.metaClient = metaClient;
            this.metadataMetaClient = metadataMetaClient;
        }

        /*
         * Unable to fully structure code
         */
        @Override
        public void run() {
lbl1:
            // 9 sources

            for (HoodieInstant instant : this.instantsToIndex) {
                if (!this.metadataCompletedInstants.isEmpty() && this.metadataCompletedInstants.contains(instant.getTimestamp())) {
                    RunIndexActionExecutor.access$002(RunIndexActionExecutor.this, instant.getTimestamp());
                    continue;
                }
                while (!instant.isCompleted()) {
                    try {
                        RunIndexActionExecutor.access$100().warn((Object)("instant not completed, reloading timeline " + instant));
                        instantTime = instant.getTimestamp();
                        currentInstant = this.metaClient.reloadActiveTimeline().filterCompletedInstants().filter((Predicate<HoodieInstant>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$run$0(java.lang.String org.apache.hudi.common.table.timeline.HoodieInstant ), (Lorg/apache/hudi/common/table/timeline/HoodieInstant;)Z)((String)instantTime)).firstInstant();
                        instant = (HoodieInstant)currentInstant.orElse((Object)instant);
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        throw new HoodieIndexException(String.format("Thread interrupted while running indexing check for instant: %s", new Object[]{instant}), (Throwable)e);
                    }
                }
                if (!HoodieInstant.State.COMPLETED.equals((Object)instant.getState())) continue;
                instantTime = instant.getTimestamp();
                metadataInstant = this.metadataMetaClient.reloadActiveTimeline().filterCompletedInstants().filter((Predicate<HoodieInstant>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$run$1(java.lang.String org.apache.hudi.common.table.timeline.HoodieInstant ), (Lorg/apache/hudi/common/table/timeline/HoodieInstant;)Z)((String)instantTime)).firstInstant();
                if (metadataInstant.isPresent()) {
                    RunIndexActionExecutor.access$002(RunIndexActionExecutor.this, instantTime);
                    continue;
                }
                try {
                    RunIndexActionExecutor.access$200(RunIndexActionExecutor.this).beginTransaction((Option<HoodieInstant>)Option.of((Object)instant), (Option<HoodieInstant>)Option.empty());
                    RunIndexActionExecutor.access$100().info((Object)("Updating metadata table for instant: " + instant));
                    var5_6 = instant.getAction();
                    var6_8 = -1;
                    switch (var5_6.hashCode()) {
                        case -1354815177: {
                            if (!var5_6.equals("commit")) break;
                            var6_8 = 0;
                            break;
                        }
                        case -474858769: {
                            if (!var5_6.equals("deltacommit")) break;
                            var6_8 = 1;
                            break;
                        }
                        case 1519387883: {
                            if (!var5_6.equals("replacecommit")) break;
                            var6_8 = 2;
                            break;
                        }
                        case 94746185: {
                            if (!var5_6.equals("clean")) break;
                            var6_8 = 3;
                            break;
                        }
                        case 1097519758: {
                            if (!var5_6.equals("restore")) break;
                            var6_8 = 4;
                            break;
                        }
                        case -259719452: {
                            if (!var5_6.equals("rollback")) break;
                            var6_8 = 5;
                        }
                    }
                    switch (var6_8) {
                        case 0: 
                        case 1: 
                        case 2: {
                            commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])RunIndexActionExecutor.access$300(RunIndexActionExecutor.this).getActiveTimeline().getInstantDetails(instant).get()), HoodieCommitMetadata.class);
                            this.metadataWriter.update(commitMetadata, instant.getTimestamp(), false);
                            ** break;
                        }
                        case 3: {
                            cleanMetadata = CleanerUtils.getCleanerMetadata((HoodieTableMetaClient)RunIndexActionExecutor.access$400(RunIndexActionExecutor.this).getMetaClient(), (HoodieInstant)instant);
                            this.metadataWriter.update(cleanMetadata, instant.getTimestamp());
                            ** break;
                        }
                        case 4: {
                            restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata((byte[])((byte[])RunIndexActionExecutor.access$500(RunIndexActionExecutor.this).getActiveTimeline().getInstantDetails(instant).get()));
                            this.metadataWriter.update(restoreMetadata, instant.getTimestamp());
                            ** break;
                        }
                        case 5: {
                            rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata((byte[])((byte[])RunIndexActionExecutor.access$600(RunIndexActionExecutor.this).getActiveTimeline().getInstantDetails(instant).get()));
                            this.metadataWriter.update(rollbackMetadata, instant.getTimestamp());
                            ** break;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected value: " + instant.getAction());
                        }
                    }
                }
                catch (IOException e) {
                    throw new HoodieIndexException(String.format("Could not update metadata partition for instant: %s", new Object[]{instant}), (Throwable)e);
                }
                finally {
                    RunIndexActionExecutor.access$200(RunIndexActionExecutor.this).endTransaction((Option<HoodieInstant>)Option.of((Object)instant));
                }
            }
        }

        private static /* synthetic */ boolean lambda$run$1(String instantTime, HoodieInstant i) {
            return i.getTimestamp().equals(instantTime);
        }

        private static /* synthetic */ boolean lambda$run$0(String instantTime, HoodieInstant i) {
            return i.getTimestamp().equals(instantTime);
        }
    }
}

