/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.datacluster.dynamic;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ServiceDescription;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.RegistryArgs;
import org.nustaq.kontraktor.services.ServiceRegistry;
import org.nustaq.kontraktor.services.datacluster.dynamic.AssignMappingAction;
import org.nustaq.kontraktor.services.datacluster.dynamic.DynDataShard;
import org.nustaq.kontraktor.services.datacluster.dynamic.MoveHashShardsAction;
import org.nustaq.kontraktor.services.rlserver.SingleProcessRLClusterArgs;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.reallive.api.TableState;
import org.nustaq.reallive.server.dynamic.DynClusterDistribution;
import org.nustaq.reallive.server.dynamic.DynClusterTableDistribution;
import org.nustaq.reallive.server.dynamic.actions.ClusterTableAction;

/**
 * {@link ServiceRegistry} that additionally keeps track of registered
 * {@link DynDataShard} services, collects their per-table bucket states into a
 * {@link DynClusterDistribution}, computes and executes the actions required to
 * (re)distribute buckets between shards and broadcasts the resulting
 * distribution to the registry listeners.
 *
 * <p>NOTE(review): the collections below are plain, unsynchronized collections;
 * the class presumably relies on all access happening on the registry's actor
 * thread - confirm before touching them from elsewhere.
 */
public class DynDataServiceRegistry extends ServiceRegistry {
    /** First element of the {@link Pair} sent to listeners by {@link #broadcastDistribution}. */
    public static final String RECORD_DISTRIBUTION = "distribution";

    /**
     * Bucket indices {@code 0 .. NUM_BUCKETS-1} are assigned by the init methods.
     * NOTE(review): presumably the fixed number of hash buckets per table - confirm.
     */
    private static final int NUM_BUCKETS = 32;

    /** Descriptions of all currently registered services whose actor class is {@link DynDataShard}. */
    List<ServiceDescription> dynShards = new ArrayList<ServiceDescription>();
    /** Connected shard proxies by service name, filled lazily by {@link #getOrConnect}. */
    Map<String, DynDataShard> primaryDynShards = new HashMap<String, DynDataShard>();
    /** The distribution published last, {@code null} until the first publish. */
    DynClusterDistribution activeDistribution;
    /** Set once the auto start polling has been scheduled; never reset in this class. */
    boolean autoStartUnderway = false;

    /**
     * Registers the service with the base registry. Services whose actor class is
     * {@link DynDataShard} are remembered in {@link #dynShards}; the first such
     * registration schedules {@link #waitForAutoStart()} if the configuration
     * enables dyn auto start.
     *
     * @param desc description of the service being registered
     */
    @Override
    public void registerService(ServiceDescription desc) {
        super.registerService(desc);
        if (desc.getActorClazz() != DynDataShard.class) {
            return;
        }
        dynShards.add(desc);
        if (!autoStartUnderway && config.isDynAutoStart()) {
            autoStartUnderway = true;
            delayed(1000L, () -> waitForAutoStart());
        }
    }

    /**
     * @return a promise resolved with the distribution published last
     *         ({@code null} if none has been published yet)
     */
    public IPromise<DynClusterDistribution> getActiveDistribution() {
        return resolve(activeDistribution);
    }

    /**
     * Forgets the timed out service as a dyn shard (description and cached
     * connection) before delegating to the base implementation.
     *
     * @param desc description of the service that timed out
     */
    @Override
    protected void broadCastTimeOut(ServiceDescription desc) {
        dynShards.remove(desc);
        primaryDynShards.remove(desc.getName());
        super.broadCastTimeOut(desc);
    }

    /**
     * Collects the current distribution, plans bucket moves away from the given
     * shard for every table, executes the planned actions once and publishes the
     * resulting distribution.
     *
     * <p>Tables for which the shard holds no state are skipped with a warning
     * instead of failing inside the callback.
     *
     * @param shardName2Release name of the shard whose buckets should be moved away
     * @return a promise resolved (without value) after the distribution has been
     *         published, or rejected with the error that occurred
     */
    @Override
    public IPromise releaseDynShard(String shardName2Release) {
        Promise p = new Promise();
        collectRecordDistribution().then((distribution, error) -> {
            if (error != null) {
                p.reject(error);
                return;
            }
            try {
                distribution.getTableNames().forEach(
                    tableName -> planRelease(distribution.have(tableName), tableName, shardName2Release));
            } catch (Exception e) {
                // an exception escaping the callback would leave p uncompleted forever
                Log.Error(this, e);
                p.reject(e);
                return;
            }
            // executed once for all tables (not once per table)
            executeAndPublish(distribution, p, "table release processed ");
        });
        return p;
    }

