/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.Version;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.NotifyOnceListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.support.ActiveShardsObserver;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.support.replication.ReplicationResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ack.AckedRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.block.ClusterBlock;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.block.ClusterBlocks;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.metadata.IndexMetaData;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.metadata.MetaData;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.RoutingTable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.service.ClusterService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Priority;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Strings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.UUIDs;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.ValidationException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.collect.Tuple;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.inject.Inject;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Setting;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.AtomicArray;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.CountDown;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.index.Index;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.index.IndexNotFoundException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.index.shard.ShardId;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.indices.IndicesService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.rest.RestStatus;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.snapshots.RestoreService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.snapshots.SnapshotInProgressException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.snapshots.SnapshotsService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.TaskId;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.ThreadPool;

public class MetaDataIndexStateService {
    /** Logger used by both the static helpers and the instance methods of this service. */
    private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class);

    /** Block id shared by the temporary "closing" block and the final {@link #INDEX_CLOSED_BLOCK}. */
    public static final int INDEX_CLOSED_BLOCK_ID = 4;

    /** Block put on an index by {@code closeRoutingTable} at the moment its state is switched to CLOSE. */
    public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(INDEX_CLOSED_BLOCK_ID, "index closed", false, false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);

    /**
     * Private index setting ({@code index.verified_before_close}, default {@code false}) that
     * {@code closeRoutingTable} sets to {@code true} when it closes an index while keeping its
     * routing table, and that {@code openIndices} removes again when the index is reopened.
     */
    public static final Setting<Boolean> VERIFIED_BEFORE_CLOSE_SETTING = Setting.boolSetting("index.verified_before_close", false, Setting.Property.IndexScope, Setting.Property.PrivateIndex);

    // Collaborators supplied through the constructor.
    private final ClusterService clusterService;
    private final AllocationService allocationService;
    private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
    private final IndicesService indicesService;
    private final ThreadPool threadPool;
    private final TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction;

    /** Built in the constructor from the cluster service and thread pool; used to wait for active shards. */
    private final ActiveShardsObserver activeShardsObserver;

    /**
     * Creates the service from its injected collaborators.
     *
     * <p>All arguments are stored as-is; in addition an {@link ActiveShardsObserver} is created
     * from {@code clusterService} and {@code threadPool}.
     */
    @Inject
    public MetaDataIndexStateService(ClusterService clusterService, AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, IndicesService indicesService, ThreadPool threadPool, TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction) {
        // Cluster-level collaborators.
        this.clusterService = clusterService;
        this.allocationService = allocationService;
        this.threadPool = threadPool;

        // Index-level collaborators.
        this.indicesService = indicesService;
        this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
        this.transportVerifyShardBeforeCloseAction = transportVerifyShardBeforeCloseAction;

        // Derived helper used by the open/close paths to wait for shards to become active.
        this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
    }

    /**
     * Closes the indices named in {@code request}.
     *
     * <p>The work is done in several asynchronous steps:
     * <ol>
     *   <li>A cluster state update task ({@code "add-block-index-to-close ..."}) calls
     *       {@link #addIndexClosedBlocks} to put a closing block on every index that is not
     *       already closed.</li>
     *   <li>If that task left the cluster state unchanged, the listener is answered immediately
     *       with an acknowledged response and an empty result list.</li>
     *   <li>Otherwise a {@link WaitForClosedBlocksApplied} runnable is executed on the
     *       {@code "management"} executor to verify the shards of every blocked index.</li>
     *   <li>With the verification results a second task ({@code "close-indices"}) calls
     *       {@link #closeRoutingTable} and reroutes the resulting state.</li>
     *   <li>Finally, for successfully closed indices that still have a routing table, the
     *       listener is only answered once the active-shards wait has completed or timed out.</li>
     * </ol>
     *
     * @param request  the close request; supplies the indices, master node timeout, ack timeout
     *                 and active-shard requirement
     * @param listener notified with the final {@link CloseIndexResponse} or with the first failure
     * @throws IllegalArgumentException if the request carries no indices
     */
    public void closeIndices(CloseIndexClusterStateUpdateRequest request, ActionListener<CloseIndexResponse> listener) {
        final Index[] concreteIndices = request.indices();
        if (concreteIndices == null || concreteIndices.length == 0) {
            throw new IllegalArgumentException("Index name is required");
        }
        this.clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) {
            // Filled by addIndexClosedBlocks during execute(); read by the follow-up steps.
            private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();

            @Override
            public ClusterState execute(ClusterState currentState) {
                return MetaDataIndexStateService.addIndexClosedBlocks(concreteIndices, this.blockedIndices, currentState);
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                if (oldState == newState) {
                    // Nothing had to be blocked, so there is nothing left to close.
                    assert this.blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
                    listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList()));
                    return;
                }
                assert !this.blockedIndices.isEmpty() : "List of blocked indices is empty but cluster state was changed";
                final Map<Index, ClusterBlock> blocked = this.blockedIndices;
                MetaDataIndexStateService.this.threadPool.executor("management").execute(new WaitForClosedBlocksApplied(blocked, request, ActionListener.wrap(verifyResults -> MetaDataIndexStateService.this.clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
                    // Per-index outcome of the close, collected in execute() and reported afterwards.
                    private final List<CloseIndexResponse.IndexResult> indices = new ArrayList<>();

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        final Tuple<ClusterState, Collection<CloseIndexResponse.IndexResult>> closingResult = MetaDataIndexStateService.closeRoutingTable(currentState, blocked, verifyResults);
                        assert verifyResults.size() == closingResult.v2().size();
                        this.indices.addAll(closingResult.v2());
                        return MetaDataIndexStateService.this.allocationService.reroute(closingResult.v1(), "indices closed");
                    }

                    @Override
                    public void onFailure(String source, Exception e) {
                        listener.onFailure(e);
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        final List<CloseIndexResponse.IndexResult> results = this.indices;
                        final boolean acknowledged = results.stream().noneMatch(CloseIndexResponse.IndexResult::hasFailures);
                        // Only indices that closed cleanly and still have a routing table can be waited for.
                        final String[] waitForIndices = results.stream()
                            .filter(result -> !result.hasFailures())
                            .filter(result -> newState.routingTable().hasIndex(result.getIndex()))
                            .map(result -> result.getIndex().getName())
                            .toArray(String[]::new);
                        if (waitForIndices.length > 0) {
                            MetaDataIndexStateService.this.activeShardsObserver.waitForActiveShards(waitForIndices, request.waitForActiveShards(), request.ackTimeout(), shardsAcknowledged -> {
                                if (!shardsAcknowledged) {
                                    logger.debug("[{}] indices closed, but the operation timed out while waiting for enough shards to be started.", Arrays.toString(waitForIndices));
                                }
                                // Never report shards as acknowledged when the close itself was not acknowledged.
                                final boolean shardsAcked = acknowledged ? shardsAcknowledged : false;
                                listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked, results));
                            }, listener::onFailure);
                        } else {
                            listener.onResponse(new CloseIndexResponse(acknowledged, false, results));
                        }
                    }
                }), listener::onFailure)));
            }

            @Override
            public void onFailure(String source, Exception e) {
                listener.onFailure(e);
            }

            @Override
            public TimeValue timeout() {
                return request.masterNodeTimeout();
            }
        });
    }

    /**
     * First step of closing indices: puts a closing block on every index in {@code indices}
     * that is not already closed.
     *
     * <p>An index that already carries a block with {@link #INDEX_CLOSED_BLOCK_ID} keeps that
     * block; otherwise a new one is created with {@link #createIndexClosingBlock()}. Every block
     * that was reused or created is recorded in {@code blockedIndices}.
     *
     * @param indices        the indices requested to be closed
     * @param blockedIndices output map, filled with the closing block of each index to close
     * @param currentState   the cluster state to update
     * @return {@code currentState} itself when every index is already closed, otherwise a new
     *         state containing the additional blocks
     * @throws IllegalArgumentException    if any index to close is currently being restored
     * @throws SnapshotInProgressException if any index to close is currently being snapshotted
     */
    static ClusterState addIndexClosedBlocks(Index[] indices, Map<Index, ClusterBlock> blockedIndices, ClusterState currentState) {
        final MetaData.Builder metadata = MetaData.builder(currentState.metaData());

        // Select the indices that still need closing; getSafe fails for unknown indices.
        final Set<Index> indicesToClose = new HashSet<>();
        for (Index candidate : indices) {
            final IndexMetaData candidateMetaData = metadata.getSafe(candidate);
            if (candidateMetaData.getState() == IndexMetaData.State.CLOSE) {
                logger.debug("index {} is already closed, ignoring", candidate);
                assert currentState.blocks().hasIndexBlock(candidate.getName(), INDEX_CLOSED_BLOCK);
            } else {
                indicesToClose.add(candidate);
            }
        }
        if (indicesToClose.isEmpty()) {
            return currentState;
        }

        // Refuse to close indices taking part in a restore or a snapshot.
        final Set<Index> restoringIndices = RestoreService.restoringIndices(currentState, indicesToClose);
        if (!restoringIndices.isEmpty()) {
            throw new IllegalArgumentException("Cannot close indices that are being restored: " + restoringIndices);
        }
        final Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToClose);
        if (!snapshottingIndices.isEmpty()) {
            throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + snapshottingIndices + ". Try again after snapshot finishes or cancel the currently running snapshot.");
        }

        final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());

        for (Index toClose : indicesToClose) {
            final String indexName = toClose.getName();

            // Reuse a closing block left over from an earlier attempt, if there is one.
            ClusterBlock closingBlock = null;
            final Set<ClusterBlock> existingBlocks = currentState.blocks().indices().get(indexName);
            if (existingBlocks != null) {
                for (ClusterBlock existing : existingBlocks) {
                    if (existing.id() == INDEX_CLOSED_BLOCK_ID) {
                        closingBlock = existing;
                        break;
                    }
                }
            }
            if (closingBlock == null) {
                closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
            }
            assert Strings.hasLength(closingBlock.uuid()) : "Closing block should have a UUID";

            blocks.addIndexBlock(indexName, closingBlock);
            blockedIndices.put(toClose, closingBlock);
        }

        logger.info(() -> new ParameterizedMessage("closing indices {}", (Object)blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.joining(","))));
        return ClusterState.builder(currentState)
            .blocks(blocks)
            .metaData(metadata)
            .routingTable(routingTable.build())
            .build();
    }

    /**
     * Last step of closing indices: switches every successfully verified index to the CLOSE
     * state and swaps its closing block for {@link #INDEX_CLOSED_BLOCK}.
     *
     * <p>An index is left untouched when its verification reported failures, when it is already
     * closed, or when it has been deleted in the meantime. It is left untouched and its result
     * is replaced by a failure when its closing block is gone or when it is now being restored
     * or snapshotted.
     *
     * <p>When the oldest node in the cluster is older than 7.2.0 the routing table of a closed
     * index is removed. Otherwise the index keeps a routing table, its settings version is
     * incremented and {@link #VERIFIED_BEFORE_CLOSE_SETTING} is set to {@code true}.
     *
     * @param currentState   the cluster state to update
     * @param blockedIndices the closing block that was put on each index
     * @param verifyResult   the per-index outcome of the shard verification
     * @return the updated cluster state together with one result per entry of {@code verifyResult}
     */
    static Tuple<ClusterState, Collection<CloseIndexResponse.IndexResult>> closeRoutingTable(ClusterState currentState, Map<Index, ClusterBlock> blockedIndices, Map<Index, CloseIndexResponse.IndexResult> verifyResult) {
        final boolean removeRoutingTable = currentState.nodes().getMinNodeVersion().before(Version.V_7_2_0);
        final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
        final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());

        final Set<String> closedIndices = new HashSet<>();
        // Starts as a copy of the verification results; entries are overwritten on late failures.
        final Map<Index, CloseIndexResponse.IndexResult> closingResults = new HashMap<>(verifyResult);

        for (Map.Entry<Index, CloseIndexResponse.IndexResult> entry : verifyResult.entrySet()) {
            final Index index = entry.getKey();
            final boolean verified = !entry.getValue().hasFailures();
            try {
                if (!verified) {
                    logger.debug("verification of shards before closing {} failed", index);
                    continue;
                }

                final IndexMetaData currentMetaData = metadata.getSafe(index);
                if (currentMetaData.getState() == IndexMetaData.State.CLOSE) {
                    logger.debug("verification of shards before closing {} succeeded but index is already closed", index);
                    assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
                    continue;
                }

                final ClusterBlock closingBlock = blockedIndices.get(index);
                assert closingBlock != null;
                if (!currentState.blocks().hasIndexBlock(index.getName(), closingBlock)) {
                    closingResults.put(index, new CloseIndexResponse.IndexResult(index, new IllegalStateException("verification of shards before closing " + index + " succeeded but block has been removed in the meantime")));
                    logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index);
                    continue;
                }

                final Set<Index> restoring = RestoreService.restoringIndices(currentState, Collections.singleton(index));
                if (!restoring.isEmpty()) {
                    closingResults.put(index, new CloseIndexResponse.IndexResult(index, new IllegalStateException("verification of shards before closing " + index + " succeeded but index is being restored in the meantime")));
                    logger.debug("verification of shards before closing {} succeeded but index is being restored in the meantime", index);
                    continue;
                }

                final Set<Index> snapshotting = SnapshotsService.snapshottingIndices(currentState, Collections.singleton(index));
                if (!snapshotting.isEmpty()) {
                    closingResults.put(index, new CloseIndexResponse.IndexResult(index, new IllegalStateException("verification of shards before closing " + index + " succeeded but index is being snapshot in the meantime")));
                    logger.debug("verification of shards before closing {} succeeded but index is being snapshot in the meantime", index);
                    continue;
                }

                // Swap the temporary closing block for the permanent closed block.
                blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
                blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);

                final IndexMetaData.Builder closedMetaData = IndexMetaData.builder(currentMetaData).state(IndexMetaData.State.CLOSE);
                if (removeRoutingTable) {
                    metadata.put(closedMetaData);
                    routingTable.remove(index.getName());
                } else {
                    final Settings.Builder verifiedSettings = Settings.builder()
                        .put(currentMetaData.getSettings())
                        .put(VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true);
                    metadata.put(closedMetaData.settingsVersion(currentMetaData.getSettingsVersion() + 1L).settings(verifiedSettings));
                    routingTable.addAsFromOpenToClose(metadata.getSafe(index));
                }

                logger.debug("closing index {} succeeded", index);
                closedIndices.add(index.getName());
            }
            catch (IndexNotFoundException e) {
                logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
            }
        }

        logger.info("completed closing of indices {}", closedIndices);
        final ClusterState updatedState = ClusterState.builder(currentState)
            .blocks(blocks)
            .metaData(metadata)
            .routingTable(routingTable.build())
            .build();
        return Tuple.tuple(updatedState, closingResults.values());
    }

    /**
     * Opens the indices named in {@code request} and then waits for their shards to become active.
     *
     * <p>The cluster state change is delegated to {@code onlyOpenIndex}. If it is not
     * acknowledged the listener receives {@code (false, false)} straight away; otherwise the
     * listener is answered after the active-shards wait has completed or timed out.
     *
     * @param request  the open request; supplies the indices, ack timeout and active-shard requirement
     * @param listener notified with the combined response or with the first failure
     */
    public void openIndex(OpenIndexClusterStateUpdateRequest request, ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
        this.onlyOpenIndex(request, ActionListener.wrap(response -> {
            if (!response.isAcknowledged()) {
                listener.onResponse(new OpenIndexClusterStateUpdateResponse(false, false));
                return;
            }
            final String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
            this.activeShardsObserver.waitForActiveShards(indexNames, request.waitForActiveShards(), request.ackTimeout(), shardsAcknowledged -> {
                if (!shardsAcknowledged) {
                    logger.debug("[{}] indices opened, but the operation timed out while waiting for enough shards to be started.", Arrays.toString(indexNames));
                }
                listener.onResponse(new OpenIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
            }, listener::onFailure);
        }, listener::onFailure));
    }

    /**
     * Submits the cluster state update task that opens the requested indices, without waiting
     * for any shard to become active.
     *
     * <p>The task applies {@link #openIndices} and reroutes the resulting state.
     *
     * @param request  the open request
     * @param listener notified with the acknowledgement of the cluster state update
     * @throws IllegalArgumentException if the request carries no indices
     */
    private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, ActionListener<ClusterStateUpdateResponse> listener) {
        final boolean noIndices = request.indices() == null || request.indices().length == 0;
        if (noIndices) {
            throw new IllegalArgumentException("Index name is required");
        }

        final String indicesAsString = Arrays.toString(request.indices());
        final String taskSource = "open-indices " + indicesAsString;
        this.clusterService.submitStateUpdateTask(taskSource, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, (AckedRequest)request, listener){

            @Override
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }

            @Override
            public ClusterState execute(ClusterState currentState) {
                final ClusterState opened = MetaDataIndexStateService.this.openIndices(request.indices(), currentState);
                final String rerouteReason = "indices opened [" + indicesAsString + "]";
                return MetaDataIndexStateService.this.allocationService.reroute(opened, rerouteReason);
            }
        });
    }

    /**
     * Computes the cluster state in which the given indices are open.
     *
     * <p>An index is processed when it is not in the OPEN state, or when it is open but still
     * carries a block with {@link #INDEX_CLOSED_BLOCK_ID}. For an index that is not open, the
     * metadata is switched to OPEN, {@link #VERIFIED_BEFORE_CLOSE_SETTING} is removed, the
     * settings version is incremented, the metadata is upgraded and verified, and a routing
     * table entry is added. For every processed index the block with
     * {@link #INDEX_CLOSED_BLOCK_ID} is removed.
     *
     * @param indices      the indices to open; each must exist in {@code currentState}
     * @param currentState the cluster state to update
     * @return {@code currentState} itself when nothing has to be opened, otherwise the new state
     * @throws ElasticsearchException if the reopened metadata of an index fails verification
     */
    ClusterState openIndices(Index[] indices, ClusterState currentState) {
        final List<IndexMetaData> indicesToOpen = new ArrayList<>();
        for (Index requested : indices) {
            final IndexMetaData requestedMetaData = currentState.metaData().getIndexSafe(requested);
            final boolean notOpen = requestedMetaData.getState() != IndexMetaData.State.OPEN;
            // An open index is still selected when a closing/closed block is left on it.
            if (notOpen || currentState.blocks().hasIndexBlockWithId(requested.getName(), INDEX_CLOSED_BLOCK_ID)) {
                indicesToOpen.add(requestedMetaData);
            }
        }

        MetaDataIndexStateService.validateShardLimit(currentState, indices);
        if (indicesToOpen.isEmpty()) {
            return currentState;
        }

        logger.info(() -> new ParameterizedMessage("opening indices [{}]", (Object)indicesToOpen.stream().map(i -> i.getIndex().toString()).collect(Collectors.joining(","))));

        final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
        final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();

        for (IndexMetaData selected : indicesToOpen) {
            final Index index = selected.getIndex();
            if (selected.getState() != IndexMetaData.State.OPEN) {
                final Settings.Builder reopenedSettings = Settings.builder().put(selected.getSettings());
                reopenedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());

                IndexMetaData reopened = IndexMetaData.builder(selected)
                    .state(IndexMetaData.State.OPEN)
                    .settingsVersion(selected.getSettingsVersion() + 1L)
                    .settings(reopenedSettings)
                    .build();
                reopened = this.metaDataIndexUpgradeService.upgradeIndexMetaData(reopened, minIndexCompatibilityVersion);
                try {
                    this.indicesService.verifyIndexMetadata(reopened, reopened);
                }
                catch (Exception e) {
                    throw new ElasticsearchException("Failed to verify index " + index, e);
                }
                metadata.put(reopened, true);
            }
            blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
        }

        // Routing entries are derived from the metadata of the already updated state.
        final ClusterState updatedState = ClusterState.builder(currentState).metaData(metadata).blocks(blocks).build();
        final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
        for (IndexMetaData previous : indicesToOpen) {
            if (previous.getState() != IndexMetaData.State.OPEN) {
                routingTable.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(previous.getIndex()));
            }
        }
        return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
    }

    /**
     * Checks that opening the given indices stays within the cluster's shard limit.
     *
     * <p>Only indices currently in the CLOSE state count; for each of them the total number of
     * shard copies is added up and the sum is passed to {@code IndicesService.checkShardLimit}.
     *
     * @param currentState the cluster state the indices are looked up in
     * @param indices      the indices about to be opened
     * @throws ValidationException carrying the reported error if the limit check fails
     */
    static void validateShardLimit(ClusterState currentState, Index[] indices) {
        int shardsToOpen = 0;
        for (Index index : indices) {
            if (currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE)) {
                shardsToOpen += MetaDataIndexStateService.getTotalShardCount(currentState, index);
            }
        }

        final Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState);
        if (!error.isPresent()) {
            return;
        }
        final ValidationException failure = new ValidationException();
        failure.addValidationError(error.get());
        throw failure;
    }

    /**
     * Returns the number of shard copies of an index: its number of shards multiplied by
     * one plus its number of replicas.
     *
     * @param state the cluster state the index metadata is read from
     * @param index the index to count shards for
     */
    private static int getTotalShardCount(ClusterState state, Index index) {
        final IndexMetaData metaData = state.metaData().index(index);
        final int shards = metaData.getNumberOfShards();
        final int copiesPerShard = 1 + metaData.getNumberOfReplicas();
        return shards * copiesPerShard;
    }

    /**
     * Creates the temporary block put on an index while it is being closed.
     *
     * <p>The block uses {@link #INDEX_CLOSED_BLOCK_ID}, a freshly generated random UUID and the
     * WRITE block level only.
     *
     * @return a new closing block; every call yields a block with a different UUID
     */
    public static ClusterBlock createIndexClosingBlock() {
        final String blockUuid = UUIDs.randomBase64UUID();
        final String description = "index preparing to close. Reopen the index to allow writes again or retry closing the index to fully close the index.";
        return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, blockUuid, description, false, false, false, RestStatus.FORBIDDEN, EnumSet.of(ClusterBlockLevel.WRITE));
    }

    /**
     * Tells whether an index is closed and was marked as verified when it was closed.
     *
     * @param indexMetaData the metadata of the index to inspect
     * @return {@code true} only if the index is in the CLOSE state and
     *         {@link #VERIFIED_BEFORE_CLOSE_SETTING} is explicitly present with value {@code true}
     */
    public static boolean isIndexVerifiedBeforeClosed(IndexMetaData indexMetaData) {
        if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
            return false;
        }
        final Settings settings = indexMetaData.getSettings();
        if (!VERIFIED_BEFORE_CLOSE_SETTING.exists(settings)) {
            return false;
        }
        return VERIFIED_BEFORE_CLOSE_SETTING.get(settings);
    }

    /**
     * Intermediate step of closing indices, run after the closing blocks have been added.
     *
     * <p>For every blocked index it sends verification requests to each shard through
     * {@code TransportVerifyShardBeforeCloseAction} and, once all indices have reported,
     * hands the per-index results to the listener as an unmodifiable map.
     */
    class WaitForClosedBlocksApplied
    extends AbstractRunnable {
        private final Map<Index, ClusterBlock> blockedIndices;
        private final CloseIndexClusterStateUpdateRequest request;
        private final ActionListener<Map<Index, CloseIndexResponse.IndexResult>> listener;

        /**
         * @param blockedIndices the closing block of each index to verify; must not be empty
         * @param request        the originating close request (supplies task id and ack timeout)
         * @param listener       receives the verification result of every index
         * @throws IllegalArgumentException if {@code blockedIndices} is {@code null} or empty
         */
        private WaitForClosedBlocksApplied(Map<Index, ClusterBlock> blockedIndices, CloseIndexClusterStateUpdateRequest request, ActionListener<Map<Index, CloseIndexResponse.IndexResult>> listener) {
            if (blockedIndices == null || blockedIndices.isEmpty()) {
                throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
            }
            this.blockedIndices = blockedIndices;
            this.request = request;
            this.listener = listener;
        }

        @Override
        public void onFailure(Exception e) {
            this.listener.onFailure(e);
        }

        /**
         * Starts the verification of every blocked index against one snapshot of the cluster
         * state and answers the listener when the last index has reported.
         */
        @Override
        protected void doRun() throws Exception {
            final ConcurrentMap<Index, CloseIndexResponse.IndexResult> results = ConcurrentCollections.newConcurrentMap();
            final CountDown countDown = new CountDown(this.blockedIndices.size());
            final ClusterState state = MetaDataIndexStateService.this.clusterService.state();
            this.blockedIndices.forEach((index, block) -> this.waitForShardsReadyForClosing(index, block, state, response -> {
                results.put(index, response);
                if (countDown.countDown()) {
                    this.listener.onResponse(Collections.unmodifiableMap(results));
                }
            }));
        }

        /**
         * Verifies all shards of one index and reports a single {@code IndexResult}.
         *
         * <p>An index that no longer exists, has no routing table, or is already closed is
         * reported immediately with a result that carries no shard results.
         */
        private void waitForShardsReadyForClosing(final Index index, ClusterBlock closingBlock, ClusterState state, final Consumer<CloseIndexResponse.IndexResult> onResponse) {
            final IndexMetaData indexMetaData = state.metaData().index(index);
            if (indexMetaData == null) {
                logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
                onResponse.accept(new CloseIndexResponse.IndexResult(index));
                return;
            }
            final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
            if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
                logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
                onResponse.accept(new CloseIndexResponse.IndexResult(index));
                return;
            }

            final ImmutableOpenIntMap<IndexShardRoutingTable> shards = indexRoutingTable.getShards();
            // One slot per shard, indexed by shard id; each slot is written exactly once.
            final AtomicArray<CloseIndexResponse.ShardResult> results = new AtomicArray<>(shards.size());
            final CountDown countDown = new CountDown(shards.size());

            for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
                final IndexShardRoutingTable shardRoutingTable = shard.value;
                final int shardId = shardRoutingTable.shardId().id();
                this.sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener<ReplicationResponse>(){

                    @Override
                    public void innerOnResponse(ReplicationResponse replicationResponse) {
                        final CloseIndexResponse.ShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures())
                            .map(f -> new CloseIndexResponse.ShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId()))
                            .toArray(CloseIndexResponse.ShardResult.Failure[]::new);
                        results.setOnce(shardId, new CloseIndexResponse.ShardResult(shardId, failures));
                        this.processIfFinished();
                    }

                    @Override
                    public void innerOnFailure(Exception e) {
                        final CloseIndexResponse.ShardResult.Failure failure = new CloseIndexResponse.ShardResult.Failure(index.getName(), shardId, e);
                        results.setOnce(shardId, new CloseIndexResponse.ShardResult(shardId, new CloseIndexResponse.ShardResult.Failure[]{failure}));
                        this.processIfFinished();
                    }

                    // Reports the index result once the last shard of the index has answered.
                    private void processIfFinished() {
                        if (countDown.countDown()) {
                            onResponse.accept(new CloseIndexResponse.IndexResult(index, results.toArray(new CloseIndexResponse.ShardResult[results.length()])));
                        }
                    }
                });
            }
        }

        /**
         * Verifies one shard by executing two verification requests in sequence.
         *
         * <p>A shard whose primary is unassigned is not contacted; the listener receives a
         * response without failures. Otherwise the first request is sent with the boolean flag
         * set to {@code true} and, when it succeeds, a second one with the flag set to
         * {@code false}; only the second response (or the first failure) reaches the listener.
         */
        private void sendVerifyShardBeforeCloseRequest(IndexShardRoutingTable shardRoutingTable, final ClusterBlock closingBlock, final ActionListener<ReplicationResponse> listener) {
            final ShardId shardId = shardRoutingTable.shardId();
            if (shardRoutingTable.primaryShard().unassigned()) {
                logger.debug("primary shard {} is unassigned, ignoring", shardId);
                final ReplicationResponse response = new ReplicationResponse();
                response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size(), new ReplicationResponse.ShardInfo.Failure[0]));
                listener.onResponse(response);
                return;
            }

            final TaskId parentTaskId = new TaskId(MetaDataIndexStateService.this.clusterService.localNode().getId(), this.request.taskId());
            final TransportVerifyShardBeforeCloseAction.ShardRequest firstRequest = this.newShardRequest(shardId, closingBlock, true, parentTaskId);
            MetaDataIndexStateService.this.transportVerifyShardBeforeCloseAction.execute(firstRequest, new ActionListener<ReplicationResponse>(){

                @Override
                public void onResponse(ReplicationResponse replicationResponse) {
                    final TransportVerifyShardBeforeCloseAction.ShardRequest secondRequest = WaitForClosedBlocksApplied.this.newShardRequest(shardId, closingBlock, false, parentTaskId);
                    MetaDataIndexStateService.this.transportVerifyShardBeforeCloseAction.execute(secondRequest, listener);
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        }

        // Builds a shard verification request, applying the close request's ack timeout when one is set.
        // NOTE(review): the boolean is presumably the "first phase" flag of ShardRequest; confirm against that class.
        private TransportVerifyShardBeforeCloseAction.ShardRequest newShardRequest(ShardId shardId, ClusterBlock closingBlock, boolean flag, TaskId parentTaskId) {
            final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, flag, parentTaskId);
            if (this.request.ackTimeout() != null) {
                shardRequest.timeout(this.request.ackTimeout());
            }
            return shardRequest;
        }
    }
}

