/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.actors.ShardFunc;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.ChangeReceiver;
import org.nustaq.reallive.interfaces.Mutation;
import org.nustaq.reallive.interfaces.RLConsumer;
import org.nustaq.reallive.interfaces.RLPredicate;
import org.nustaq.reallive.interfaces.RealLiveTable;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.interfaces.TableDescription;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.records.RecordWrapper;

/**
 * Presents an array of {@link RealLiveTable} shards as a single table.
 *
 * <p>Operations that address one key are routed to the shard whose index is
 * returned by the {@link ShardFunc}; operations that address the whole table
 * (subscribe, forEach, ping, stop, getStats, size) are fanned out to every
 * shard and, where applicable, their results are combined.
 *
 * @param <K> key type of the records held by the shards
 */
public class TableSharding<K> implements RealLiveTable<K> {
    /**
     * Change type that is never routed by {@link #receive} and that is counted
     * once per shard in {@link #subscribe} before being forwarded.
     * NOTE(review): presumably identical to the "query done" constant declared on
     * ChangeMessage - confirm and reference that constant instead of this copy.
     */
    private static final int QUERY_DONE = 3;

    ShardFunc<K> func;
    RealLiveTable<K>[] shards;
    /** Maps a caller's subscriber to the per-shard subscribers created for it, in shard order. */
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    private final TableDescription description;

    /**
     * @param func   maps a key to an index into {@code shards}
     * @param shards the tables the data is distributed over
     * @param desc   description handed out by {@link #getDescription()}
     */
    public TableSharding(ShardFunc<K> func, RealLiveTable<K>[] shards, TableDescription desc) {
        this.func = func;
        this.shards = shards;
        this.description = desc;
    }

    /** Returns the shard that {@link #func} selects for {@code key}. */
    private RealLiveTable<K> shardFor(K key) {
        return this.shards[this.func.apply(key)];
    }

    /**
     * Forwards {@code change} to the shard selected by its key. Messages of type
     * {@link #QUERY_DONE} are dropped.
     */
    @Override
    public void receive(ChangeMessage<K> change) {
        if (change.getType() == QUERY_DONE) {
            return;
        }
        shardFor(change.getKey()).receive(change);
    }

    /**
     * Subscribes to every shard using the filters of {@code subs}. All changes are
     * passed on to the receiver of {@code subs}, except that {@link #QUERY_DONE}
     * messages are swallowed until one has arrived from each shard; only the last
     * one is forwarded.
     */
    @Override
    public void subscribe(Subscriber<K> subs) {
        // Counts down once per shard-level QUERY_DONE message.
        // NOTE(review): atomic presumably because shard callbacks may run concurrently - confirm.
        AtomicInteger pendingDone = new AtomicInteger(this.shards.length);
        ChangeReceiver<K> receiver = subs.getReceiver();
        List<Subscriber> shardSubscribers = new ArrayList<>(this.shards.length);
        for (RealLiveTable<K> shard : this.shards) {
            Subscriber<K> shardSubs = new Subscriber<K>(subs.getPrePatchFilter(), subs.getFilter(), change -> {
                boolean done = change.getType() == QUERY_DONE;
                if (!done || pendingDone.decrementAndGet() == 0) {
                    receiver.receive(change);
                }
            });
            shardSubscribers.add(shardSubs);
            shard.subscribe(shardSubs);
        }
        // Stored in shard order; unsubscribe relies on index i belonging to shards[i].
        this.subsMap.put(subs, shardSubscribers);
    }

    /**
     * Removes the per-shard subscriptions that {@link #subscribe} created for
     * {@code subs}. Logs a warning and does nothing if {@code subs} is null or unknown.
     */
    @Override
    public void unsubscribe(Subscriber<K> subs) {
        if (subs == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        // Single atomic remove so two concurrent calls cannot both process the same entry.
        List<Subscriber> shardSubscribers = this.subsMap.remove(subs);
        if (shardSubscribers == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subs);
            return;
        }
        for (int i = 0; i < shardSubscribers.size(); ++i) {
            this.shards[i].unsubscribe(shardSubscribers.get(i));
        }
    }

    /** Delegates to the mutation of the shard selected by {@code key}. */
    @Override
    public IPromise<Boolean> putCAS(RLPredicate<Record<K>> casCondition, K key, Object ... keyVals) {
        return shardFor(key).getMutation().putCAS(casCondition, key, keyVals);
    }

    /** Delegates to the mutation of the shard selected by {@code key}. */
    @Override
    public void atomic(K key, RLConsumer<Record<K>> action) {
        shardFor(key).getMutation().atomic(key, action);
    }

    /** Returns a new {@link ShardMutation} that routes each operation to the shard of its key. */
    @Override
    public Mutation<K> getMutation() {
        return new ShardMutation();
    }

    /**
     * Runs {@code spore} on every shard after announcing the number of shards as
     * its expected finish count.
     */
    @Override
    public <T> void forEach(Spore<Record<K>, T> spore) {
        spore.setExpectedFinishCount(this.shards.length);
        for (RealLiveTable<K> shard : this.shards) {
            shard.forEach(spore);
        }
    }