    /**
     * @return a promise for a distribution freshly collected from all registered dyn shards
     */
    @Override
    public IPromise<DynClusterDistribution> getDynDataDistribution() {
        return collectRecordDistribution();
    }

    /**
     * @return a promise resolved with the distribution published last
     *         ({@code null} if none has been published yet)
     */
    @Override
    public IPromise<DynClusterDistribution> getActiveDynDataDistribution() {
        return resolve(activeDistribution);
    }

    /**
     * Collects the current distribution, computes the balancing actions for each
     * table, executes them and publishes the resulting distribution.
     *
     * @return a promise resolved (without value) after the distribution has been
     *         published, or rejected with the error that occurred
     */
    @Override
    public IPromise balanceDynShards() {
        Promise p = new Promise();
        collectRecordDistribution().then((distribution, error) -> {
            if (error != null) {
                p.reject(error);
                return;
            }
            try {
                distribution.getTableNames().forEach(
                    tableName -> computeDistributionActions(distribution.have(tableName)));
            } catch (Exception e) {
                Log.Error(this, e);
                p.reject(e);
                return;
            }
            executeAndPublish(distribution, p, "all table distributions processed ");
        });
        return p;
    }

    /**
     * Polls the collected distribution until it reports full coverage, then
     * triggers {@link #balanceDynShards()}. While coverage is incomplete (or the
     * distribution could not be collected) the check is re-scheduled.
     */
    protected void waitForAutoStart() {
        collectRecordDistribution().then((dist, err) -> {
            if (dist != null && dist.hasFullCoverage()) {
                Log.Info(this, "**** auto start dyn cluster ****");
                execute(() -> balanceDynShards());
            } else {
                Log.Info(this, "autostarter waiting for hash coverage ... ");
                delayed(2000L, () -> waitForAutoStart());
            }
        });
    }

    /**
     * Connects to every registered dyn shard, queries its table states and merges
     * them into a new {@link DynClusterDistribution}.
     *
     * @return a promise resolved with the merged distribution; rejected if
     *         connecting fails, a shard is not available or its states could not
     *         be obtained
     */
    protected IPromise<DynClusterDistribution> collectRecordDistribution() {
        Promise<DynClusterDistribution> res = new Promise<>();
        List<IPromise<DynDataShard>> connects = dynShards.stream()
            .map(desc -> getOrConnect(desc.getName()))
            .collect(Collectors.toList());
        all(connects).then((shardProms, err) -> {
            if (err != null) {
                res.reject(err);
                return;
            }
            List<IPromise<Map<String, TableState>>> tableStateProms = new ArrayList<>();
            for (IPromise<DynDataShard> shardProm : shardProms) {
                DynDataShard shard = shardProm.get();
                if (shard == null) {
                    // connect failed or shard unknown: fail explicitly instead of an NPE below
                    res.reject("unable to collect distribution: a dyn shard is not connected");
                    return;
                }
                tableStateProms.add(shard.getStates());
            }
            all(tableStateProms).then((r, e) -> {
                if (e != null) {
                    res.reject(e);
                    return; // must not fall through and complete res a second time
                }
                DynClusterDistribution distribution = new DynClusterDistribution();
                for (IPromise<Map<String, TableState>> stateProm : tableStateProms) {
                    Map<String, TableState> states = stateProm.get();
                    if (states == null) {
                        res.reject("unable to collect distribution: a dyn shard returned no table states");
                        return;
                    }
                    states.forEach((table, state) -> distribution.have(table).add(state));
                }
                res.resolve(distribution);
            });
        });
        return res;
    }

    /** Remembers the distribution as the active one and broadcasts it to the listeners. */
    private void publishDistribution(DynClusterDistribution distribution) {
        activeDistribution = distribution;
        broadcastDistribution(distribution);
    }

