/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.datacluster.dynamic;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.KConnectionPool;
import org.nustaq.kontraktor.remoting.base.ConnectableActor;
import org.nustaq.kontraktor.remoting.base.ServiceDescription;
import org.nustaq.kontraktor.remoting.tcp.TCPConnectable;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.ServiceActor;
import org.nustaq.kontraktor.services.ServiceArgs;
import org.nustaq.kontraktor.services.ServiceRegistry;
import org.nustaq.kontraktor.services.datacluster.DataCfg;
import org.nustaq.kontraktor.services.datacluster.DataShardArgs;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.TableState;
import org.nustaq.reallive.server.actors.DynTableSpaceActor;
import org.nustaq.reallive.server.actors.RealLiveTableActor;
import org.nustaq.reallive.server.storage.ClusterTableRecordMapping;

public class DynDataShard
extends ServiceActor<DynDataShard> {
    public static final String DATA_SHARD_NAME = "DynShard";
    public static int WAIT_TABLE_LOAD = 60000;
    DynTableSpaceActor tableSpace;
    KConnectionPool shardConnectionPool;

    @Override
    public IPromise init(ConnectableActor registryConnectable, ServiceArgs options, boolean auto) {
        Promise p = new Promise();
        this.shardConnectionPool = new KConnectionPool();
        try {
            super.init(registryConnectable, options, false).then((arg_0, arg_1) -> this.lambda$init$2252ec85$1((IPromise)p, arg_0, arg_1));
        }
        catch (Throwable t) {
            p.reject((Object)t);
        }
        return p;
    }

    @Override
    protected int getPort() {
        if (this.getCmdline().getDsPortOverride() > 0) {
            return this.getCmdline().getDsPortOverride();
        }
        return this.cmdline.getDataShardPortBase() + this.getCmdline().getShardNo();
    }

    private int getShardNo() {
        return this.getCmdline().getShardNo();
    }

    protected void initTableSpace() {
        if (this.config == null) {
            Log.Error((Object)((Object)this), (String)"no cluster config received or failed to connect Service Registry");
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.exit(1);
        }
        DataCfg dataCfg = this.config.getDataCluster();
        this.tableSpace = (DynTableSpaceActor)Actors.AsActor(DynTableSpaceActor.class);
        String dir = dataCfg.getDataDir()[0];
        new File(dir).mkdirs();
        this.tableSpace.setBaseDataDir(dir);
        this.tableSpace.init();
        Log.Info((Object)((Object)this), (String)"start loading tables");
        Arrays.stream(this.config.getDataCluster().getSchema()).forEach(td -> {
            try {
                td.shardId(this.getServiceDescription().getName());
                td.shardNo(this.getCmdline().getShardNo());
                this.tableSpace.createOrLoadTable(td).await((long)WAIT_TABLE_LOAD);
                Log.Info((Object)((Object)this), (String)("loaded table " + td.getName()));
            }
            catch (Exception e) {
                Log.Error((Object)((Object)this), (Throwable)e, (String)"failed to initialize table");
            }
        });
        Log.Info((Object)((Object)this), (String)"wait table space ping");
        this.tableSpace.ping().await();
        Log.Info((Object)((Object)this), (String)("finished init tablespace in " + dataCfg.getDataDir()[0] + " dynshard " + this.getShardNo()));
    }

    public IPromise<DynTableSpaceActor> getTableSpace() {
        return DynDataShard.resolve((Object)this.tableSpace);
    }

    @Override
    protected boolean isFixedDataCluster() {
        return false;
    }

    @Override
    protected String[] getRequiredServiceNames() {
        return new String[0];
    }

    @Override
    protected ServiceDescription createServiceDescription() {
        return new ServiceDescription(DATA_SHARD_NAME + this.getShardNo()).connectable((ConnectableActor)new TCPConnectable(DynDataShard.class, this.cmdline.getHost(), this.cmdline.getDataShardPortBase() + this.getShardNo()));
    }

    @Override
    protected DataShardArgs getCmdline() {
        return (DataShardArgs)this.cmdline;
    }

    @Override
    protected Serializable getStatus() {
        return "{ \"connections\":" + (this.__connections != null ? this.__connections.size() : 0) + "}";
    }

    public IPromise<Map<String, TableState>> getStates() {
        Promise p = new Promise();
        this.tableSpace.getStates().then((Callback & Serializable)(r, e) -> {
            r.values().stream().forEach(ts -> ts.associatedShardName(this.serviceDescription.getName()));
            p.complete(r, e);
        });
        return p;
    }

    public IPromise _setMapping(String tableName, ClusterTableRecordMapping mapping) {
        return this.tableSpace._setMapping(tableName, mapping);
    }

    public static void main(String[] args) {
        DynDataShard.start(args);
    }

    public static DynDataShard start(String[] args) {
        DataShardArgs options = (DataShardArgs)ServiceRegistry.parseCommandLine(args, null, DataShardArgs.New());
        return DynDataShard.start(options);
    }

    public static DynDataShard start(DataShardArgs options) {
        DynDataShard ds = (DynDataShard)Actors.AsActor(DynDataShard.class, (int)256000);
        TCPConnectable registryConnectable = new TCPConnectable(ServiceRegistry.class, options.getRegistryHost(), options.getRegistryPort());
        Log.Info(DynDataShard.class, (String)("connect registry at " + registryConnectable));
        ds.init((ConnectableActor)registryConnectable, options, true).await();
        Log.Info(((Object)((Object)ds)).getClass(), (String)"Init finished");
        return ds;
    }

    public IPromise _moveHashShardsTo(String tableName, int[] hashShards2Move, ServiceDescription otherRef) {
        Promise res = new Promise();
        this.serialOn("RecordRedistribution#" + tableName, prom -> this.shardConnectionPool.getConnection(otherRef.getConnectable()).then((Callback & Serializable)(remote, err) -> {
            if (remote != null) {
                ClusterTableRecordMapping movedMapping = new ClusterTableRecordMapping();
                movedMapping.addBuckets(hashShards2Move);
                try {
                    RealLiveTableActor table = (RealLiveTableActor)this.tableSpace.getTableAsync(tableName).await();
                    ArrayList toTransmit = new ArrayList();
                    table.forEach((RLPredicate & Serializable)record -> movedMapping.matches(record.getKey().hashCode()), (Callback & Serializable)(r, e) -> {
                        if (r != null) {
                            toTransmit.add(r);
                        } else {
                            Log.Info((Object)((Object)this), (String)("transmitting " + tableName + " " + toTransmit.size() + " records to " + otherRef.getName()));
                            ((DynDataShard)((Object)((Object)((Object)((Object)remote)))))._receiveHashTransmission(tableName, hashShards2Move, toTransmit).then((Callback & Serializable)(rr, ee) -> {
                                if (ee == null) {
                                    ClusterTableRecordMapping mapping = this.tableSpace.getMapping(tableName);
                                    ClusterTableRecordMapping newMapping = ClusterTableRecordMapping.Copy((ClusterTableRecordMapping)mapping);
                                    newMapping.remove(hashShards2Move);
                                    toTransmit.forEach(rec -> table._removeSilent(rec.getKey()));
                                    this._setMapping(tableName, newMapping).then((Callback & Serializable)(rrr, eee) -> {
                                        Log.Info((Object)((Object)this), (String)("COMPLETE: transmitting " + tableName + " " + toTransmit.size() + " records to " + otherRef.getName()));
                                        prom.complete();
                                    });
                                    res.resolve();
                                } else {
                                    res.reject(ee);
                                    prom.complete();
                                }
                            });
                        }
                    });
                }
                catch (Exception e2) {
                    res.reject((Object)e2);
                    prom.complete();
                }
            } else {
                prom.complete();
                res.reject(err);
            }
        }));
        return res;
    }

    public IPromise _receiveHashTransmission(String tableName, int[] hashShards2Move, List<Record> toTransmit) {
        Promise p = new Promise();
        this.serialOn("RecordRedistribution#" + tableName, prom -> {
            Log.Info((Object)((Object)this), (String)("received transmision of " + toTransmit.size() + " records to " + tableName));
            RealLiveTableActor table = (RealLiveTableActor)this.tableSpace.getTableAsync(tableName).await();
            ClusterTableRecordMapping mapping = this.tableSpace.getMapping(tableName);
            ClusterTableRecordMapping newMapping = ClusterTableRecordMapping.Copy((ClusterTableRecordMapping)mapping);
            newMapping.addBuckets(hashShards2Move);
            toTransmit.forEach(rec -> table._addSilent(rec));
            this._setMapping(tableName, newMapping).then((Callback & Serializable)(r, e) -> prom.resolve());
            p.complete();
        });
        return p;
    }

    private /* synthetic */ void lambda$init$2252ec85$1(IPromise p, Object r, Object e) {
        try {
            this.config = (ClusterCfg)((ServiceRegistry)this.serviceRegistry.get()).getConfig().await();
            if (!this.config.getDataCluster().isDynamic()) {
                Log.Error((Object)((Object)this), (String)"dynamic nodes cannot run with nondynamic cluster configurations. set datacfg.isDynamic to true");
                Thread.sleep(1000L);
                System.exit(1);
            }
            this.initTableSpace();
            this.registerSelf();
            p.resolve();
        }
        catch (Exception ex) {
            Log.Error((Object)((Object)this), (Throwable)ex);
        }
    }
}

