/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.records.RecordWrapper;

public class TableSharding
implements RealLiveTable {
    static final int NUM_SLOTS = 10000;
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap();
    private TableDescription description;
    private HashMap<Integer, RealLiveTable[]> shardMap = new HashMap();
    private Set<RealLiveTable> shards = new HashSet<RealLiveTable>();

    public TableSharding(RealLiveTable[] shards, TableDescription desc) {
        this.description = desc;
        for (int i = 0; i < shards.length; ++i) {
            this.addNode(this.createSlots(shards.length, i), shards[i]);
        }
        if (!this.isComplete()) {
            Log.Error((Object)this, (String)"incomplete key coverage");
        }
    }

    private void dumpMisses() {
        for (int i = 0; i < 10000; ++i) {
            if (this.shardMap.containsKey(i)) continue;
            Log.Error((Object)this, (String)("   missing bucket " + i));
        }
    }

    private boolean isComplete() {
        for (int i = 0; i < 10000; ++i) {
            if (this.shardMap.containsKey(i)) continue;
            return false;
        }
        return true;
    }

    private int[] createSlots(int length, int i) {
        int start = 10000 / length * i;
        int end = 10000 / length * (i + 1);
        if (i == length - 1) {
            end = 10000;
        }
        int[] arr = new int[end - start];
        for (int ii = start; ii < end; ++ii) {
            arr[ii - start] = ii;
        }
        return arr;
    }

    public void addNode(int[] slots, RealLiveTable shard) {
        this.shards.add(shard);
        for (int i = 0; i < slots.length; ++i) {
            int slot = slots[i];
            this.addSlot(shard, slot);
        }
    }

    public void removeTableShard(RealLiveTable shard2Remove) {
        this.shardMap.forEach((k, v) -> {
            for (int i = 0; i < ((RealLiveTable[])v).length; ++i) {
                RealLiveTable realLiveTable = v[i];
                if (realLiveTable != shard2Remove) continue;
                RealLiveTable[] newArr = new RealLiveTable[((RealLiveTable[])v).length - 1];
                int oldIdx = 0;
                int newIdx = 0;
                while (newIdx < newArr.length) {
                    if (v[oldIdx] != shard2Remove) {
                        newArr[newIdx] = v[oldIdx];
                        ++newIdx;
                        ++oldIdx;
                        continue;
                    }
                    ++oldIdx;
                }
                break;
            }
        });
        this.shards.remove(shard2Remove);
    }

    private void addSlot(RealLiveTable shard, int slot) {
        RealLiveTable[] mappedShards = this.shardMap.get(slot);
        if (mappedShards == null) {
            mappedShards = new RealLiveTable[]{shard};
            this.shardMap.put(slot, mappedShards);
        } else {
            RealLiveTable[] newArr = new RealLiveTable[mappedShards.length + 1];
            System.arraycopy(mappedShards, 0, newArr, 0, mappedShards.length + 1);
            newArr[newArr.length - 1] = shard;
            this.shardMap.put(slot, newArr);
        }
    }

    protected RealLiveTable[] hashAll(String key) {
        int h = Math.abs(key.hashCode()) % 10000;
        RealLiveTable[] tables = this.shardMap.get(h);
        if (tables == null || tables.length == 0) {
            Log.Warn((Object)this, (String)("cannot map keyHash " + h));
            return null;
        }
        return tables;
    }

    protected RealLiveTable hashAny(String key) {
        int h = Math.abs(key.hashCode()) % 10000;
        RealLiveTable[] tables = this.shardMap.get(h);
        if (tables == null || tables.length == 0) {
            Log.Warn((Object)this, (String)("cannot map keyHash " + h));
            return null;
        }
        return tables[0];
    }

    @Override
    public void receive(ChangeMessage change) {
        if (change.getType() != 3) {
            this.hashAny(change.getKey()).receive(change);
        }
    }

    @Override
    public IPromise resizeIfLoadFactorLarger(double loadFactor, long maxGrowBytes) {
        ArrayList futs = new ArrayList();
        this.shards.forEach(shard -> futs.add(shard.resizeIfLoadFactorLarger(loadFactor, maxGrowBytes)));
        return Actors.all(futs);
    }

    @Override
    public void subscribe(Subscriber subs) {
        AtomicInteger doneCount = new AtomicInteger(this.shards.size());
        ChangeReceiver receiver = subs.getReceiver();
        ArrayList subsList = new ArrayList();
        this.shards.forEach(shard -> {
            Subscriber shardSubs = new Subscriber(subs.getFilter(), change -> {
                if (change.getType() == 3) {
                    int count = doneCount.decrementAndGet();
                    if (count == 0) {
                        receiver.receive(change);
                    }
                } else {
                    receiver.receive(change);
                }
            });
            subsList.add(shardSubs);
            shard.subscribe(shardSubs);
        });
        this.subsMap.put(subs, subsList);
    }

    @Override
    public void unsubscribe(Subscriber subs) {
        if (subs == null) {
            Log.Warn((Object)this, (String)"unsubscribed is null");
            return;
        }
        List<Subscriber> subscribers = this.subsMap.get(subs);
        if (subscribers == null) {
            Log.Warn((Object)this, (String)("unknown subscriber to unsubscribe " + subs));
            return;
        }
        for (int i = 0; i < subscribers.size(); ++i) {
            Subscriber subscriber = subscribers.get(i);
            this.shards.forEach(shard -> shard.unsubscribe(subscriber));
        }
        this.subsMap.remove(subs);
    }

    @Override
    public IPromise atomic(String key, RLFunction<Record, Object> action) {
        return this.hashAny(key).atomic(key, action);
    }

    @Override
    public void atomicUpdate(RLPredicate<Record> filter, RLFunction<Record, Boolean> action) {
        this.shards.forEach(shard -> shard.atomicUpdate(filter, action));
    }

    @Override
    public void put(String key, Object ... keyVals) {
        this.hashAny(key).receive(new PutMessage(RLUtil.get().record(key, keyVals)));
    }

    @Override
    public void merge(String key, Object ... keyVals) {
        this.hashAny(key).receive(RLUtil.get().addOrUpdate(key, keyVals));
    }

    @Override
    public IPromise<Boolean> add(String key, Object ... keyVals) {
        return this.hashAny(key).add(key, keyVals);
    }

    @Override
    public IPromise<Boolean> addRecord(Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        return this.hashAny(rec.getKey()).addRecord(rec);
    }

    @Override
    public void mergeRecord(Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.hashAny(rec.getKey()).receive(new AddMessage(true, rec));
    }

    @Override
    public void setRecord(Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.hashAny(rec.getKey()).receive(new PutMessage(rec));
    }

    @Override
    public void update(String key, Object ... keyVals) {
        this.hashAny(key).receive(RLUtil.get().update(key, keyVals));
    }

    @Override
    public IPromise<Record> take(String key) {
        return this.hashAny(key).take(key);
    }

    @Override
    public void remove(String key) {
        RemoveMessage remove = RLUtil.get().remove(key);
        this.hashAny(key).receive(remove);
    }

    @Override
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        spore.setExpectedFinishCount(this.shards.size());
        this.shards.forEach(shard -> shard.forEachWithSpore(spore));
    }

    public IPromise ping() {
        ArrayList futs = new ArrayList();
        return Actors.all(this.shards.stream().map(shard -> shard.ping()).collect(Collectors.toList()));
    }

    @Override
    public IPromise<TableDescription> getDescription() {
        return new Promise((Object)this.description);
    }

    @Override
    public void stop() {
        this.shards.forEach(shard -> shard.stop());
    }

    @Override
    public IPromise<StorageStats> getStats() {
        Promise res = new Promise();
        try {
            Actors.all(this.shards.stream().map(shard -> shard.getStats()).collect(Collectors.toList())).then((Callback & Serializable)(shardStats, err) -> {
                if (shardStats != null) {
                    StorageStats stats = new StorageStats();
                    shardStats.stream().map(fut -> (StorageStats)fut.get()).forEach(nodeStats -> nodeStats.addTo(stats));
                    res.resolve((Object)stats);
                } else {
                    res.reject(err);
                }
            });
        }
        catch (Exception e) {
            Log.Warn((Object)this, (Throwable)e);
            res.reject((Object)e);
        }
        return res;
    }

    @Override
    public IPromise<Record> get(String key) {
        if (key == null) {
            return null;
        }
        return this.hashAny(key).get(key);
    }

    @Override
    public IPromise<Long> size() {
        Promise result = new Promise();
        Actors.all(this.shards.stream().map(shard -> shard.size()).collect(Collectors.toList())).then(longPromisList -> {
            long sum = longPromisList.stream().mapToLong(prom -> (Long)prom.get()).sum();
            result.resolve((Object)sum);
        });
        return result;
    }

    public void removeNode(Actor actorRef) {
        this.shards.stream().filter(tableShard -> {
            Actor removedNode = actorRef.getActorRef();
            Actor shardFacade = ((Actor)tableShard).__clientConnection.getFacadeProxy().getActorRef();
            return shardFacade == removedNode;
        }).collect(Collectors.toList()).forEach(tableShard -> {
            System.out.println("remove " + actorRef + " from " + tableShard);
            this.removeTableShard((RealLiveTable)tableShard);
        });
    }
}