    /**
     * Sends a {@code (RECORD_DISTRIBUTION, mapping)} pair to all listeners that are
     * not terminated; terminated listeners are dropped from the listener list.
     * A failure while notifying one listener is logged and does not affect the others.
     *
     * @param mapping the distribution to broadcast
     */
    protected void broadcastDistribution(DynClusterDistribution mapping) {
        Pair msg = new Pair(RECORD_DISTRIBUTION, mapping);
        listeners = listeners.stream()
            .filter(cb -> !cb.isTerminated())
            .collect(Collectors.toList());
        listeners.forEach(cb -> {
            try {
                cb.pipe(msg);
            } catch (Throwable th) {
                Log.Info(this, th);
            }
        });
    }

    /**
     * Executes the pending actions of all table distributions, then clears the
     * actions, publishes the distribution and completes {@code p}.
     *
     * @param distribution distribution whose table actions are executed
     * @param p            promise resolved (without value) on success, rejected on error
     * @param doneMessage  log line written once all tables have been processed
     */
    private void executeAndPublish(DynClusterDistribution distribution, Promise p, String doneMessage) {
        // raw list: executeActions returns a raw IPromise
        List collect = distribution.getDistributions().entrySet().stream()
            .map(en -> executeActions(en.getValue()))
            .collect(Collectors.toList());
        all(collect).then((plist, finErr) -> {
            Log.Info(this, "*****************************************************************************************************");
            Log.Info(this, doneMessage);
            if (finErr != null) {
                Log.Error(this, "  with ERROR:" + finErr);
                p.reject(finErr);
            } else {
                distribution.clearActions();
                publishDistribution(distribution);
                p.resolve();
            }
            Log.Info(this, "*****************************************************************************************************");
        });
    }

    /**
     * Plans the bucket move away from {@code shardName2Release} for one table by
     * adding a {@link MoveHashShardsAction} to the table distribution.
     *
     * @param tblDist           distribution of the table
     * @param tableName         name of the table
     * @param shardName2Release name of the shard to release
     */
    private void planRelease(DynClusterTableDistribution tblDist, String tableName, String shardName2Release) {
        List<TableState> states = tblDist.getStates();
        TableState toRelease = null;
        for (TableState state : states) {
            if (shardName2Release.equals(state.getAssociatedShardName())) {
                toRelease = state;
                break;
            }
        }
        if (toRelease == null) {
            Log.Warn(this, "shard " + shardName2Release + " holds no state of table " + tableName + ", nothing to release");
            return;
        }
        int bucketsPerNode = toRelease.getNumBuckets() / states.size() + 1;
        for (TableState target : states) {
            if (shardName2Release.equals(target.getAssociatedShardName())) {
                continue;
            }
            int[] bucketsToMove = toRelease.takeBuckets(bucketsPerNode);
            if (bucketsToMove.length > 0) {
                tblDist.addAction(new MoveHashShardsAction(
                    bucketsToMove, tableName, toRelease.getAssociatedShardName(), target.getAssociatedShardName()));
            }
            // NOTE(review): only the first other shard receives buckets (at most
            // bucketsPerNode of them); the loop never reaches further shards.
            // Preserved as found - confirm whether a full hand-over was intended.
            break;
        }
    }