    /** Pings every shard and returns the combined promise produced by {@code Actors.all}. */
    @Override
    public IPromise ping() {
        // shard.ping() is raw in this file, hence the unchecked element conversion.
        List<IPromise<Object>> pings = new ArrayList<>(this.shards.length);
        for (RealLiveTable<K> shard : this.shards) {
            pings.add(shard.ping());
        }
        return Actors.all(pings);
    }

    /** Returns an already resolved promise holding the description given at construction. */
    @Override
    public IPromise<TableDescription> getDescription() {
        return new Promise<>(this.description);
    }

    /** Stops every shard. */
    @Override
    public void stop() {
        for (RealLiveTable<K> shard : this.shards) {
            shard.stop();
        }
    }

    /**
     * Collects the stats of all shards and adds each of them into one fresh
     * {@link StorageStats}, which is returned as an already resolved promise.
     * The shard results are awaited before this method returns.
     */
    @Override
    public IPromise<StorageStats> getStats() {
        IPromise<StorageStats>[] shardStats =
            Actors.all(this.shards.length, i -> this.shards[i].getStats()).await();
        StorageStats total = new StorageStats();
        for (IPromise<StorageStats> shardStat : shardStats) {
            shardStat.get().addTo(total);
        }
        return new Promise<>(total);
    }

    /**
     * Looks up {@code key} in the shard selected for it.
     *
     * @return the shard's promise; for a {@code null} key a promise already
     *         resolved with {@code null} (a null key cannot be routed), so callers
     *         can always chain on the returned promise
     */
    @Override
    public IPromise<Record<K>> get(K key) {
        if (key == null) {
            return new Promise<>((Record<K>) null);
        }
        return shardFor(key).get(key);
    }

    /**
     * Sums the sizes reported by all shards.
     *
     * @return a promise resolved with the sum, or completed with an error as soon
     *         as one shard delivers no size
     */
    @Override
    public IPromise<Long> size() {
        Promise<Long> total = new Promise<>();
        List<IPromise<Long>> pending = new ArrayList<>(this.shards.length);
        for (RealLiveTable<K> shard : this.shards) {
            pending.add(shard.size());
        }
        Actors.all(pending).then(settled -> {
            long sum = 0L;
            for (IPromise<Long> shardSize : settled) {
                Long value = shardSize.get();
                if (value == null) {
                    // Without this the unboxing below would throw and 'total' would never settle.
                    Object error = shardSize.getError();
                    total.complete(null, error != null ? error : "shard did not report a size");
                    return;
                }
                sum += value;
            }
            total.resolve(sum);
        });
        return total;
    }

    /**
     * {@link Mutation} view of the enclosing table: every operation is sent to the
     * shard selected by the key (or by the record's key).
     */
    protected class ShardMutation
    implements Mutation<K> {
        protected ShardMutation() {
        }

        @Override
        public IPromise<Boolean> putCAS(RLPredicate<Record<K>> casCondition, K key, Object ... keyVals) {
            return TableSharding.this.shardFor(key).getMutation().putCAS(casCondition, key, keyVals);
        }

        @Override
        public void put(K key, Object ... keyVals) {
            TableSharding.this.shardFor(key).receive(new PutMessage(RLUtil.get().record(key, keyVals)));
        }

        // Parameter type left raw exactly as in the original declaration.
        @Override
        public void atomic(K key, RLConsumer action) {
            TableSharding.this.shardFor(key).getMutation().atomic(key, action);
        }

        @Override
        public void addOrUpdate(K key, Object ... keyVals) {
            TableSharding.this.shardFor(key).receive(RLUtil.get().addOrUpdate(key, keyVals));
        }

        @Override
        public void add(K key, Object ... keyVals) {
            TableSharding.this.shardFor(key).receive(RLUtil.get().add(key, keyVals));
        }

        /** Sends an {@link AddMessage} for {@code rec}; a {@link RecordWrapper} is unwrapped first. */
        @Override
        public void add(Record<K> rec) {
            if (rec instanceof RecordWrapper) {
                rec = ((RecordWrapper)rec).getRecord();
            }
            TableSharding.this.shardFor(rec.getKey()).receive(new AddMessage(rec));
        }

        /**
         * Like {@link #add(Record)} but constructs the {@link AddMessage} with its
         * boolean flag set to {@code true}.
         */
        @Override
        public void addOrUpdateRec(Record<K> rec) {
            if (rec instanceof RecordWrapper) {
                rec = ((RecordWrapper)rec).getRecord();
            }
            TableSharding.this.shardFor(rec.getKey()).receive(new AddMessage(true, rec));
        }

        /** Sends a {@link PutMessage} for {@code rec}; a {@link RecordWrapper} is unwrapped first. */
        @Override
        public void put(Record<K> rec) {
            if (rec instanceof RecordWrapper) {
                rec = ((RecordWrapper)rec).getRecord();
            }
            TableSharding.this.shardFor(rec.getKey()).receive(new PutMessage(rec));
        }

        @Override
        public void update(K key, Object ... keyVals) {
            TableSharding.this.shardFor(key).receive(RLUtil.get().update(key, keyVals));
        }

        @Override
        public void remove(K key) {
            TableSharding.this.shardFor(key).receive(RLUtil.get().remove(key));
        }
    }
}

