/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cp.internal;

import com.hazelcast.cp.CPGroup;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.CPMember;
import com.hazelcast.cp.exception.CPGroupDestroyedException;
import com.hazelcast.cp.internal.CPGroupSummary;
import com.hazelcast.cp.internal.CPMemberInfo;
import com.hazelcast.cp.internal.MembershipChangeSchedule;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.RaftInvocationManager;
import com.hazelcast.cp.internal.RaftOp;
import com.hazelcast.cp.internal.RaftService;
import com.hazelcast.cp.internal.operation.GetLeadedGroupsOp;
import com.hazelcast.cp.internal.operation.TransferLeadershipOp;
import com.hazelcast.cp.internal.raft.MembershipChangeMode;
import com.hazelcast.cp.internal.raft.QueryPolicy;
import com.hazelcast.cp.internal.raft.exception.MismatchingGroupMembersCommitIndexException;
import com.hazelcast.cp.internal.raft.impl.RaftEndpoint;
import com.hazelcast.cp.internal.raft.impl.RaftNode;
import com.hazelcast.cp.internal.raft.impl.RaftNodeStatus;
import com.hazelcast.cp.internal.raftop.metadata.CompleteDestroyRaftGroupsOp;
import com.hazelcast.cp.internal.raftop.metadata.CompleteRaftGroupMembershipChangesOp;
import com.hazelcast.cp.internal.raftop.metadata.GetActiveCPMembersOp;
import com.hazelcast.cp.internal.raftop.metadata.GetActiveRaftGroupIdsOp;
import com.hazelcast.cp.internal.raftop.metadata.GetDestroyingRaftGroupIdsOp;
import com.hazelcast.cp.internal.raftop.metadata.GetMembershipChangeScheduleOp;
import com.hazelcast.cp.internal.raftop.metadata.GetRaftGroupOp;
import com.hazelcast.internal.util.BiTuple;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.OperationService;
import com.hazelcast.spi.properties.HazelcastProperty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RaftGroupMembershipManager {
    /** Period of the group-destroy and membership-change tasks, in milliseconds (one second converted to millis). */
    static final long MANAGEMENT_TASK_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1L);
    /** Property read by {@code init()} as the leadership rebalance period in seconds; constructed with the value 60. */
    static final HazelcastProperty LEADERSHIP_BALANCE_TASK_PERIOD = new HazelcastProperty("hazelcast.raft.leadership.rebalance.period", 60);
    /** Period of the local Raft node check; {@code init()} schedules that task with the same value (10) in seconds. */
    private static final long CHECK_LOCAL_RAFT_NODES_TASK_PERIOD = 10L;
    /** Node engine supplying the logger, execution service, operation service and properties used by this class. */
    private final NodeEngine nodeEngine;
    /** Owning Raft service; queried for local member, startup state, metadata group and local Raft nodes. */
    private final RaftService raftService;
    /** Logger obtained from the node engine for this class. */
    private final ILogger logger;
    /** Invocation manager obtained from the Raft service; used to query and invoke on CP groups. */
    private final RaftInvocationManager invocationManager;
    /** Flipped from false to true by the first {@code init()} call that passes the local-member check. */
    private final AtomicBoolean initialized = new AtomicBoolean();

    RaftGroupMembershipManager(NodeEngine nodeEngine, RaftService raftService) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.raftService = raftService;
        this.invocationManager = raftService.getInvocationManager();
    }

    /**
     * Schedules the four periodic management tasks on the CP management executor.
     * <p>
     * Does nothing when there is no local CP member, or when a previous call already
     * flipped the {@code initialized} flag.
     */
    void init() {
        if (raftService.getLocalCPMember() == null) {
            return;
        }
        if (!initialized.compareAndSet(false, true)) {
            // Another call already scheduled the tasks.
            return;
        }

        final String executorName = "hz:cpSubsystemManagement";
        ExecutionService scheduler = nodeEngine.getExecutionService();

        scheduler.scheduleWithRepetition(executorName, new RaftGroupDestroyHandlerTask(),
                MANAGEMENT_TASK_PERIOD_IN_MILLIS, MANAGEMENT_TASK_PERIOD_IN_MILLIS, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithRepetition(executorName, new RaftGroupMembershipChangeHandlerTask(),
                MANAGEMENT_TASK_PERIOD_IN_MILLIS, MANAGEMENT_TASK_PERIOD_IN_MILLIS, TimeUnit.MILLISECONDS);
        // 10 seconds: same value as the CHECK_LOCAL_RAFT_NODES_TASK_PERIOD constant.
        scheduler.scheduleWithRepetition(executorName, new CheckLocalRaftNodesTask(), 10L, 10L, TimeUnit.SECONDS);

        int rebalancePeriodSeconds = nodeEngine.getProperties().getInteger(LEADERSHIP_BALANCE_TASK_PERIOD);
        scheduler.scheduleWithRepetition(executorName, new RaftGroupLeadershipBalanceTask(),
                rebalancePeriodSeconds, rebalancePeriodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Tells a management task whether it must skip its current run.
     *
     * @return {@code true} unless discovery is completed, start is completed and the
     *         metadata group manager reports this node as metadata group leader
     */
    private boolean skipRunningTask() {
        if (!raftService.isDiscoveryCompleted()) {
            return true;
        }
        if (!raftService.isStartCompleted()) {
            return true;
        }
        boolean metadataLeader = raftService.getMetadataGroupManager().isMetadataGroupLeader();
        return !metadataLeader;
    }

    /**
     * Runs one leadership balance pass synchronously on the calling thread,
     * using a fresh {@code RaftGroupLeadershipBalanceTask}.
     */
    void rebalanceGroupLeaderships() {
        Runnable balanceTask = new RaftGroupLeadershipBalanceTask();
        balanceTask.run();
    }

    /**
     * Queries the metadata group with the {@code LEADER_LOCAL} query policy.
     *
     * @param op  operation to run as a query
     * @param <T> result type expected by the caller
     * @return the future returned by the invocation manager
     */
    private <T> InternalCompletableFuture<T> queryMetadata(RaftOp op) {
        InternalCompletableFuture<T> result =
                invocationManager.query(raftService.getMetadataGroupId(), op, QueryPolicy.LEADER_LOCAL);
        return result;
    }

    private final class RaftGroupLeadershipBalanceTask
    implements Runnable {
        /** Private no-argument constructor; performs no work. */
        private RaftGroupLeadershipBalanceTask() {
        }

        /**
         * Runs one leadership rebalance pass unless {@code skipRunningTask()} says to skip.
         * <p>
         * Any exception from the pass is logged and swallowed so the caller is never
         * disturbed. The failure is logged exactly once: as a warning with the stack
         * trace when fine logging is enabled, otherwise as a one-line info message.
         */
        @Override
        public void run() {
            if (skipRunningTask()) {
                return;
            }
            try {
                rebalanceLeaderships();
            } catch (Exception e) {
                if (logger.isFineEnabled()) {
                    logger.warning("Cannot execute leadership rebalance at the moment", e);
                } else {
                    // Without fine logging keep it short: exception class and message only.
                    logger.info("Cannot execute leadership rebalance at the moment: " + e.getClass().getName() + ": " + e.getMessage());
                }
            }
        }

        /**
         * Performs one rebalance pass: first moves leaderships towards the highest-priority
         * members, then, if more than one such member is returned, evens out the leadership
         * counts among them.
         */
        private void rebalanceLeaderships() {
            Collection<CPMember> allMembers = getMembers().values();
            Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> leadershipPriorityMap = getLeadershipPriorityMap(allMembers);

            // Resolve every active group id into its summary, one metadata query per group.
            Collection<CPGroupId> activeGroupIds = getCpGroupIds();
            List<CPGroupSummary> cpGroupSummaries = new ArrayList<>();
            for (CPGroupId activeGroupId : activeGroupIds) {
                cpGroupSummaries.add(getCpGroup(activeGroupId));
            }

            Set<CPMember> priorityCPMembers = rebalanceLeadershipsByPriority(leadershipPriorityMap, cpGroupSummaries);
            if (priorityCPMembers.size() <= 1) {
                return;
            }
            rebalanceLeadershipsEvenly(priorityCPMembers, cpGroupSummaries, allMembers);
        }

        /**
         * Asks every given member which groups it leads by invoking {@code GetLeadedGroupsOp} on it.
         * <p>
         * A member whose invocation fails is logged and skipped. A summary of all
         * claims is logged at info level.
         *
         * @param members members to ask
         * @return map from member to its (priority, led group ids) entry
         */
        private Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> getLeadershipPriorityMap(Collection<CPMember> members) {
            Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> claimsByMember = new HashMap<>();
            OperationService operationService = nodeEngine.getOperationService();
            StringBuilder summary = new StringBuilder("Current leadership claims:");
            for (CPMember member : members) {
                try {
                    Map.Entry claim = (Map.Entry) operationService.invokeOnTarget("hz:core:raft", new GetLeadedGroupsOp(), member.getAddress()).join();
                    // The claim is stored before it is unpacked, exactly as in the original flow.
                    claimsByMember.put(member, claim);
                    int priority = (Integer) claim.getKey();
                    Collection ledGroups = (Collection) claim.getValue();
                    if (logger.isFineEnabled()) {
                        logger.fine(member + " claims it's leader of " + ledGroups.size() + " groups: " + ledGroups);
                    }
                    summary.append("\n\t").append(member);
                    summary.append(" priority ").append(priority);
                    summary.append(" has ").append(ledGroups.size()).append(',');
                } catch (Exception e) {
                    logger.info("Skipped " + member + " for leadership rebalancing due to " + e);
                }
            }
            summary.append(" leaderships.");
            logger.info(summary.toString());
            return claimsByMember;
        }

        /**
         * Moves leaderships away from members whose priority is not the highest priority
         * of any group.
         * <p>
         * For every group the highest-priority responding members are computed. Members whose
         * priority is a maximum in at least one group are "priority members"; every group led
         * by any other member is transferred to one of that group's priority members, chosen
         * round-robin.
         *
         * @param leadershipPriorityMap member to (priority, led group ids), as collected from the members
         * @param cpGroupSummaries      summaries of the groups to consider
         * @return the priority members, or an empty set as soon as one transfer fails
         * @throws NullPointerException if a group has no member present in {@code leadershipPriorityMap},
         *                              or a led group is missing from {@code cpGroupSummaries}
         */
        private Set<CPMember> rebalanceLeadershipsByPriority(Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> leadershipPriorityMap, List<CPGroupSummary> cpGroupSummaries) {
            Map<CPGroupId, List<CPMember>> priorityMembersInAGroup = new HashMap<>();
            Set<Integer> maxPriorities = new HashSet<>();
            for (CPGroupSummary cpGroupSummary : cpGroupSummaries) {
                Collection<CPMember> groupMembers = cpGroupSummary.members();
                Map.Entry<Integer, List<CPMember>> priorityMembersInAGroupEntry = getPriorityMembersInAGroup(leadershipPriorityMap, groupMembers);
                int priority = priorityMembersInAGroupEntry.getKey();
                List<CPMember> priorityMembers = priorityMembersInAGroupEntry.getValue();
                maxPriorities.add(priority);
                priorityMembersInAGroup.put(cpGroupSummary.id(), priorityMembers);
            }

            // true  -> members holding a priority that is the maximum of some group
            // false -> members that should hand over every leadership they hold
            Map<Boolean, Set<CPMember>> partitionedMembersByPriority = leadershipPriorityMap.entrySet().stream()
                    .collect(Collectors.partitioningBy(
                            e -> maxPriorities.contains(e.getValue().getKey()),
                            Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
            Set<CPMember> fromCPMembers = partitionedMembersByPriority.get(false);
            Set<CPMember> allPriorityMembers = partitionedMembersByPriority.get(true);

            int roundRobinCounter = 0;
            for (CPMember fromCPMember : fromCPMembers) {
                Collection<CPGroupId> leadedGroups = leadershipPriorityMap.get(fromCPMember).getValue();
                for (CPGroupId cpGroupId : leadedGroups) {
                    List<CPMember> priorityMembers = priorityMembersInAGroup.get(cpGroupId);
                    // The counter is shared across all groups so targets rotate globally.
                    int roundRobinNumber = roundRobinCounter++ % priorityMembers.size();
                    CPMember toCPMember = priorityMembers.get(roundRobinNumber);
                    logger.info("Transferring leadership for " + cpGroupId.getName() + " group: from " + fromCPMember + " with priority " + leadershipPriorityMap.get(fromCPMember).getKey() + " to " + toCPMember + " with priority " + leadershipPriorityMap.get(toCPMember).getKey());
                    if (!transferLeadership(fromCPMember, toCPMember, cpGroupId)) {
                        // Abort the whole pass on the first failed transfer.
                        return Collections.emptySet();
                    }
                }
            }
            return allPriorityMembers;
        }

        /**
         * Finds the highest priority among the given group members and the members holding it.
         * <p>
         * Only members present in {@code leadershipPriorityMap} are considered.
         *
         * @param leadershipPriorityMap member to (priority, led group ids)
         * @param cpGroupMembers        members of one group
         * @return entry of (highest priority, members with that priority in encounter order),
         *         or {@code null} when none of the members is present in the map
         */
        private Map.Entry<Integer, List<CPMember>> getPriorityMembersInAGroup(Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> leadershipPriorityMap, Collection<CPMember> cpGroupMembers) {
            // Assigning to a TreeMap-typed local pins the collector's map type, so lastEntry() resolves.
            TreeMap<Integer, List<CPMember>> membersByPriority = cpGroupMembers.stream()
                    .filter(leadershipPriorityMap::containsKey)
                    .collect(Collectors.groupingBy(
                            member -> leadershipPriorityMap.get(member).getKey(),
                            TreeMap::new,
                            Collectors.toList()));
            // TreeMap orders keys ascending, so the last entry carries the highest priority.
            return membersByPriority.lastEntry();
        }

        /**
         * Repeatedly transfers single leaderships between priority members until no member
         * above the average can hand one over, or a transfer fails.
         *
         * @param priorityMembers  members among which leaderships are balanced; must not be empty
         *                         (it is used as a divisor)
         * @param cpGroupSummaries groups whose leaderships are counted
         * @param allMembers       members re-queried for their leadership claims on every iteration
         */
        private void rebalanceLeadershipsEvenly(Set<CPMember> priorityMembers, Collection<CPGroupSummary> cpGroupSummaries, Collection<CPMember> allMembers) {
            // Integer average; a non-zero remainder means some members may hold one more than the average.
            int avgGroupsPerMember = cpGroupSummaries.size() / priorityMembers.size();
            boolean overAvgAllowed = cpGroupSummaries.size() % priorityMembers.size() != 0;
            RaftGroupMembershipManager.this.logger.fine("Searching for leadership imbalance in " + cpGroupSummaries.size() + " CPGroups, average groups per member is " + avgGroupsPerMember);
            // Members for which no transfer target could be found; they are not picked as source again.
            HashSet<CPMember> handledMembers = new HashSet<CPMember>(priorityMembers.size());
            while (true) {
                // Leadership claims are refreshed on every iteration, so each transfer is reflected in the next pick.
                Map<CPMember, Map.Entry<Integer, Collection<CPGroupId>>> leadershipPriorityMap = this.getLeadershipPriorityMap(allMembers);
                // Keep only priority members, mapped to the group ids they claim to lead.
                Map<CPMember, Collection<CPGroupId>> leaderships = leadershipPriorityMap.entrySet().stream().filter(e -> priorityMembers.contains(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, v -> (Collection)((Map.Entry)v.getValue()).getValue()));
                // Source: the unhandled member with the most leaderships, strictly above the average.
                BiTuple<CPMember, Integer> from = this.getEndpointWithMaxLeaderships(leaderships, avgGroupsPerMember, handledMembers);
                if (from.element1 == null) {
                    RaftGroupMembershipManager.this.logger.info("CPGroup leadership balance is fine, cannot rebalance further...");
                    return;
                }
                RaftGroupMembershipManager.this.logger.info("Searching a candidate transfer leadership from " + from.element1 + " with " + from.element2 + " leaderships.");
                Collection<CPGroupSummary> groups2 = this.getLeaderGroupsOf((CPMember)from.element1, leaderships.get(from.element1), cpGroupSummaries);
                // Upper bound on the leaderships a target may already hold.
                // NOTE(review): getEndpointWithMinLeaderships accepts a target holding exactly this bound;
                // when the group count divides evenly the bound is the average itself — confirm this
                // cannot move a leadership back and forth between two members.
                int maxLeaderships = overAvgAllowed ? ((Integer)from.element2 > avgGroupsPerMember + 1 ? avgGroupsPerMember : avgGroupsPerMember - 1) : avgGroupsPerMember;
                BiTuple<CPMember, CPGroupId> to = this.getEndpointWithMinLeaderships(groups2, leaderships, maxLeaderships);
                if (to.element1 == null) {
                    RaftGroupMembershipManager.this.logger.info("No candidate could be found to get leadership from " + from.element1 + ". Skipping to next...");
                    handledMembers.add((CPMember)from.element1);
                    continue;
                }
                // Stop the whole pass on the first failed transfer.
                if (!this.transferLeadership((CPMember)from.element1, (CPMember)to.element1, (CPGroupId)to.element2)) break;
            }
        }

        /**
         * Selects the groups that the given member leads and may hand over.
         *
         * @param member      member whose led groups are wanted
         * @param leaderships ids of the groups the member claims to lead
         * @param groups2     candidate group summaries
         * @return groups that are not named "METADATA", are in {@code leaderships} and contain {@code member}
         */
        private Collection<CPGroupSummary> getLeaderGroupsOf(CPMember member, Collection<CPGroupId> leaderships, Collection<CPGroupSummary> groups2) {
            List<CPGroupSummary> ledByMember = new ArrayList<>();
            for (CPGroupSummary candidate : groups2) {
                boolean metadataGroup = "METADATA".equals(candidate.id().getName());
                if (!metadataGroup && leaderships.contains(candidate.id()) && candidate.members().contains(member)) {
                    ledByMember.add(candidate);
                }
            }
            return ledByMember;
        }

        /**
         * Picks the transfer target: the member with the fewest leaderships among the members
         * of the given groups, together with the group in which it was found.
         * <p>
         * Members without an entry in {@code leaderships} are ignored. A member qualifies when
         * its count is at most the current minimum, which starts at {@code maxLeaderships};
         * on ties the member seen last wins.
         *
         * @param groups2        groups led by the source member
         * @param leaderships    member to led group ids
         * @param maxLeaderships highest leadership count a target may already have
         * @return (target, group id), both {@code null} when nobody qualifies
         */
        private BiTuple<CPMember, CPGroupId> getEndpointWithMinLeaderships(Collection<CPGroupSummary> groups2, Map<CPMember, Collection<CPGroupId>> leaderships, int maxLeaderships) {
            CPMember target = null;
            CPGroupId targetGroupId = null;
            int fewest = maxLeaderships;
            for (CPGroupSummary group : groups2) {
                for (CPMember member : group.members()) {
                    Collection<CPGroupId> ledByMember = leaderships.get(member);
                    if (ledByMember == null) {
                        continue;
                    }
                    int count = ledByMember.size();
                    if (count <= fewest) {
                        fewest = count;
                        target = member;
                        targetGroupId = group.id();
                    }
                }
            }
            return BiTuple.of(target, targetGroupId);
        }

        /**
         * Picks the transfer source: the non-excluded member leading the most groups,
         * provided it leads strictly more than {@code minLeaderships}.
         *
         * @param leaderships    member to led group ids
         * @param minLeaderships count a member must exceed to be picked
         * @param excludeSet     members that must not be picked
         * @return (source, its leadership count); the member is {@code null} and the count is
         *         {@code minLeaderships} when nobody qualifies
         */
        private BiTuple<CPMember, Integer> getEndpointWithMaxLeaderships(Map<CPMember, Collection<CPGroupId>> leaderships, int minLeaderships, Set<CPMember> excludeSet) {
            CPMember busiest = null;
            int most = minLeaderships;
            for (Map.Entry<CPMember, Collection<CPGroupId>> claim : leaderships.entrySet()) {
                CPMember member = claim.getKey();
                if (excludeSet.contains(member)) {
                    continue;
                }
                int count = claim.getValue().size();
                if (count > most) {
                    busiest = member;
                    most = count;
                }
            }
            return BiTuple.of(busiest, most);
        }

        /**
         * Invokes {@code TransferLeadershipOp} on the current leader and waits for it to finish.
         *
         * @param from    member the operation is sent to
         * @param to      member that should become leader
         * @param groupId group whose leadership is transferred
         * @return {@code true} when the invocation completed, {@code false} when it threw (the exception is logged)
         */
        private boolean transferLeadership(CPMember from, CPMember to, CPGroupId groupId) {
            logger.info("Transferring leadership from " + from + " to " + to + " in " + groupId);
            boolean transferred;
            try {
                OperationService operationService = nodeEngine.getOperationService();
                operationService.invokeOnTarget(null, new TransferLeadershipOp(groupId, to), from.getAddress()).join();
                transferred = true;
            } catch (Exception e) {
                logger.warning(e);
                transferred = false;
            }
            return transferred;
        }

        /**
         * Queries the metadata group for the active CP members.
         *
         * @return the active members keyed by their Raft endpoint
         */
        private Map<RaftEndpoint, CPMember> getMembers() {
            // Typed future: iterating a raw Collection with a CPMemberInfo loop variable is not type-correct.
            InternalCompletableFuture<Collection<CPMemberInfo>> future = queryMetadata(new GetActiveCPMembersOp());
            Collection<CPMemberInfo> members = future.join();
            Map<RaftEndpoint, CPMember> map = new HashMap<>(members.size());
            for (CPMemberInfo member : members) {
                map.put(member.toRaftEndpoint(), member);
            }
            return map;
        }

        /**
         * Fetches the summary of one group from the metadata group, blocking until the query completes.
         *
         * @param groupId id of the group to look up
         * @return the result of {@code GetRaftGroupOp} for that id
         */
        private CPGroupSummary getCpGroup(CPGroupId groupId) {
            InternalCompletableFuture<CPGroupSummary> summaryFuture = queryMetadata(new GetRaftGroupOp(groupId));
            return summaryFuture.join();
        }

        /**
         * Fetches the active group ids from the metadata group, blocking until the query completes.
         *
         * @return the result of {@code GetActiveRaftGroupIdsOp}
         */
        private Collection<CPGroupId> getCpGroupIds() {
            InternalCompletableFuture<Collection<CPGroupId>> idsFuture = queryMetadata(new GetActiveRaftGroupIdsOp());
            return idsFuture.join();
        }
    }

    private class RaftGroupMembershipChangeHandlerTask
    implements Runnable {
        /** Sentinel commit index (-1); the same value is returned by {@code checkMemberRemoveCommitIndex} when no usable index exists. */
        private static final int NA_MEMBERS_COMMIT_INDEX = -1;

        /** Private no-argument constructor; performs no work. */
        private RaftGroupMembershipChangeHandlerTask() {
        }

        /**
         * Applies the pending membership change schedule, if there is one.
         * <p>
         * Every change of the schedule is started asynchronously; the method then blocks until
         * all of them have reported back and commits the collected results to the metadata group.
         * On interruption it logs what completed so far and restores the interrupt flag.
         */
        @Override
        public void run() {
            if (skipRunningTask()) {
                return;
            }
            MembershipChangeSchedule pendingSchedule = getMembershipChangeSchedule();
            if (pendingSchedule == null) {
                return;
            }
            if (logger.isFineEnabled()) {
                logger.fine("Handling " + pendingSchedule);
            }

            List<MembershipChangeSchedule.CPGroupMembershipChange> changes = pendingSchedule.getChanges();
            // One latch count per change; each change counts down exactly when it is finished.
            CountDownLatch allChangesDone = new CountDownLatch(changes.size());
            Map<CPGroupId, BiTuple<Long, Long>> changedGroups = new ConcurrentHashMap<>();
            for (int i = 0; i < changes.size(); i++) {
                applyOnRaftGroup(allChangesDone, changedGroups, changes.get(i));
            }

            try {
                allChangesDone.await();
                completeMembershipChanges(changedGroups);
            } catch (InterruptedException e) {
                logger.warning("Membership changes interrupted while executing " + pendingSchedule + ". completed: " + changedGroups, e);
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Fetches the current membership change schedule from the metadata group.
         *
         * @return the result of {@code GetMembershipChangeScheduleOp}; callers treat {@code null} as "nothing to do"
         */
        private MembershipChangeSchedule getMembershipChangeSchedule() {
            InternalCompletableFuture<MembershipChangeSchedule> scheduleFuture = queryMetadata(new GetMembershipChangeScheduleOp());
            return scheduleFuture.joinInternal();
        }

        /**
         * Starts one membership change: an optional member removal followed by an optional member addition.
         * <p>
         * The latch is counted down once per change, either here or in {@code addMember}, when the
         * change has finished (successfully or not). Successful outcomes are recorded in
         * {@code changedGroups} as (expected members commit index, resulting commit index).
         *
         * @param latch         latch to count down when this change is finished
         * @param changedGroups collector for the groups whose change succeeded
         * @param change        the change to apply
         */
        private void applyOnRaftGroup(CountDownLatch latch, Map<CPGroupId, BiTuple<Long, Long>> changedGroups, MembershipChangeSchedule.CPGroupMembershipChange change) {
            // With nothing to remove, start from an already-completed future carrying the expected commit index.
            CompletableFuture future = change.getMemberToRemove() != null ? RaftGroupMembershipManager.this.invocationManager.changeMembership(change.getGroupId(), change.getMembersCommitIndex(), change.getMemberToRemove(), MembershipChangeMode.REMOVE) : CompletableFuture.completedFuture(change.getMembersCommitIndex());
            future.whenCompleteAsync((removeCommitIndex, t) -> {
                if (t == null) {
                    // Removal (or the no-op) succeeded: continue with the addition, or finish here.
                    if (change.getMemberToAdd() != null) {
                        this.addMember(latch, changedGroups, change, (long)removeCommitIndex);
                    } else {
                        changedGroups.put(change.getGroupId(), BiTuple.of(change.getMembersCommitIndex(), removeCommitIndex));
                        latch.countDown();
                    }
                } else {
                    // Removal failed: check whether the failure shows the removal was in fact already applied.
                    long commitIndex = this.checkMemberRemoveCommitIndex(changedGroups, change, (Throwable)t);
                    if (commitIndex != -1L) {
                        if (change.getMemberToAdd() != null) {
                            this.addMember(latch, changedGroups, change, commitIndex);
                        } else {
                            changedGroups.put(change.getGroupId(), BiTuple.of(change.getMembersCommitIndex(), commitIndex));
                            latch.countDown();
                        }
                    } else {
                        // No usable commit index: this change is finished without further steps here.
                        latch.countDown();
                    }
                }
            });
        }

        /**
         * Adds the change's new member to its group and counts the latch down when the attempt has finished.
         *
         * @param latch              latch to count down once the add attempt completes
         * @param changedGroups      collector for the groups whose change succeeded
         * @param change             the change being applied
         * @param currentCommitIndex members commit index the add is issued against
         */
        private void addMember(CountDownLatch latch, Map<CPGroupId, BiTuple<Long, Long>> changedGroups, MembershipChangeSchedule.CPGroupMembershipChange change, long currentCommitIndex) {
            InternalCompletableFuture future = RaftGroupMembershipManager.this.invocationManager.changeMembership(change.getGroupId(), currentCommitIndex, change.getMemberToAdd(), MembershipChangeMode.ADD);
            ((CompletableFuture)future).whenCompleteAsync((addCommitIndex, t) -> {
                if (t == null) {
                    // Success: record (expected members commit index, commit index of the add).
                    changedGroups.put(change.getGroupId(), BiTuple.of(change.getMembersCommitIndex(), addCommitIndex));
                    latch.countDown();
                } else {
                    // Failure: the check may still record the group if the add turns out to be applied already.
                    this.checkMemberAddCommitIndex(changedGroups, change, (Throwable)t);
                    latch.countDown();
                }
            });
        }

        /**
         * Handles a failed member add.
         * <p>
         * For a {@code MismatchingGroupMembersCommitIndexException}, the member list carried by the
         * exception is compared with the membership this change would have produced. Only when every
         * check passes is the group recorded in {@code changedGroups} with the exception's commit index;
         * any failed check is logged as severe. Every other failure type is logged as severe.
         *
         * @param changedGroups collector for the groups whose change succeeded
         * @param change        the change being applied
         * @param t             failure of the add invocation
         */
        private void checkMemberAddCommitIndex(Map<CPGroupId, BiTuple<Long, Long>> changedGroups, MembershipChangeSchedule.CPGroupMembershipChange change, Throwable t) {
            RaftEndpoint memberToAdd = change.getMemberToAdd();
            if (t instanceof MismatchingGroupMembersCommitIndexException) {
                MismatchingGroupMembersCommitIndexException m3 = (MismatchingGroupMembersCommitIndexException)t;
                String msg = "MEMBER ADD commit of " + change + " failed. Actual group members: " + m3.getMembers() + " with commit index: " + m3.getCommitIndex();
                // The added member must be part of the reported membership.
                if (!m3.getMembers().contains(memberToAdd)) {
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return;
                }
                if (change.getMemberToRemove() != null) {
                    // Replace (remove + add): the removed member must be gone and the size unchanged.
                    if (m3.getMembers().contains(change.getMemberToRemove())) {
                        RaftGroupMembershipManager.this.logger.severe(msg);
                        return;
                    }
                    if (m3.getMembers().size() != change.getMembers().size()) {
                        RaftGroupMembershipManager.this.logger.severe(msg);
                        return;
                    }
                } else if (m3.getMembers().size() != change.getMembers().size() + 1) {
                    // Pure add: the reported membership must be exactly one larger.
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return;
                }
                // Every member listed by the change, other than the removed one, must be reported.
                for (RaftEndpoint member : change.getMembers()) {
                    if (member.equals(change.getMemberToRemove()) || m3.getMembers().contains(member)) continue;
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return;
                }
                changedGroups.put(change.getGroupId(), BiTuple.of(change.getMembersCommitIndex(), m3.getCommitIndex()));
                return;
            }
            RaftGroupMembershipManager.this.logger.severe("Cannot get MEMBER ADD result of " + memberToAdd + " to " + change.getGroupId() + " with members commit index: " + change.getMembersCommitIndex(), t);
        }

        /**
         * Handles a failed member removal and decides whether the change can continue.
         * <p>
         * For a {@code MismatchingGroupMembersCommitIndexException}, the member list carried by the
         * exception is inspected:
         * <ul>
         * <li>the removed member is still listed: failure;</li>
         * <li>the member to add is already listed: the list is validated as the complete result of the
         *     change and, if valid, the group is recorded in {@code changedGroups}; -1 is returned either
         *     way because no further step is needed;</li>
         * <li>otherwise the list is validated as the result of the removal alone and the exception's
         *     commit index is returned.</li>
         * </ul>
         *
         * @param changedGroups collector for the groups whose change succeeded
         * @param change        the change being applied
         * @param t             failure of the remove invocation
         * @return the commit index to continue with, or -1 when there is nothing to continue with
         */
        private long checkMemberRemoveCommitIndex(Map<CPGroupId, BiTuple<Long, Long>> changedGroups, MembershipChangeSchedule.CPGroupMembershipChange change, Throwable t) {
            RaftEndpoint removedMember = change.getMemberToRemove();
            if (t instanceof MismatchingGroupMembersCommitIndexException) {
                MismatchingGroupMembersCommitIndexException m3 = (MismatchingGroupMembersCommitIndexException)t;
                String msg = "MEMBER REMOVE commit of " + change + " failed. Actual group members: " + m3.getMembers() + " with commit index: " + m3.getCommitIndex();
                // The member is still part of the group: the removal really failed.
                if (m3.getMembers().contains(removedMember)) {
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return -1L;
                }
                if (change.getMemberToAdd() != null && m3.getMembers().contains(change.getMemberToAdd())) {
                    // Both the removal and the addition appear in the reported membership.
                    if (m3.getMembers().size() != change.getMembers().size()) {
                        RaftGroupMembershipManager.this.logger.severe(msg);
                        return -1L;
                    }
                    for (RaftEndpoint member : change.getMembers()) {
                        if (member.equals(removedMember) || m3.getMembers().contains(member)) continue;
                        RaftGroupMembershipManager.this.logger.severe(msg);
                        return -1L;
                    }
                    changedGroups.put(change.getGroupId(), BiTuple.of(change.getMembersCommitIndex(), m3.getCommitIndex()));
                    // Recorded as done; -1 tells the caller not to issue the add again.
                    return -1L;
                }
                // Only the removal should be reflected: exactly one member fewer than the change lists.
                if (m3.getMembers().size() != change.getMembers().size() - 1) {
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return -1L;
                }
                for (RaftEndpoint member : change.getMembers()) {
                    if (member.equals(removedMember) || m3.getMembers().contains(member)) continue;
                    RaftGroupMembershipManager.this.logger.severe(msg);
                    return -1L;
                }
                return m3.getCommitIndex();
            }
            RaftGroupMembershipManager.this.logger.severe("Cannot get MEMBER REMOVE result of " + removedMember + " to " + change.getGroupId(), t);
            return -1L;
        }

        /**
         * Commits the collected membership change results to the metadata group and waits for the commit.
         * <p>
         * Failures are logged as severe and not rethrown. If the wait is interrupted, the thread's
         * interrupt flag is restored so the caller can still observe the interruption.
         *
         * @param changedGroups group id to (expected members commit index, resulting commit index)
         */
        private void completeMembershipChanges(Map<CPGroupId, BiTuple<Long, Long>> changedGroups) {
            CompleteRaftGroupMembershipChangesOp op = new CompleteRaftGroupMembershipChangesOp(changedGroups);
            RaftGroupId metadataGroupId = raftService.getMetadataGroupId();
            InternalCompletableFuture future = invocationManager.invoke(metadataGroupId, op);
            try {
                future.get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Catching the interrupt clears the flag; put it back before returning.
                    Thread.currentThread().interrupt();
                }
                logger.severe("Cannot commit CP group membership changes: " + changedGroups, e);
            }
        }
    }

    private class RaftGroupDestroyHandlerTask
    implements Runnable {
        /** Private no-argument constructor; performs no work. */
        private RaftGroupDestroyHandlerTask() {
        }

        /**
         * Destroys the groups that are marked as destroying and, if at least one was destroyed,
         * commits the destroyed ids to the metadata group. Skipped when {@code skipRunningTask()}
         * returns {@code true}.
         */
        @Override
        public void run() {
            if (skipRunningTask()) {
                return;
            }
            Set<CPGroupId> destroyed = destroyRaftGroups();
            if (!destroyed.isEmpty()) {
                commitDestroyedRaftGroups(destroyed);
            }
        }

        /**
         * Issues a destroy invocation for every group reported as destroying, then waits for each one.
         *
         * @return ids of the groups whose destroy invocation is considered successful;
         *         an empty set when no group is destroying
         */
        private Set<CPGroupId> destroyRaftGroups() {
            Collection<CPGroupId> destroyingGroupIds = getDestroyingRaftGroupIds();
            if (destroyingGroupIds.isEmpty()) {
                return Collections.emptySet();
            }

            // Fire all invocations first so they proceed concurrently, then collect the outcomes.
            Map<CPGroupId, InternalCompletableFuture<Object>> pending = new HashMap<>();
            for (CPGroupId groupId : destroyingGroupIds) {
                pending.put(groupId, invocationManager.destroy(groupId));
            }

            Set<CPGroupId> destroyed = new HashSet<>();
            for (Map.Entry<CPGroupId, InternalCompletableFuture<Object>> outcome : pending.entrySet()) {
                if (isRaftGroupDestroyed(outcome.getKey(), outcome.getValue())) {
                    destroyed.add(outcome.getKey());
                }
            }
            return destroyed;
        }

        /**
         * Fetches the ids of the groups marked as destroying from the metadata group.
         *
         * @return the result of {@code GetDestroyingRaftGroupIdsOp}
         */
        private Collection<CPGroupId> getDestroyingRaftGroupIds() {
            InternalCompletableFuture<Collection<CPGroupId>> idsFuture = queryMetadata(new GetDestroyingRaftGroupIdsOp());
            return idsFuture.joinInternal();
        }

        /**
         * Waits for a destroy invocation and reports whether the group can be treated as destroyed.
         *
         * @param groupId group the invocation was issued for (used in log messages)
         * @param future  future of the destroy invocation
         * @return {@code true} when the invocation succeeded or failed with a
         *         {@code CPGroupDestroyedException}; {@code false} on any other failure or on
         *         interruption, in which case the thread's interrupt flag is restored
         */
        private boolean isRaftGroupDestroyed(CPGroupId groupId, Future<Object> future) {
            try {
                future.get();
                return true;
            } catch (InterruptedException e) {
                // Catching the interrupt clears the flag; put it back so callers can react to it.
                Thread.currentThread().interrupt();
                logger.severe("Cannot get result of DESTROY commit to " + groupId, e);
                return false;
            } catch (ExecutionException e) {
                if (ExceptionUtil.peel(e) instanceof CPGroupDestroyedException) {
                    // The group was already destroyed, which is the outcome we wanted.
                    return true;
                }
                logger.severe("Cannot get result of DESTROY commit to " + groupId, e);
                return false;
            }
        }

        /**
         * Commits the ids of the destroyed groups to the metadata group and waits for the commit.
         * <p>
         * Failures are logged as severe and not rethrown. If the wait is interrupted, the thread's
         * interrupt flag is restored.
         *
         * @param destroyedGroupIds ids of the groups that were destroyed
         */
        private void commitDestroyedRaftGroups(Set<CPGroupId> destroyedGroupIds) {
            CompleteDestroyRaftGroupsOp op = new CompleteDestroyRaftGroupsOp(destroyedGroupIds);
            RaftGroupId metadataGroupId = raftService.getMetadataGroupId();
            InternalCompletableFuture f = invocationManager.invoke(metadataGroupId, op);
            try {
                f.get();
                logger.info("Terminated CP groups: " + destroyedGroupIds + " are committed.");
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Catching the interrupt clears the flag; put it back before returning.
                    Thread.currentThread().interrupt();
                }
                logger.severe("Cannot commit terminated CP groups: " + destroyedGroupIds, e);
            }
        }
    }

    private class CheckLocalRaftNodesTask
    implements Runnable {
        /** Private no-argument constructor; performs no work. */
        private CheckLocalRaftNodesTask() {
        }

        /**
         * Reconciles the local Raft nodes with their state.
         * <p>
         * Runs only after discovery and start are completed (it does not require metadata
         * leadership). The metadata group's node is ignored. For every other local node:
         * a terminated node is passed to {@code terminateRaftNode(groupId, false)}, a stepped-down
         * node to {@code stepDownRaftNode(groupId)}; otherwise the group is looked up in the
         * metadata group asynchronously and, if it is reported as destroyed, the node is passed
         * to {@code terminateRaftNode(groupId, true)}.
         */
        @Override
        public void run() {
            if (!raftService.isDiscoveryCompleted() || !raftService.isStartCompleted()) {
                return;
            }
            for (RaftNode raftNode : raftService.getAllRaftNodes()) {
                CPGroupId groupId = raftNode.getGroupId();
                if (groupId.equals(raftService.getMetadataGroupId())) {
                    continue;
                }
                if (raftNode.getStatus() == RaftNodeStatus.TERMINATED) {
                    raftService.terminateRaftNode(groupId, false);
                    continue;
                }
                if (raftNode.getStatus() == RaftNodeStatus.STEPPED_DOWN) {
                    raftService.stepDownRaftNode(groupId);
                    continue;
                }
                // Typed future so the callback receives a CPGroupSummary rather than a bare Object.
                InternalCompletableFuture<CPGroupSummary> f = queryMetadata(new GetRaftGroupOp(groupId));
                f.whenCompleteAsync((group, t) -> {
                    if (t != null) {
                        logger.warning("Could not get CP group info of " + groupId, t);
                        return;
                    }
                    if (group == null) {
                        logger.severe("Could not find CP group for local raft node of " + groupId);
                    } else if (group.status() == CPGroup.CPGroupStatus.DESTROYED) {
                        raftService.terminateRaftNode(groupId, true);
                    }
                });
            }
        }
    }
}

