/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.client;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLLimitedPredicate;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.QueryDoneMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.records.RecordWrapper;
import org.nustaq.reallive.server.FilterProcessor;
import org.nustaq.reallive.server.FilterSpore;
import org.nustaq.reallive.server.RLUtil;
import org.nustaq.reallive.server.storage.StorageStats;

/**
 * {@link RealLiveTable} implementation that spreads its work over a set of shard tables.
 *
 * <p>Operations addressed to a single key are routed to exactly one shard, selected by
 * hashing the key (see {@link #getTableForKey(String)}). Queries and bulk operations are
 * sent to every shard in {@link #shards}. Subscriptions made through
 * {@link #subscribe(Subscriber)} are registered with a local {@link FilterProcessor},
 * which is fed by a single subscription spanning all shards that the constructor sets up.
 *
 * <p>NOTE(review): {@link #addNode(RealLiveTable)} only adds to {@link #shards};
 * nothing in this class fills {@link #tableShardMap}. Presumably a subclass or external
 * code populates it - confirm, since key routing depends on it.
 */
public class ShardedTable
implements RealLiveTable {
    public static boolean DUMP_IN_PROC_CHANGES = false;

    /**
     * Value of {@link ChangeMessage#getType()} that marks the end of a query.
     * NOTE(review): the literal was inlined by the decompiler; presumably it is the
     * library's "query done" constant - confirm against ChangeMessage.
     */
    private static final int QUERY_DONE_TYPE = 3;

    /** Maps a subscriber passed to realSubscribe to the per-shard subscribers created for it. */
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    protected TableDescription description;
    /** Hash slot to shard mapping used for key routing. */
    protected HashMap<Integer, RealLiveTable> tableShardMap = new HashMap<>();
    /** All shards that broadcast operations are sent to. */
    protected Set<RealLiveTable> shards = new HashSet<RealLiveTable>();
    protected FilterProcessor proc = new FilterProcessor(this);
    /** Becomes true once the constructor's all-shard subscription has delivered its done message. */
    protected AtomicBoolean globalListenReady = new AtomicBoolean(false);

    /**
     * Registers the given shards and opens one subscription across all of them whose
     * changes are passed to {@link #globalListen(ChangeMessage)}.
     *
     * @param shards shard tables, each registered through {@link #addNode(RealLiveTable)}
     * @param desc   description returned by {@link #getDescription()}
     */
    public ShardedTable(RealLiveTable[] shards, TableDescription desc) {
        this.description = desc;
        for (RealLiveTable shard : shards) {
            this.addNode(shard);
        }
        // The filter matches only records modified after (construction time - 8 ms).
        // NOTE(review): the reason for the 8 ms offset is not visible here.
        long now = System.currentTimeMillis() - 8L;
        this.realSubs(rec -> rec.getLastModified() > now, change -> this.globalListen(change));
    }

    /**
     * Builds a {@link Subscriber} from the filter and receiver and subscribes it on all shards.
     *
     * @return the subscriber that was created and registered
     */
    private Subscriber realSubs(RLPredicate<Record> filter, ChangeReceiver receiver) {
        Subscriber subs = new Subscriber(filter, receiver);
        this.realSubscribe(subs);
        return subs;
    }

    /**
     * Subscribes on every shard using the filter of {@code subs}. All changes are forwarded
     * to the receiver of {@code subs}; query-done messages are counted and only the last
     * one (when the counter, initialised to the current shard count, reaches zero) is forwarded.
     * The per-shard subscribers are remembered in {@link #subsMap} for later unsubscription.
     */
    private void realSubscribe(Subscriber subs) {
        AtomicInteger doneCount = new AtomicInteger(this.shards.size());
        ChangeReceiver receiver = subs.getReceiver();
        List<Subscriber> subsList = new ArrayList<>();
        this.shards.forEach(shard -> {
            Subscriber shardSubs = new Subscriber(subs.getFilter(), change -> {
                // Short-circuit keeps the counter untouched for non-done messages.
                if (change.getType() != QUERY_DONE_TYPE || doneCount.decrementAndGet() == 0) {
                    receiver.receive(change);
                }
            });
            subsList.add(shardSubs);
            shard.subscribe(shardSubs);
        });
        this.subsMap.put(subs, subsList);
    }

    /**
     * Receiver of the constructor's all-shard subscription. Until the done message has been
     * seen, every message is dropped (the done message itself only flips
     * {@link #globalListenReady}); afterwards each change is handed to {@link #proc}.
     */
    private void globalListen(ChangeMessage change) {
        if (this.globalListenReady.get()) {
            if (DUMP_IN_PROC_CHANGES) {
                Log.Info(this, "Listen Receive:" + change);
            }
            this.proc.receive(change);
            return;
        }
        if (change.isDoneMsg()) {
            if (DUMP_IN_PROC_CHANGES) {
                Log.Info(this, "Global Listen Ready");
            }
            this.globalListenReady.set(true);
        }
    }

    /** Adds a shard to the set that broadcast operations are sent to. */
    public void addNode(RealLiveTable shard) {
        this.shards.add(shard);
    }

    /**
     * Removes a shard: drops every hash-slot mapping that points at it (identity comparison)
     * and removes it from {@link #shards}, whether or not it had a mapping.
     *
     * <p>NOTE(review): removing mappings shrinks {@code tableShardMap.size()}, which is the
     * modulus used by {@link #getTableForKey(String)}; remaining slots may no longer be
     * contiguous. Presumably the caller re-populates the map afterwards - confirm.
     */
    public void removeTableShard(RealLiveTable shard2Remove) {
        this.tableShardMap.values().removeIf(mapped -> mapped == shard2Remove);
        this.shards.remove(shard2Remove);
    }

    /**
     * Picks the shard responsible for a key by reducing the key's hash code modulo the
     * number of entries in {@link #tableShardMap}.
     *
     * @param key record key, must not be null
     * @return the mapped shard, or {@code null} (after logging a warning) if the map is
     *         empty or holds no entry for the computed slot
     */
    protected RealLiveTable getTableForKey(String key) {
        int slotCount = this.tableShardMap.size();
        if (slotCount == 0) {
            Log.Warn(this, "cannot map key, no table shards mapped");
            return null;
        }
        // abs() is applied after the modulo so that a hash of Integer.MIN_VALUE (whose abs()
        // is still negative) yields a valid slot; all other hashes map exactly as
        // Math.abs(hash) % slotCount would.
        int h = Math.abs(key.hashCode() % slotCount);
        RealLiveTable table = this.tableShardMap.get(h);
        if (table == null) {
            Log.Warn(this, "cannot map keyHash " + h);
            return null;
        }
        return table;
    }

    /** Routes a change to the shard owning its key; query-done messages are ignored. */
    @Override
    public void receive(ChangeMessage change) {
        if (change.getType() != QUERY_DONE_TYPE) {
            this.getTableForKey(change.getKey()).receive(change);
        }
    }

    /** Forwards the resize request to every shard and returns the combined promise. */
    @Override
    public IPromise resizeIfLoadFactorLarger(double loadFactor, long maxGrowBytes) {
        // Kept raw on purpose: the generic signature of Actors.all is not visible here.
        ArrayList futs = new ArrayList();
        this.shards.forEach(shard -> futs.add(shard.resizeIfLoadFactorLarger(loadFactor, maxGrowBytes)));
        return Actors.all(futs);
    }

    /**
     * Registers the subscriber with the local filter processor. For an
     * {@link RLNoQueryPredicate} filter a {@link QueryDoneMessage} is delivered immediately,
     * before registration. Otherwise the subscriber is registered first and the filter is
     * then run as a query: each result is delivered as an {@link AddMessage} with sender
     * id 0, followed by a {@link QueryDoneMessage} on completion.
     */
    @Override
    public void subscribe(Subscriber subs) {
        if (subs.getFilter() instanceof RLNoQueryPredicate) {
            subs.getReceiver().receive(new QueryDoneMessage());
            this.proc.startListening(subs);
            return;
        }
        this.proc.startListening(subs);
        this.forEach(subs.getFilter(), (Callback<Record>)(Callback & Serializable)(change, err) -> {
            if (Actors.isResult((Object)err)) {
                subs.getReceiver().receive(new AddMessage(0, (Record)change));
            } else if (Actors.isComplete((Object)err)) {
                subs.getReceiver().receive(new QueryDoneMessage());
            }
        });
    }

    /**
     * For an {@link RLLimitedPredicate}, replaces its limit with the current limit divided
     * by the shard count (at least 1). The filter object is modified in place.
     */
    protected void adjustLimitFilter(RLPredicate filter) {
        if (filter instanceof RLLimitedPredicate) {
            RLLimitedPredicate limited = (RLLimitedPredicate)filter;
            limited._setLimit(Math.max(1, limited.getRecordLimit() / this.shards.size()));
        }
    }

    /** Removes the subscriber from the local filter processor. */
    @Override
    public void unsubscribe(Subscriber subs) {
        this.proc.unsubscribe(subs);
    }

    /** Removes the subscriber with the given id from the local filter processor. */
    @Override
    public void unsubscribeById(int subsId) {
        this.proc.unsubscribeById(subsId);
    }

    /**
     * Undoes a subscription made by realSubscribe: every per-shard subscriber recorded for
     * {@code subs} is unsubscribed on every shard, then the record is dropped from
     * {@link #subsMap}. Logs a warning and returns if {@code subs} is null or unknown.
     */
    public void realUnsubscribe(Subscriber subs) {
        if (subs == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        List<Subscriber> subscribers = this.subsMap.get(subs);
        if (subscribers == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subs);
            return;
        }
        // Which shard each subscriber belongs to is not recorded, so each one is
        // unsubscribed on all shards.
        for (Subscriber subscriber : subscribers) {
            this.shards.forEach(shard -> shard.unsubscribe(subscriber));
        }
        this.subsMap.remove(subs);
    }

    /** Runs the action on the shard owning {@code key}. */
    @Override
    public IPromise atomic(int senderId, String key, RLFunction<Record, Object> action) {
        return this.getTableForKey(key).atomic(senderId, key, action);
    }

    /** Runs the filtered update on every shard. */
    @Override
    public void atomicUpdate(RLPredicate<Record> filter, RLFunction<Record, Boolean> action) {
        this.shards.forEach(shard -> shard.atomicUpdate(filter, action));
    }

    /** Sends a {@link PutMessage} built from the key/values to the shard owning {@code key}. */
    @Override
    public void put(int senderId, String key, Object ... keyVals) {
        this.getTableForKey(key).receive(new PutMessage(senderId, RLUtil.get().record(key, keyVals)));
    }

    /** Sends the message produced by {@code RLUtil.addOrUpdate} to the shard owning {@code key}. */
    @Override
    public void merge(int senderId, String key, Object ... keyVals) {
        this.getTableForKey(key).receive(RLUtil.get().addOrUpdate(senderId, key, keyVals));
    }

    /** Delegates to the shard owning the record's key. */
    @Override
    public void _deepMerge(int senderId, Record jsonrec) {
        this.getTableForKey(jsonrec.getKey())._deepMerge(senderId, jsonrec);
    }

    /** Delegates to the shard owning {@code key}. */
    @Override
    public IPromise<Boolean> add(int senderId, String key, Object ... keyVals) {
        return this.getTableForKey(key).add(senderId, key, keyVals);
    }

    /** Unwraps a {@link RecordWrapper} if given one, then delegates to the shard owning the key. */
    @Override
    public IPromise<Boolean> addRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        return this.getTableForKey(rec.getKey()).addRecord(senderId, rec);
    }

    /**
     * Unwraps a {@link RecordWrapper} if given one, then sends
     * {@code new AddMessage(senderId, true, rec)} to the shard owning the key.
     */
    @Override
    public void mergeRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.getTableForKey(rec.getKey()).receive(new AddMessage(senderId, true, rec));
    }

    /** Unwraps a {@link RecordWrapper} if given one, then sends a {@link PutMessage} to the owning shard. */
    @Override
    public void setRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.getTableForKey(rec.getKey()).receive(new PutMessage(senderId, rec));
    }

    /** Sends the message produced by {@code RLUtil.update} to the shard owning {@code key}. */
    @Override
    public void update(int senderId, String key, Object ... keyVals) {
        this.getTableForKey(key).receive(RLUtil.get().update(senderId, key, keyVals));
    }

    /** Delegates to the shard owning {@code key}. */
    @Override
    public IPromise<Record> take(int senderId, String key) {
        return this.getTableForKey(key).take(senderId, key);
    }

    /** Sends a {@link RemoveMessage} for {@code key} to the shard owning it. */
    @Override
    public void remove(int senderId, String key) {
        RemoveMessage remove = RLUtil.get().remove(senderId, key);
        this.getTableForKey(key).receive(remove);
    }

    /**
     * Runs the spore on every shard. A {@link FilterSpore}'s filter first has its limit
     * adjusted via {@link #adjustLimitFilter(RLPredicate)}, and the spore's expected finish
     * count is set to the number of shards.
     */
    @Override
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        if (spore instanceof FilterSpore) {
            this.adjustLimitFilter(((FilterSpore)spore).getFilter());
        }
        spore.setExpectedFinishCount(this.shards.size());
        this.shards.forEach(shard -> shard.forEachWithSpore(spore));
    }

    /** Pings every shard and returns the combined promise. */
    public IPromise ping() {
        return Actors.all(this.shards.stream().map(shard -> shard.ping()).collect(Collectors.toList()));
    }

    /** Returns a promise constructed directly from the description given to the constructor. */
    @Override
    public IPromise<TableDescription> getDescription() {
        return new Promise((Object)this.description);
    }

    /** Stops every shard. */
    @Override
    public void stop() {
        this.shards.forEach(shard -> shard.stop());
    }

    /**
     * Collects the stats of all shards and adds each into a fresh {@link StorageStats},
     * which resolves the returned promise. The promise is rejected with the callback's
     * error if no stats list is delivered, or with the exception if issuing the requests throws.
     */
    @Override
    public IPromise<StorageStats> getStats() {
        Promise res = new Promise();
        try {
            Actors.all(this.shards.stream().map(shard -> shard.getStats()).collect(Collectors.toList())).then((Callback & Serializable)(shardStats, err) -> {
                if (shardStats != null) {
                    StorageStats stats = new StorageStats();
                    shardStats.stream().map(fut -> (StorageStats)fut.get()).forEach(nodeStats -> nodeStats.addTo(stats));
                    res.resolve((Object)stats);
                } else {
                    res.reject(err);
                }
            });
        }
        catch (Exception e) {
            Log.Warn(this, e);
            res.reject((Object)e);
        }
        return res;
    }

    /**
     * Fetches a record from the shard owning {@code key}.
     *
     * @return the owning shard's promise; NOTE(review): for a null key this returns
     *         {@code null} itself rather than a promise, so callers must null-check
     */
    @Override
    public IPromise<Record> get(String key) {
        if (key == null) {
            return null;
        }
        return this.getTableForKey(key).get(key);
    }

    /** Resolves to the sum of the sizes reported by all shards. */
    @Override
    public IPromise<Long> size() {
        Promise result = new Promise();
        Actors.all(this.shards.stream().map(shard -> shard.size()).collect(Collectors.toList())).then(longPromisList -> {
            long sum = longPromisList.stream().mapToLong(prom -> (Long)prom.get()).sum();
            result.resolve((Object)sum);
        });
        return result;
    }

    /**
     * Removes every shard whose client-connection facade proxy resolves to the same actor
     * reference as {@code actorRef}. Matches are collected into a list first, so the shard
     * set is not modified while it is being streamed.
     */
    public void removeNode(Actor actorRef) {
        this.shards.stream().filter(tableShard -> {
            Actor removedNode = actorRef.getActorRef();
            Actor shardFacade = ((Actor)tableShard).__clientConnection.getFacadeProxy().getActorRef();
            return shardFacade == removedNode;
        }).collect(Collectors.toList()).forEach(tableShard -> this.removeTableShard((RealLiveTable)tableShard));
    }
}

