/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing;

import java.util.function.BiFunction;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.support.PlainListenableActionFuture;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.NotMasterException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.RerouteService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.service.ClusterService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Nullable;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Priority;

public class BatchedRerouteService
implements RerouteService {
    private static final Logger logger = LogManager.getLogger(BatchedRerouteService.class);
    private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
    private final ClusterService clusterService;
    private final BiFunction<ClusterState, String, ClusterState> reroute;
    private final Object mutex = new Object();
    @Nullable
    private PlainListenableActionFuture<Void> pendingRerouteListeners;

    public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
        this.clusterService = clusterService;
        this.reroute = reroute;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final void reroute(final String reason, ActionListener<Void> listener) {
        PlainListenableActionFuture<Void> currentListeners;
        Object object = this.mutex;
        synchronized (object) {
            if (this.pendingRerouteListeners != null) {
                logger.trace("already has pending reroute, adding [{}] to batch", (Object)reason);
                this.pendingRerouteListeners.addListener(listener);
                return;
            }
            currentListeners = PlainListenableActionFuture.newListenableFuture();
            currentListeners.addListener(listener);
            this.pendingRerouteListeners = currentListeners;
        }
        logger.trace("rerouting [{}]", (Object)reason);
        try {
            this.clusterService.submitStateUpdateTask("cluster_reroute(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public ClusterState execute(ClusterState currentState) {
                    Object object = BatchedRerouteService.this.mutex;
                    synchronized (object) {
                        assert (BatchedRerouteService.this.pendingRerouteListeners == currentListeners);
                        BatchedRerouteService.this.pendingRerouteListeners = null;
                    }
                    return (ClusterState)BatchedRerouteService.this.reroute.apply(currentState, reason);
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onNoLongerMaster(String source) {
                    Object object = BatchedRerouteService.this.mutex;
                    synchronized (object) {
                        if (BatchedRerouteService.this.pendingRerouteListeners == currentListeners) {
                            BatchedRerouteService.this.pendingRerouteListeners = null;
                        }
                    }
                    currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onFailure(String source, Exception e) {
                    Object object = BatchedRerouteService.this.mutex;
                    synchronized (object) {
                        if (BatchedRerouteService.this.pendingRerouteListeners == currentListeners) {
                            BatchedRerouteService.this.pendingRerouteListeners = null;
                        }
                    }
                    ClusterState state = BatchedRerouteService.this.clusterService.state();
                    if (logger.isTraceEnabled()) {
                        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", (Object)source, (Object)state), (Throwable)e);
                    } else {
                        logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", (Object)source, (Object)state.version()), (Throwable)e);
                    }
                    currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", (Throwable)e, new Object[0]));
                }

                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    currentListeners.onResponse(null);
                }
            });
        }
        catch (Exception e) {
            Object object2 = this.mutex;
            synchronized (object2) {
                assert (this.pendingRerouteListeners == currentListeners);
                this.pendingRerouteListeners = null;
            }
            ClusterState state = this.clusterService.state();
            logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", (Object)state), (Throwable)e);
            currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", (Throwable)e, new Object[0]));
        }
    }
}