    /**
     * Runs all actions of a table distribution against their shards.
     *
     * <p>Completion is tracked by counting settled shard lookups, so a failed
     * lookup leads to a rejected promise instead of one that never completes.
     *
     * @param tdist table distribution holding the actions to execute
     * @return a promise resolved immediately (without value) if there are no
     *         actions, resolved with {@code true} once all actions are done, or
     *         rejected with the first error encountered
     */
    private IPromise executeActions(DynClusterTableDistribution tdist) {
        int numActions = tdist.getActions().size();
        if (numActions == 0) {
            return resolve();
        }
        Promise p = new Promise();
        List<IPromise> pendingActions = new ArrayList<>();
        int[] settled = {0};            // number of shard lookups that have completed
        Object[] lookupError = {null};  // first shard lookup failure, if any
        tdist.getActions().forEach(action -> {
            IPromise<DynDataShard> primaryShard = getOrConnect(action.getShardName());
            ServiceDescription other =
                action.getOtherShard() != null ? getService(action.getOtherShard()) : null;
            Log.Info(this, "processing:" + action);
            primaryShard.then((shard, error) -> {
                if (error != null || shard == null) {
                    Object failure = error != null
                        ? error
                        : "no connection to shard " + action.getShardName();
                    Log.Error(this, "" + failure);
                    if (lookupError[0] == null) {
                        lookupError[0] = failure;
                    }
                } else {
                    pendingActions.add(action.action(shard, other));
                }
                if (++settled[0] < numActions) {
                    return;
                }
                if (pendingActions.isEmpty()) {
                    // every lookup failed, nothing to wait for
                    Log.Error(this, "actions for table " + tdist.getName() + " FAILED." + lookupError[0]);
                    p.reject(lookupError[0]);
                    return;
                }
                all((List) pendingActions).then((promlist, paError) -> {
                    Object failure = lookupError[0] != null ? lookupError[0] : paError;
                    if (failure == null) {
                        Log.Info(this, "actions for table " + tdist.getName() + " done.");
                        p.resolve(true);
                    } else {
                        Log.Error(this, "actions for table " + tdist.getName() + " FAILED." + failure);
                        p.reject(failure);
                    }
                });
            });
        });
        return p;
    }

    /**
     * Sanitizes the table distribution, initializes it if required and adds
     * {@link MoveHashShardsAction}s that move buckets from states holding at least
     * two buckets more than the average to states holding at least two fewer.
     *
     * @param distribution table distribution to balance; its states are modified
     * @throws RuntimeException if {@code sanitize()} returns a result not handled here
     */
    private void computeDistributionActions(DynClusterTableDistribution distribution) {
        int sanRes = distribution.sanitize();
        Log.Info(this, "sanitize distribution " + distribution.getName() + " result:" + sanRes);
        // NOTE(review): result codes are defined by DynClusterTableDistribution.sanitize()
        switch (sanRes) {
            case 0:
                break;
            case 4:
            case 6:
                initFromEmpty(distribution);
                break;
            case 2:
                Log.Warn(this, "incomplete distribution detected in " + distribution.getName() + ", either caused by sparse records or a datanode is missing");
                initFromIncomplete(distribution);
                if (distribution.sanitize() != 0) {
                    Log.Error(this, "distribution still invalid after sanitation " + distribution);
                }
                break;
            default:
                Log.Error(this, "" + distribution);
                throw new RuntimeException("unhandled cluster distribution state " + distribution.getName() + " " + sanRes);
        }

        List<TableState> states = distribution.getStates();
        int sum = 0;
        for (TableState state : states) {
            sum += state.getMapping().getBitset().cardinality();
        }
        int avg = sum / states.size();

        List<TableState> receiver = new ArrayList<TableState>();
        List<TableState> sender = new ArrayList<TableState>();
        for (TableState state : states) {
            int diff = state.getNumBuckets() - avg;
            if (diff >= 2) {
                sender.add(state);
            } else if (diff <= -2) {
                receiver.add(state);
            }
        }
        // senders ascending, receivers descending by bucket count
        sender.sort((a, b) -> Integer.compare(a.getNumBuckets(), b.getNumBuckets()));
        receiver.sort((a, b) -> Integer.compare(b.getNumBuckets(), a.getNumBuckets()));

        // each pass removes either the head sender or the head receiver, so the loop terminates
        while (!sender.isEmpty() && !receiver.isEmpty()) {
            TableState sendTS = sender.get(0);
            TableState recTS = receiver.get(0);
            int diffSender = sendTS.getNumBuckets() - avg;
            int diffReceiver = recTS.getNumBuckets() - avg;
            int transfer = Math.min(-diffReceiver, diffSender);
            int[] transferHashes = sendTS.takeBuckets(transfer);
            if (transferHashes.length > 0) {
                recTS.addBuckets(transferHashes);
                distribution.addAction(new MoveHashShardsAction(
                    transferHashes, sendTS.getTableName(), sendTS.getAssociatedShardName(), recTS.getAssociatedShardName()));
            }
            if (sendTS.getNumBuckets() - avg > avg - recTS.getNumBuckets()) {
                receiver.remove(0);
            } else {
                sender.remove(0);
            }
        }
    }

