/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.cluster.management;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.InitException;
import org.apache.ignite.internal.cluster.management.LocalStateStorage;
import org.apache.ignite.internal.cluster.management.network.CmgMessageHandlerFactory;
import org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage;
import org.apache.ignite.internal.cluster.management.network.messages.ClusterStateMessage;
import org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
import org.apache.ignite.internal.cluster.management.raft.CmgRaftService;
import org.apache.ignite.internal.cluster.management.raft.IllegalInitArgumentException;
import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.network.util.ClusterServiceUtils;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ClusterManagementGroupManager
implements IgniteComponent {
    /** Timeout for network invocations; presumably in milliseconds, like the other time values in this class (TODO confirm). */
    private static final int NETWORK_INVOKE_TIMEOUT = 500;

    /** Logger of this class. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(ClusterManagementGroupManager.class);

    /** Name under which the CMG Raft group is prepared and stopped. */
    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";

    /** Busy lock entered by the public API methods and blocked by {@link #stop()}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Makes sure the body of {@link #stop()} is executed at most once. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /** Future of the CMG Raft service; {@code null} while the CMG has not been started. Accessed under {@link #raftServiceLock}. */
    @Nullable
    private CompletableFuture<CmgRaftService> raftService;

    /** Monitor protecting reads and writes of {@link #raftService}. */
    private final Object raftServiceLock = new Object();

    /** Completed once the join attempt finishes (normally or exceptionally); failed when the component is stopped. */
    private final CompletableFuture<Void> joinFuture = new CompletableFuture<>();

    /** Factory of CMG network messages. */
    private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();

    /** Single-threaded scheduler used for message retries and delayed logical topology updates. */
    private final ScheduledExecutorService scheduledExecutor =
            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cmg-manager"));

    /** Cluster service providing messaging and topology access. */
    private final ClusterService clusterService;

    /** Raft manager used to prepare and stop the CMG Raft group. */
    private final Loza raftManager;

    /** Storage passed to the CMG Raft group listener. */
    private final ClusterStateStorage clusterStateStorage;

    /** Storage of the locally persisted CMG state (CMG node names and cluster tag). */
    private final LocalStateStorage localStateStorage;

    /** Helper that performs the cluster initialization requested through {@link #initCluster}. */
    private final ClusterInitializer clusterInitializer;

    /**
     * Creates the manager together with the helper objects it owns.
     *
     * @param vault Vault manager backing the local state storage.
     * @param clusterService Cluster service used for messaging and topology access.
     * @param raftManager Raft manager used to prepare and stop the CMG Raft group.
     * @param clusterStateStorage Storage passed to the CMG Raft group listener.
     */
    public ClusterManagementGroupManager(VaultManager vault, ClusterService clusterService, Loza raftManager, ClusterStateStorage clusterStateStorage) {
        // Helpers owned by this component, built from the injected dependencies.
        this.localStateStorage = new LocalStateStorage(vault);
        this.clusterInitializer = new ClusterInitializer(clusterService);

        // Injected collaborators.
        this.clusterStateStorage = clusterStateStorage;
        this.raftManager = raftManager;
        this.clusterService = clusterService;
    }

    /**
     * Initializes the cluster through the {@link ClusterInitializer} and blocks until the initialization finishes.
     *
     * @param metaStorageNodeNames Names of the nodes forwarded to the initializer as meta storage nodes.
     * @param cmgNodeNames Names of the nodes forwarded to the initializer as CMG nodes.
     * @param clusterName Name of the cluster.
     * @throws NodeStoppingException If the busy lock cannot be entered.
     * @throws InitException If the initialization fails or the waiting thread is interrupted.
     */
    public void initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames, String clusterName) throws NodeStoppingException {
        if (!busyLock.enterBusy()) {
            throw new NodeStoppingException();
        }

        try {
            clusterInitializer.initCluster(metaStorageNodeNames, cmgNodeNames, clusterName).get();
        } catch (ExecutionException executionFailure) {
            // Report the actual failure rather than the wrapper produced by Future#get.
            Throwable cause = executionFailure.getCause();

            throw new InitException("Unable to initialize the cluster: " + cause.getMessage(), cause);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt status visible to the caller.
            Thread.currentThread().interrupt();

            throw new InitException("Interrupted while initializing the cluster", interrupted);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Starts the component: tries to restore the CMG Raft service from the locally stored state and registers
     * the handler of CMG group messages.
     */
    public void start() {
        synchronized (raftServiceLock) {
            raftService = recoverLocalState();
        }

        CmgMessageHandlerFactory handlerFactory = new CmgMessageHandlerFactory(busyLock, msgFactory, clusterService);

        clusterService.messagingService().addMessageHandler(
                CmgMessageGroup.class,
                handlerFactory.wrapHandler((message, senderAddr, correlationId) -> {
                    if (message instanceof ClusterStateMessage) {
                        assert correlationId != null;

                        handleClusterState((ClusterStateMessage) message, senderAddr, correlationId);

                        return;
                    }

                    if (message instanceof CancelInitMessage) {
                        handleCancelInit((CancelInitMessage) message);

                        return;
                    }

                    if (message instanceof CmgInitMessage) {
                        assert correlationId != null;

                        handleInit((CmgInitMessage) message, senderAddr, correlationId);
                    }
                })
        );
    }

    /**
     * Restores the CMG Raft service from the locally stored state, if there is any.
     *
     * <p>When a local state exists, the Raft service is started on the stored CMG nodes and the node joins the cluster
     * using the stored cluster tag. If the local node turns out to be the CMG leader, the leader callback is executed
     * with the cluster state read from the Raft service before the returned future completes.
     *
     * @return Future of the started Raft service, or {@code null} if no local state has been stored.
     * @throws IgniteInternalException If the local state cannot be read or the thread is interrupted while reading it.
     */
    @Nullable
    private CompletableFuture<CmgRaftService> recoverLocalState() {
        LocalStateStorage.LocalState localState;

        try {
            localState = localStateStorage.getLocalState().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteInternalException("Interrupted while retrieving local CMG state", e);
        } catch (ExecutionException e) {
            throw new IgniteInternalException("Error while retrieving local CMG state", e);
        }

        if (localState == null) {
            return null;
        }

        LOG.info("Local CMG state recovered, starting the CMG");

        return startCmgRaftService(localState.cmgNodeNames())
                .thenCompose(service -> joinCluster(service, localState.clusterTag())
                        .thenCompose(v -> service.isCurrentNodeLeader())
                        .thenCompose(isLeader -> {
                            if (!isLeader) {
                                return CompletableFuture.completedFuture(service);
                            }

                            return service.readClusterState().thenCompose(state -> {
                                if (state == null) {
                                    return CompletableFuture.<CmgRaftService>failedFuture(
                                            new IllegalStateException("Cluster state is empty"));
                                }

                                return onLeaderElected(service, state).thenApply(v -> service);
                            });
                        }));
    }

    /**
     * Handles a {@link CmgInitMessage}: starts the CMG Raft service if it has not been started yet, initializes
     * the CMG and reports the outcome to the sender.
     *
     * @param msg Init message.
     * @param addr Address the response is sent to.
     * @param correlationId Correlation id of the request being answered.
     */
    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) {
        synchronized (raftServiceLock) {
            if (raftService != null) {
                LOG.info("Init command received, but the CMG has already been started");
            } else {
                LOG.info("Init command received, starting the CMG on: {}", msg.cmgNodes());

                raftService = startCmgRaftService(msg.cmgNodes());
            }

            raftService = raftService.thenCompose(service -> doInit(service, msg).handle((v, e) -> {
                NetworkMessage response;

                if (e == null) {
                    LOG.info("CMG initialized successfully");

                    response = msgFactory.initCompleteMessage().build();
                } else {
                    // Look through the wrapper added by the future chain.
                    Throwable cause = e instanceof CompletionException ? e.getCause() : e;

                    LOG.error("Error when initializing the CMG: {}", cause, cause.getMessage());

                    response = msgFactory.initErrorMessage()
                            .cause(cause.getMessage())
                            .shouldCancel(!(cause instanceof IllegalInitArgumentException))
                            .build();
                }

                clusterService.messagingService().respond(addr, response, correlationId);

                // The service stays usable for later messages regardless of the init outcome.
                return service;
            }));
        }
    }

    /**
     * Performs the CMG initialization: initializes the cluster state in the Raft service, saves the resulting CMG nodes
     * and cluster tag locally, joins the cluster and, if the local node is the CMG leader, runs the leader callback.
     *
     * @param service CMG Raft service.
     * @param msg Init message the cluster state is built from.
     * @return Future completed when all the steps above have finished.
     */
    private CompletableFuture<Void> doInit(CmgRaftService service, CmgInitMessage msg) {
        return service.initClusterState(clusterState(msg))
                .thenCompose(state -> {
                    LocalStateStorage.LocalState localState =
                            new LocalStateStorage.LocalState(state.cmgNodes(), state.clusterTag());

                    return localStateStorage.saveLocalState(localState)
                            .thenCompose(v -> joinCluster(service, state.clusterTag()))
                            .thenCompose(v -> service.isCurrentNodeLeader()
                                    .thenCompose(isLeader -> isLeader
                                            ? onLeaderElected(service, state)
                                            : CompletableFuture.completedFuture(null)));
                });
    }

    /**
     * Builds the cluster state described by an init message, stamped with the current product version and
     * a cluster tag created from the cluster name.
     *
     * @param msg Init message.
     * @return New cluster state.
     */
    private static ClusterState clusterState(CmgInitMessage msg) {
        return new ClusterState(
                msg.cmgNodes(),
                msg.metaStorageNodes(),
                IgniteProductVersion.CURRENT_VERSION,
                new ClusterTag(msg.clusterName())
        );
    }

    /**
     * Callback executed when the local node is the CMG leader: synchronizes the logical topology with the physical one
     * and, on success, subscribes to topology events and sends the cluster state to all current members.
     *
     * @param service CMG Raft service.
     * @param state Cluster state to broadcast.
     * @return Future of the logical topology update; the broadcast itself is not awaited.
     */
    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {
        LOG.info("CMG leader has been elected, executing onLeaderElected callback");

        return updateLogicalTopology(service).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Error when executing onLeaderElected callback: {}", e, e.getMessage());

                return;
            }

            LOG.info("onLeaderElected callback executed successfully");

            clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));

            sendClusterState(state, clusterService.topologyService().allMembers());
        });
    }

    /**
     * Removes from the logical topology every node whose id is not present in the current physical topology.
     *
     * @param service CMG Raft service.
     * @return Future completed when the stale nodes have been removed, or immediately if there are none.
     */
    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService service) {
        return service.logicalTopology().thenCompose(logicalTopology -> {
            // The physical topology is sampled only after the logical one has been read.
            Set<String> physicalTopologyIds = clusterService.topologyService().allMembers().stream()
                    .map(ClusterNode::id)
                    .collect(Collectors.toSet());

            Set<ClusterNode> nodesToRemove = logicalTopology.stream()
                    .filter(node -> !physicalTopologyIds.contains(node.id()))
                    .collect(Collectors.toUnmodifiableSet());

            return nodesToRemove.isEmpty()
                    ? CompletableFuture.completedFuture(null)
                    : service.removeFromCluster(nodesToRemove);
        });
    }

    /**
     * Handles a {@link CancelInitMessage} by logging the reason and destroying the local CMG.
     *
     * @param msg Cancel message.
     */
    private void handleCancelInit(CancelInitMessage msg) {
        // Pass the reason as a parameter so that it is never interpreted as part of the format string.
        LOG.info("CMG initialization cancelled, reason: {}", msg.reason());

        destroyCmg();
    }

    /**
     * Tears down the local CMG while holding the Raft service lock: cancels and forgets the Raft service future,
     * stops the CMG Raft group, destroys the cluster state storage if it has been started and clears
     * the locally stored state.
     *
     * @throws IgniteInternalException If any cleanup step fails or the thread is interrupted while waiting.
     */
    private void destroyCmg() {
        synchronized (raftServiceLock) {
            try {
                if (raftService != null) {
                    raftService.cancel(true);

                    raftService = null;
                }

                raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);

                if (clusterStateStorage.isStarted()) {
                    clusterStateStorage.destroy();
                }

                localStateStorage.clear().get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // The broad catch would otherwise swallow the interrupt status.
                    Thread.currentThread().interrupt();
                }

                throw new IgniteInternalException("Error when cleaning the CMG state", e);
            }
        }
    }

    /**
     * Handles a {@link ClusterStateMessage}: acknowledges it and then makes sure that the local CMG Raft service matches
     * the CMG nodes of the received cluster state, creating or re-creating the service when needed.
     *
     * @param msg Cluster state message.
     * @param addr Address the acknowledgement is sent to.
     * @param correlationId Correlation id of the request being answered.
     */
    private void handleClusterState(ClusterStateMessage msg, NetworkAddress addr, long correlationId) {
        // The acknowledgement is sent before any processing takes place.
        clusterService.messagingService().respond(addr, msgFactory.successResponseMessage().build(), correlationId);

        ClusterState state = msg.clusterState();

        synchronized (raftServiceLock) {
            if (raftService == null) {
                LOG.info("ClusterStateMessage received, starting the CMG on {}", state.cmgNodes());

                raftService = initCmgRaftService(state);

                return;
            }

            raftService = raftService
                    .handle((service, e) -> {
                        if (service != null && service.nodeNames().equals(state.cmgNodes())) {
                            LOG.debug("ClusterStateMessage received, but the CMG service is already started");

                            return CompletableFuture.completedFuture(service);
                        }

                        if (service == null) {
                            // The previous attempt to start the service has failed.
                            assert e != null;

                            Throwable cause = e instanceof CompletionException ? e.getCause() : e;

                            // A denied join is propagated as is instead of being retried.
                            if (cause instanceof JoinDeniedException) {
                                return CompletableFuture.<CmgRaftService>failedFuture(cause);
                            }

                            LOG.warn("CMG service could not be started on previous attempts: {}. Re-creating the CMG Raft service", cause, cause.getMessage());
                        } else {
                            LOG.warn("CMG has been started on {}, but the cluster state is different: {}. Re-creating the CMG Raft service", service.nodeNames(), state.cmgNodes());

                            // NOTE(review): destroyCmg() cancels and nulls the raftService field. If this callback runs after
                            // the assignment below has completed, that field presumably holds the future produced by this
                            // very chain - confirm that cancelling it here is intended.
                            destroyCmg();
                        }

                        return initCmgRaftService(state);
                    })
                    .thenCompose(Function.identity());
        }
    }

    /**
     * Starts joining the cluster identified by the given tag and mirrors the outcome into the join future.
     *
     * @param service CMG Raft service.
     * @param clusterTag Tag of the cluster to join.
     * @return Future of the join start operation.
     */
    private CompletableFuture<Void> joinCluster(CmgRaftService service, ClusterTag clusterTag) {
        return service.startJoinCluster(clusterTag).whenComplete((v, e) -> {
            if (e != null) {
                joinFuture.completeExceptionally(e);

                return;
            }

            LOG.info("Successfully joined the cluster \"{}\"", clusterTag.clusterName());

            joinFuture.complete(null);
        });
    }

    /**
     * Prepares the CMG Raft group on the given nodes and wraps the resulting Raft client into a {@link CmgRaftService}.
     * The cluster state storage is started when the Raft manager requests the group listener.
     *
     * @param nodeNames Names of the nodes that host the CMG.
     * @return Future of the CMG Raft service; failed with {@link NodeStoppingException} if the Raft manager
     *         rejects the request with that exception.
     */
    private CompletableFuture<CmgRaftService> startCmgRaftService(Collection<String> nodeNames) {
        List<ClusterNode> nodes = ClusterServiceUtils.resolveNodes(clusterService, nodeNames);

        try {
            return raftManager
                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
                        clusterStateStorage.start();

                        return new CmgRaftGroupListener(clusterStateStorage);
                    })
                    .thenApply(service -> new CmgRaftService(service, clusterService));
        } catch (NodeStoppingException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Starts the CMG Raft service on the CMG nodes of the given cluster state, saves those nodes and the cluster tag
     * locally and joins the cluster.
     *
     * @param state Cluster state received from the CMG leader.
     * @return Future of the Raft service, completed after the join has been started successfully.
     */
    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState state) {
        return startCmgRaftService(state.cmgNodes()).thenCompose(service -> {
            LocalStateStorage.LocalState localState =
                    new LocalStateStorage.LocalState(state.cmgNodes(), state.clusterTag());

            return localStateStorage.saveLocalState(localState)
                    .thenCompose(v -> joinCluster(service, state.clusterTag()))
                    .thenApply(v -> service);
        });
    }

    /**
     * Creates the topology listener installed by the leader callback: nodes that appear receive the current cluster
     * state, nodes that disappear are scheduled for removal from the logical topology.
     *
     * @param raftService CMG Raft service used by the listener.
     * @return Topology event handler.
     */
    private TopologyEventHandler cmgLeaderTopologyEventHandler(final CmgRaftService raftService) {
        return new TopologyEventHandler() {
            @Override
            public void onAppeared(ClusterNode member) {
                raftService.readClusterState().thenAccept(state -> {
                    if (state == null) {
                        LOG.info("Cannot send the cluster state to a newly added node {} because cluster state is empty", member);

                        return;
                    }

                    sendClusterState(state, member).whenComplete((v, e) -> {
                        if (e != null) {
                            LOG.warn("Error when sending ClusterState: {}", e, e.getMessage());
                        }
                    });
                });
            }

            @Override
            public void onDisappeared(ClusterNode member) {
                scheduleRemoveFromLogicalTopology(raftService, member);
            }
        };
    }

    /**
     * Schedules a check on the scheduler thread that removes the node from the logical topology unless a node with
     * the same name and the same id is present in the physical topology at the time the check runs.
     *
     * @param raftService CMG Raft service.
     * @param node Node that has left the physical topology.
     */
    private void scheduleRemoveFromLogicalTopology(CmgRaftService raftService, ClusterNode node) {
        Runnable removeIfStillGone = () -> {
            ClusterNode current = clusterService.topologyService().getByConsistentId(node.name());

            // A node with the same name but a different id is treated as a different node.
            boolean samePresent = current != null && current.id().equals(node.id());

            if (!samePresent) {
                raftService.removeFromCluster(Set.of(node));
            }
        };

        // The task is submitted with a zero delay.
        scheduledExecutor.schedule(removeIfStillGone, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the cluster state to a single node, retrying on failure.
     *
     * @param clusterState Cluster state to send.
     * @param node Recipient.
     * @return Future completed when the message has been delivered or the retries are exhausted.
     */
    private CompletableFuture<Void> sendClusterState(ClusterState clusterState, ClusterNode node) {
        ClusterStateMessage msg = msgFactory.clusterStateMessage()
                .clusterState(clusterState)
                .build();

        return sendWithRetry(node, msg);
    }

    /**
     * Sends the same cluster state message to every given node, retrying each delivery on failure.
     *
     * @param clusterState Cluster state to send.
     * @param nodes Recipients.
     * @return Future completed when all deliveries have finished.
     */
    private CompletableFuture<Void> sendClusterState(ClusterState clusterState, Collection<ClusterNode> nodes) {
        ClusterStateMessage msg = msgFactory.clusterStateMessage()
                .clusterState(clusterState)
                .build();

        CompletableFuture<?>[] pending = nodes.stream()
                .map(node -> sendWithRetry(node, msg))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(pending);
    }

    /**
     * Sends a message to a node, making up to five attempts, and logs a warning if all of them fail.
     *
     * @param node Recipient.
     * @param msg Message to send.
     * @return Future completed when the message has been delivered or failed after the last attempt.
     */
    private CompletableFuture<Void> sendWithRetry(ClusterNode node, NetworkMessage msg) {
        int maxAttempts = 5;

        CompletableFuture<Void> delivery = new CompletableFuture<>();

        sendWithRetry(node, msg, delivery, maxAttempts);

        return delivery.whenComplete((v, e) -> {
            if (e == null) {
                return;
            }

            LOG.warn("Unable to send message {} to {}", e, msg.getClass(), node);
        });
    }

    /**
     * Invokes the message on the node and, on failure, schedules another attempt until the attempts are used up.
     *
     * @param node Recipient.
     * @param msg Message to send.
     * @param result Future completed with the final outcome of the delivery.
     * @param attempts Number of attempts left, including the current one.
     */
    private void sendWithRetry(ClusterNode node, NetworkMessage msg, CompletableFuture<Void> result, int attempts) {
        clusterService.messagingService().invoke(node, msg, NETWORK_INVOKE_TIMEOUT).whenComplete((response, e) -> {
            if (e == null) {
                result.complete(null);
            } else if (attempts <= 1) {
                // "<=" rather than "==" so that a non-positive attempt count cannot cause endless retries.
                result.completeExceptionally(e);
            } else {
                LOG.debug("Exception when sending message to {}, retrying", e, node.name());

                try {
                    scheduledExecutor.schedule(() -> sendWithRetry(node, msg, result, attempts - 1), 500L, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException rejected) {
                    // The scheduler has been shut down: finish with the send failure instead of leaving
                    // the result incomplete forever.
                    e.addSuppressed(rejected);

                    result.completeExceptionally(e);
                }
            }
        });
    }

    /**
     * Stops the component: blocks the busy lock, shuts down the scheduler, stops the CMG Raft group, closes the cluster
     * state storage and fails the join future with {@link NodeStoppingException}. Only the first call has an effect.
     *
     * @throws Exception If shutting down the scheduler or closing the resources fails.
     */
    public void stop() throws Exception {
        if (!stopGuard.compareAndSet(false, true)) {
            return;
        }

        busyLock.block();

        try {
            IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10L, TimeUnit.SECONDS);

            IgniteUtils.closeAll(new AutoCloseable[] {
                    () -> raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME),
                    clusterStateStorage
            });
        } finally {
            // Release everything that waits for the join, even when the cleanup above has thrown.
            joinFuture.completeExceptionally(new NodeStoppingException());
        }
    }

    /**
     * Returns the future that tracks the outcome of joining the cluster.
     *
     * @return Join future, or a future failed with {@link NodeStoppingException} if the busy lock cannot be entered.
     */
    public CompletableFuture<Void> joinFuture() {
        if (busyLock.enterBusy()) {
            try {
                return joinFuture;
            } finally {
                busyLock.leaveBusy();
            }
        }

        return CompletableFuture.failedFuture(new NodeStoppingException());
    }

    /**
     * Returns the meta storage nodes recorded in the cluster state, read from the CMG Raft service once the join
     * future has completed.
     *
     * @return Future of the meta storage nodes, or a future failed with {@link NodeStoppingException} if the busy lock
     *         cannot be entered.
     */
    public CompletableFuture<Collection<String>> metaStorageNodes() {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return raftServiceAfterJoin()
                    .thenCompose(CmgRaftService::readClusterState)
                    .thenApply(ClusterState::metaStorageNodes);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Returns the logical topology, read from the CMG Raft service once the join future has completed.
     *
     * @return Future of the logical topology nodes, or a future failed with {@link NodeStoppingException} if the busy
     *         lock cannot be entered.
     */
    public CompletableFuture<Collection<ClusterNode>> logicalTopology() {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return raftServiceAfterJoin().thenCompose(CmgRaftService::logicalTopology);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Asks the CMG Raft service to complete the join of the local node, once the join future has completed.
     *
     * @return Future of the join completion, or a future failed with {@link NodeStoppingException} if the busy lock
     *         cannot be entered.
     */
    public CompletableFuture<Void> onJoinReady() {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return raftServiceAfterJoin().thenCompose(CmgRaftService::completeJoinCluster);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Tells whether the local node is the CMG leader, as reported by the CMG Raft service once the join future
     * has completed.
     *
     * @return Future of the leadership flag, or a future failed with {@link NodeStoppingException} if the busy lock
     *         cannot be entered.
     */
    @TestOnly
    public CompletableFuture<Boolean> isCmgLeader() {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return raftServiceAfterJoin().thenCompose(CmgRaftService::isCurrentNodeLeader);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Returns the CMG Raft service future chained after the join future, so the service is only handed out once
     * the join has succeeded.
     *
     * @return Future of the CMG Raft service.
     */
    private CompletableFuture<CmgRaftService> raftServiceAfterJoin() {
        return joinFuture.thenCompose(joined -> {
            synchronized (raftServiceLock) {
                // The code asserts that the service future has been created by the time the join completes.
                assert raftService != null;

                return raftService;
            }
        });
    }
}

