/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.impl;

import io.jaq.mpsc.MpscConcurrentQueue;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.impl.ActorStoppedException;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.ElasticScheduler;
import org.nustaq.kontraktor.util.Log;

public class DispatcherThread
extends Thread {
    public static int NUMBER_OF_MESSAGES_TO_PROCESS_PER_CHECK_FOR_NEW_ADDS = 500;
    public static int PROFILE_INTERVAL = 255;
    public static int SCHEDULE_PER_PROFILE = 32;
    public static int QUEUE_PERCENTAGE_TRIGGERING_REBALANCE = 80;
    public static int MILLIS_AFTER_CREATION_BEFORE_REBALANCING = 1000;
    public static int TRIGGER_REBALANCE_COUNTER = 2;
    private Scheduler scheduler;
    private Actor[] actors = new Actor[0];
    ConcurrentLinkedQueue<Actor> toAdd = new ConcurrentLinkedQueue();
    protected boolean shutDown = false;
    private int maxThreads;
    static AtomicInteger dtcount = new AtomicInteger(0);
    int stackDepth = 0;
    int count = 0;
    int profileCounter = 0;
    int schedCounter = 0;
    int loadCounter = 0;
    int nextProfile = 511;
    long created = System.currentTimeMillis();

    public DispatcherThread(Scheduler scheduler) {
        this.scheduler = scheduler;
        this.maxThreads = scheduler.getMaxThreads();
        this.setName("DispatcherThread " + dtcount.incrementAndGet());
    }

    @Override
    public String toString() {
        return "DispatcherThread{ name:" + this.getName() + '}';
    }

    public void addActor(Actor act) {
        this.toAdd.offer(act.getActorRef());
    }

    void removeActorImmediate(Actor act) {
        if (Thread.currentThread() != this) {
            throw new RuntimeException("wrong thread");
        }
        Actor[] newAct = new Actor[this.actors.length - 1];
        int idx = 0;
        for (int i = 0; i < this.actors.length; ++i) {
            Actor actor = this.actors[i];
            if (actor == act) continue;
            newAct[idx++] = actor;
        }
        if (idx != newAct.length) {
            throw new RuntimeException("could not remove actor");
        }
        this.actors = newAct;
    }

    @Override
    public void run() {
        int emptyCount = 0;
        int scheduleNewActorCount = 0;
        boolean isShutDown = false;
        while (!isShutDown) {
            if (this.pollQs(null)) {
                emptyCount = 0;
                if (++scheduleNewActorCount <= NUMBER_OF_MESSAGES_TO_PROCESS_PER_CHECK_FOR_NEW_ADDS) continue;
                scheduleNewActorCount = 0;
                this.schedulePendingAdds();
                continue;
            }
            this.scheduler.yield(++emptyCount);
            if (this.shutDown) {
                isShutDown = true;
            }
            if (!this.scheduler.getBackoffStrategy().isSleeping(emptyCount)) continue;
            scheduleNewActorCount = 0;
            this.schedulePendingAdds();
            if (System.currentTimeMillis() - this.created <= 3000L) continue;
            if (this.actors.length == 0 && this.toAdd.peek() == null) {
                this.shutDown();
                continue;
            }
            this.scheduler.tryStopThread(this);
        }
        this.scheduler.threadStopped(this);
        for (int i = 0; i < 100; ++i) {
            LockSupport.parkNanos(5000000L);
            if (this.actors.length <= 0) continue;
            if (ElasticScheduler.DEBUG_SCHEDULING) {
                Log.Lg.warn(this, "Severe: zombie dispatcher thread detected");
            }
            this.scheduler.tryStopThread(this);
            i = 0;
        }
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "dipatcher thread terminated");
        }
    }

    private void schedulePendingAdds() {
        Actor a;
        ArrayList<Actor> newOnes = new ArrayList<Actor>();
        while ((a = this.toAdd.poll()) != null) {
            newOnes.add(a);
        }
        if (newOnes.size() > 0) {
            Actor[] newQueue = new Actor[newOnes.size() + this.actors.length];
            System.arraycopy(this.actors, 0, newQueue, 0, this.actors.length);
            for (int i = 0; i < newOnes.size(); ++i) {
                Actor actor;
                newQueue[this.actors.length + i] = actor = (Actor)newOnes.get(i);
            }
            this.actors = newQueue;
        }
    }

    protected CallEntry pollQueues(Actor[] actors, Actor refToExclude) {
        Actor actor2poll;
        if (this.count >= actors.length) {
            this.count = 0;
            if (actors.length == 0) {
                return null;
            }
        }
        if ((actor2poll = actors[this.count]) == refToExclude) {
            if (actors.length > 1) {
                ++this.count;
                this.pollQueues(actors, refToExclude);
            }
            return null;
        }
        CallEntry res = (CallEntry)actor2poll.__cbQueue.poll();
        if (res == null) {
            res = (CallEntry)actor2poll.__mailbox.poll();
        }
        ++this.count;
        return res;
    }

    public boolean pollQs(Actor refToExclude) {
        CallEntry callEntry = this.pollQueues(this.actors, refToExclude);
        if (callEntry != null) {
            try {
                Actor.sender.set(callEntry.getTargetActor());
                Object invoke = null;
                if (this.maxThreads > 1) {
                    ++this.profileCounter;
                    if (this.profileCounter > this.nextProfile && callEntry.getTarget() instanceof Actor) {
                        this.profileCounter = 0;
                        invoke = this.profiledCall(callEntry);
                    } else {
                        invoke = this.invoke(callEntry);
                    }
                } else {
                    invoke = this.invoke(callEntry);
                }
                if (callEntry.getFutureCB() != null) {
                    final Future futureCB = callEntry.getFutureCB();
                    Promise invokeResult = (Promise)invoke;
                    invokeResult.then(new Callback(){

                        public void receive(Object result, Object error) {
                            futureCB.receive(result, error);
                        }
                    });
                }
                return true;
            }
            catch (Exception e) {
                if (e instanceof InvocationTargetException && ((InvocationTargetException)e).getTargetException() == ActorStoppedException.Instance) {
                    Actor actor = (Actor)callEntry.getTarget();
                    actor.__stopped = true;
                    this.removeActorImmediate(actor.getActorRef());
                    return true;
                }
                if (callEntry.getFutureCB() != null) {
                    callEntry.getFutureCB().receive(null, e);
                }
                Log.Warn(this, e, "");
            }
        }
        return false;
    }

    private Object invoke(CallEntry poll) throws IllegalAccessException, InvocationTargetException {
        Object target = poll.getTarget();
        return poll.getMethod().invoke(target, poll.getArgs());
    }

    private Object profiledCall(CallEntry poll) throws IllegalAccessException, InvocationTargetException {
        this.nextProfile = (int)((double)PROFILE_INTERVAL + Math.random() * 13.0);
        ++this.schedCounter;
        long nanos = System.nanoTime();
        Object invoke = this.invoke(poll);
        nanos = System.nanoTime() - nanos;
        ((Actor)poll.getTarget()).__nanos = (((Actor)poll.getTarget()).__nanos * 31L + nanos) / 32L;
        if (this.schedCounter > SCHEDULE_PER_PROFILE) {
            this.schedCounter = 0;
            this.checkForSplit();
        }
        return invoke;
    }

    private void checkForSplit() {
        int load = this.getLoad();
        if (load > QUEUE_PERCENTAGE_TRIGGERING_REBALANCE && this.actors.length > 1 && System.currentTimeMillis() - this.created > (long)MILLIS_AFTER_CREATION_BEFORE_REBALANCING) {
            ++this.loadCounter;
            if (this.loadCounter > TRIGGER_REBALANCE_COUNTER) {
                this.loadCounter = 0;
                this.scheduler.rebalance(this);
            }
        }
    }

    void splitTo(DispatcherThread newOne) {
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "SPLIT " + this.scheduler.getMaxThreads());
        }
        long myTime = 0L;
        long otherTime = 0L;
        Arrays.sort(this.actors, new Comparator(){

            public int compare(Object o1, Object o2) {
                return ((Actor)o1).__nanos - ((Actor)o2).__nanos > 0L ? -1 : 1;
            }
        });
        for (int i = 0; i < this.actors.length; ++i) {
            Actor act = this.actors[i];
            long nan = act.__nanos;
            if (otherTime < myTime) {
                otherTime += nan;
                continue;
            }
            myTime += nan;
        }
        otherTime = 0L;
        myTime = 0L;
        ArrayList<Actor> new2ScheduleOnMe = new ArrayList<Actor>();
        ArrayList<Actor> new2ScheduleOnOther = new ArrayList<Actor>();
        for (int i = 0; i < this.actors.length; ++i) {
            Actor act = this.actors[i];
            long nan = act.__nanos;
            if (otherTime < myTime) {
                new2ScheduleOnOther.add(act);
                otherTime += nan;
                act.__currentDispatcher = newOne;
                continue;
            }
            new2ScheduleOnMe.add(act);
            myTime += nan;
        }
        this.actors = new Actor[new2ScheduleOnMe.size()];
        new2ScheduleOnMe.toArray(this.actors);
        newOne.actors = new Actor[new2ScheduleOnOther.size()];
        new2ScheduleOnOther.toArray(newOne.actors);
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "split distribution " + myTime + ":" + otherTime + " actors " + this.actors.length);
        }
        this.created = System.currentTimeMillis();
    }

    public int getLoad() {
        int res = 0;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            MpscConcurrentQueue queue = (MpscConcurrentQueue)actors[i].__mailbox;
            int load = queue.size() * 100 / queue.getCapacity();
            if (load <= res) continue;
            res = load;
        }
        return res;
    }

    public long getLoadNanos() {
        long res = 0L;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            Actor a = actors[i];
            res += a.__nanos;
        }
        return res;
    }

    public int getQSize() {
        int res = 0;
        Actor[] actors = this.actors;
        for (int i = 0; i < actors.length; ++i) {
            Actor a = actors[i];
            res += a.__mailbox.size();
            res += a.__cbQueue.size();
        }
        return res;
    }

    public boolean isShutDown() {
        return !this.shutDown;
    }

    public void shutDown() {
        this.shutDown = true;
    }

    public void shutDownImmediate() {
        throw new RuntimeException("unimplemented");
    }

    public boolean isEmpty() {
        for (int i = 0; i < this.actors.length; ++i) {
            Actor act = this.actors[i];
            if (act.__mailbox.isEmpty() && act.__cbQueue.isEmpty()) continue;
            return false;
        }
        return true;
    }

    public void waitEmpty(long nanos) {
        while (!this.isEmpty()) {
            LockSupport.parkNanos(nanos);
        }
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public Actor[] getActors() {
        Actor[] actors = this.actors;
        Actor[] res = new Actor[actors.length];
        System.arraycopy(actors, 0, res, 0, res.length);
        return res;
    }

    Actor[] getActorsNoCopy() {
        return this.actors;
    }
}