    /**
     * Assigns every bucket not yet covered by the distribution to the table
     * states in round robin order and replaces the distribution's actions by one
     * {@link AssignMappingAction} per table state.
     *
     * @param distribution table distribution to complete
     */
    void initFromIncomplete(DynClusterTableDistribution distribution) {
        assignBuckets(distribution, true);
    }

    /**
     * Assigns all buckets to the table states in round robin order and replaces
     * the distribution's actions by one {@link AssignMappingAction} per table state.
     *
     * @param distribution table distribution to initialize
     */
    void initFromEmpty(DynClusterTableDistribution distribution) {
        assignBuckets(distribution, false);
    }

    /**
     * Walks buckets {@code 0 .. NUM_BUCKETS-1}, cycling through the table states,
     * and sets the bucket on the state whose turn it is.
     *
     * @param distribution  table distribution to modify
     * @param onlyUncovered if {@code true}, buckets already covered by the
     *                      distribution are left untouched (the round robin
     *                      position still advances)
     */
    private void assignBuckets(DynClusterTableDistribution distribution, boolean onlyUncovered) {
        List<TableState> tableStates = distribution.getStates();
        int numNodes = tableStates.size();
        int node = 0;
        for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
            TableState tableState = tableStates.get(node);
            if (!onlyUncovered || !distribution.covers(bucket)) {
                tableState.getMapping().setBucket(bucket, true);
            }
            node = node + 1 < numNodes ? node + 1 : 0;
        }
        distribution.setActions(new ArrayList<>());
        tableStates.forEach(tstate -> distribution.addAction(
            new AssignMappingAction(tstate.getTableName(), tstate.getAssociatedShardName(), tstate.getMapping())));
    }

    /**
     * Returns the cached connection to the named dyn shard or connects to it using
     * the connectable of its registered service description. Only successful
     * connects are cached.
     *
     * @param name service name of the shard
     * @return a promise for the shard; resolved with {@code null} if no dyn shard
     *         of that name is registered, completed with the connect error if
     *         connecting fails
     */
    protected IPromise<DynDataShard> getOrConnect(String name) {
        DynDataShard cached = primaryDynShards.get(name);
        if (cached != null) {
            return resolve(cached);
        }
        for (ServiceDescription serviceDescription : dynShards) {
            if (!serviceDescription.getName().equals(name)) {
                continue;
            }
            Promise<DynDataShard> p = new Promise<>();
            serviceDescription.getConnectable()
                .connect((r, e) -> {}, actor -> Log.Error(this, "unhandled disconnect " + actor))
                .then((r, e) -> {
                    DynDataShard shard = (DynDataShard) r;
                    if (shard != null && e == null) {
                        primaryDynShards.put(name, shard);
                    }
                    p.complete(shard, e);
                });
            return p;
        }
        return resolve(null);
    }

    /**
     * Command line entry point, see {@link #start(String[])}.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        DynDataServiceRegistry.start(args);
    }

    /**
     * Parses the command line into the registry options and starts the registry.
     *
     * @param args command line arguments
     * @return the started registry
     */
    public static ServiceRegistry start(String[] args) {
        options = (RegistryArgs)DynDataServiceRegistry.parseCommandLine(args, null, RegistryArgs.New());
        return DynDataServiceRegistry.start(options);
    }

    /**
     * Starts a {@code DynDataServiceRegistry} with the given options and no cluster config.
     *
     * @param options registry options
     * @return the started registry
     */
    public static ServiceRegistry start(RegistryArgs options) {
        return DynDataServiceRegistry.start(options, null, DynDataServiceRegistry.class);
    }

    /**
     * Starts a {@code DynDataServiceRegistry} with the given options and cluster config.
     *
     * @param options single process cluster options
     * @param cfg     cluster configuration
     */
    public static void start(SingleProcessRLClusterArgs options, ClusterCfg cfg) {
        DynDataServiceRegistry.start(options, cfg, DynDataServiceRegistry.class);
    }
}

