/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.impl;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.impl.BackOffStrategy;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.util.Log;

public class ElasticScheduler
implements Scheduler {
    /** Default queue size; used by the one-arg constructor and whenever a size &lt;= 1 is requested. */
    public static int DEFQSIZE = 16384;
    /** When true, actor moves performed by rebalance/tryStopThread are logged via Log.Info. */
    public static boolean DEBUG_SCHEDULING = true;
    /** Once more than this many offer attempts have failed in put2QueuePolling, a single "blocked" warning is logged. */
    public static int BLOCK_COUNT_WARNING_THRESHOLD = 10000;
    /** NOTE(review): currently has no effect on scheduling behavior in this class. */
    public static int RECURSE_ON_BLOCK_THRESHOLD = 1023;
    /** Value reported by getMaxThreads(); both constructors overwrite this initial value. */
    int maxThread = Runtime.getRuntime().availableProcessors();
    /** Strategy that yield(int) delegates to while a sender waits for queue space. */
    protected BackOffStrategy backOffStrategy = new BackOffStrategy();
    /** Dispatcher thread slots; a null entry is a free slot. The array itself is used as the lock guarding slot changes. */
    volatile DispatcherThread[] threads;
    /** Value reported by getDefaultQSize(). */
    int defQSize = DEFQSIZE;
    /** Executor on which runBlockingCall runs its callables. */
    protected ExecutorService exec = Executors.newCachedThreadPool();
    /** Timer used by delayedCall; static, so it is shared by all scheduler instances. */
    protected static Timer delayedCalls = new Timer();
    /** Lock held for the duration of rebalance and tryStopThread. */
    Object balanceLock = new Object();

    /**
     * Creates a scheduler with the given number of thread slots and the
     * default queue size ({@link #DEFQSIZE}).
     *
     * @param maxThreads number of dispatcher thread slots
     */
    public ElasticScheduler(int maxThreads) {
        this(maxThreads, DEFQSIZE);
    }

    /**
     * Creates a scheduler with the given number of thread slots and queue size.
     *
     * @param maxThreads number of dispatcher thread slots to allocate
     * @param defQSize   default queue size; values &lt;= 1 are replaced by {@link #DEFQSIZE}
     */
    public ElasticScheduler(int maxThreads, int defQSize) {
        this.maxThread = maxThreads;
        // Sizes of 1 or less are treated as "not specified".
        this.defQSize = defQSize > 1 ? defQSize : DEFQSIZE;
        this.threads = new DispatcherThread[maxThreads];
    }

    /**
     * Counts the occupied thread slots.
     *
     * @return number of non-null entries in the slot array
     */
    public int getActiveThreads() {
        int active = 0;
        for (DispatcherThread slot : this.threads) {
            if (slot != null) {
                active++;
            }
        }
        return active;
    }

    /**
     * @return the thread limit this scheduler was constructed with
     */
    @Override
    public int getMaxThreads() {
        return maxThread;
    }

    /**
     * @return the default queue size configured for this scheduler
     */
    @Override
    public int getDefaultQSize() {
        return defQSize;
    }

    /**
     * Puts a call entry on the mailbox of its target actor, waiting for space
     * if the mailbox is full.
     *
     * @param e the call to enqueue
     * @return a promise that receives the call's result when
     *         {@code e.hasFutureResult()} is true, otherwise {@code null}
     */
    public Future put2QueuePolling(CallEntry e) {
        final Promise result = e.hasFutureResult() ? new Promise() : null;
        if (result != null) {
            // Forward whatever the call produces into the promise handed to the caller.
            Callback forward = new Callback(){

                public void receive(Object value, Object error) {
                    result.receive(value, error);
                }
            };
            e.setFutureCB(new CallbackWrapper(e.getSendingActor(), forward));
        }
        Actor target = e.getTargetActor();
        this.put2QueuePolling(target.__mailbox, e, target);
        return result;
    }

    /**
     * Backs off by delegating to the configured {@link BackOffStrategy}.
     *
     * @param count value passed through to the back-off strategy
     */
    @Override
    public void yield(int count) {
        backOffStrategy.yield(count);
    }

    /**
     * Offers {@code o} to {@code q} until it is accepted, backing off via
     * {@link #yield(int)} after every failed attempt. Once more than
     * {@link #BLOCK_COUNT_WARNING_THRESHOLD} attempts have failed, a single
     * warning naming the blocked thread, the receiving queue and (if known)
     * the sending actor is logged; polling then continues.
     *
     * @param q        queue to put the message on
     * @param o        message to enqueue
     * @param receiver owner of the queue; only used to build the warning text
     */
    @Override
    public void put2QueuePolling(Queue q, Object o, Object receiver) {
        int count = 0;
        boolean warningPrinted = false;
        while (!q.offer(o)) {
            this.yield(count++);
            if (warningPrinted || count <= BLOCK_COUNT_WARNING_THRESHOLD) {
                continue;
            }
            // Warn only once per blocked put.
            warningPrinted = true;
            String receiverString;
            if (receiver instanceof Actor) {
                Actor receivingActor = (Actor)receiver;
                String simpleName = receiver.getClass().getSimpleName();
                if (q == receivingActor.__cbQueue) {
                    receiverString = simpleName + " callbackQ";
                } else if (q == receivingActor.__mailbox) {
                    receiverString = simpleName + " mailbox";
                } else {
                    receiverString = simpleName + " unknown queue";
                }
            } else {
                receiverString = "" + receiver;
            }
            String sender = "";
            Actor sendingActor = Actor.sender.get();
            if (sendingActor != null) {
                sender = ", sender:" + sendingActor.getActor().getClass().getSimpleName();
            }
            Log.Lg.warn(this, "Warning: Thread " + Thread.currentThread().getName() + " blocked trying to put message on " + receiverString + sender);
        }
    }

    /**
     * Builds a call entry for {@code methodName} on the receiver's actor and
     * enqueues it on that actor's mailbox.
     * <p>
     * Every {@link Callback} found in {@code args} is replaced in place by a
     * {@link CallbackWrapper} bound to {@code sendingActor}, so the caller's
     * array is modified.
     *
     * @param sendingActor actor the wrapped callbacks are bound to
     * @param receiver     actor (or actor reference) to call
     * @param methodName   name of the method to look up on the receiving actor
     * @param args         call arguments; must not be null
     * @return the result of {@link #put2QueuePolling(CallEntry)}: a promise if
     *         the call has a future result, otherwise {@code null}
     */
    @Override
    public Object enqueueCall(Actor sendingActor, Actor receiver, String methodName, Object[] args) {
        Actor actor = (Actor)receiver.getActor();
        Method method = actor.__getCachedMethod(methodName, actor);
        for (int i = 0; i < args.length; ++i) {
            if (args[i] instanceof Callback) {
                args[i] = new CallbackWrapper(sendingActor, (Callback)args[i]);
            }
        }
        CallEntry e = new CallEntry(actor, method, args, Actor.sender.get(), actor);
        return this.put2QueuePolling(e);
    }

    /**
     * Clears the slot occupied by the given dispatcher thread so it can be
     * reused for a new thread.
     *
     * @param th the dispatcher thread to remove from the slot array
     * @throws RuntimeException if {@code th} is not found in any slot
     */
    @Override
    public void threadStopped(DispatcherThread th) {
        synchronized (this.threads) {
            for (int i = 0; i < this.threads.length; ++i) {
                if (this.threads[i] == th) {
                    this.threads[i] = null;
                    return;
                }
            }
        }
        throw new RuntimeException("Oops. Unknown Thread");
    }

    /**
     * Creates the invocation handler that redirects calls on {@code toWrap}
     * to the callback queue of {@code dispatcher}.
     *
     * @param dispatcher actor whose callback queue receives the calls
     * @param toWrap     object the calls are eventually invoked on
     * @return a new {@code CallbackInvokeHandler}
     */
    @Override
    public InvocationHandler getInvoker(Actor dispatcher, Object toWrap) {
        InvocationHandler handler = new CallbackInvokeHandler(toWrap, dispatcher);
        return handler;
    }

    /**
     * Wraps {@code callback} in a dynamic proxy implementing the interfaces
     * declared directly by the callback's class. Calls on the proxy go to the
     * handler supplied by the actor's scheduler.
     *
     * @param actor    actor whose scheduler provides the invocation handler
     * @param callback object to wrap; must not be null
     * @return the proxy, or {@code callback} itself if the scheduler returns
     *         no handler
     */
    @Override
    public <T> T inThread(Actor actor, T callback) {
        Class<?> callbackType = callback.getClass();
        Class<?>[] proxied = callbackType.getInterfaces();
        InvocationHandler handler = actor.__scheduler.getInvoker(actor, callback);
        if (handler != null) {
            return (T)Proxy.newProxyInstance(callbackType.getClassLoader(), proxied, handler);
        }
        return callback;
    }

    /**
     * Schedules {@code toRun} on the shared timer after {@code millis}
     * milliseconds.
     * <p>
     * Exceptions thrown by {@code toRun} are caught and logged. An exception
     * escaping a {@link TimerTask} terminates the timer's thread, and because
     * the timer is static that would make every later {@code delayedCall} on
     * any scheduler fail with {@link IllegalStateException}.
     *
     * @param millis delay in milliseconds
     * @param toRun  action to execute on the timer thread
     */
    @Override
    public void delayedCall(long millis, final Runnable toRun) {
        delayedCalls.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    toRun.run();
                }
                catch (Exception ex) {
                    // Keep the shared timer thread alive; one failing task must not disable all delayed calls.
                    Log.Lg.warn(ElasticScheduler.this, "Warning: delayed call threw " + ex);
                }
            }
        }, millis);
    }

    /**
     * Runs {@code toCall} on the scheduler's executor and reports the outcome
     * through {@code resultHandler}, wrapped in a {@link CallbackWrapper}
     * bound to {@code emitter}.
     * <p>
     * On success the wrapper receives {@code (result, null)}; if anything is
     * thrown it receives {@code (null, throwable)}.
     * NOTE(review): a throwable raised by the success-path {@code receive}
     * itself is also caught, which leads to a second {@code receive} call.
     *
     * @param emitter       actor the result callback is bound to
     * @param toCall        work to run on the executor
     * @param resultHandler receives the result or the error
     */
    @Override
    public <T> void runBlockingCall(Actor emitter, Callable<T> toCall, Callback<T> resultHandler) {
        CallbackWrapper resultWrapper = new CallbackWrapper(emitter, resultHandler);
        Runnable task = () -> {
            try {
                resultWrapper.receive(toCall.call(), null);
            }
            catch (Throwable failure) {
                resultWrapper.receive(null, failure);
            }
        };
        this.exec.execute(task);
    }

    /**
     * Delegates to {@code Actors.yield(Future...)}.
     *
     * @param futures futures to pass on
     * @return the future returned by {@code Actors.yield}
     */
    @Override
    public Future<Future[]> yield(Future ... futures) {
        return Actors.yield(futures);
    }

    /**
     * Delegates to {@code Actors.yield(List)}.
     *
     * @param futures futures to pass on
     * @return the future returned by {@code Actors.yield}
     */
    @Override
    public <T> Future<List<Future<T>>> yield(List<Future<T>> futures) {
        return Actors.yield(futures);
    }

    /**
     * Picks a dispatcher thread for a new actor: the least-loaded existing
     * thread if there is one, otherwise a newly created and started thread.
     * <p>
     * The search bound is {@code Long.MAX_VALUE} (as in
     * {@link #tryStopThread}). Loads are {@code long} nanosecond values, so a
     * bound of {@code Integer.MAX_VALUE} would skip every thread loaded at
     * about 2.1 s or more and could end in the exception below even though
     * running threads exist.
     *
     * @return the dispatcher thread to use
     * @throws RuntimeException if no thread exists and none can be created
     */
    @Override
    public DispatcherThread assignDispatcher() {
        synchronized (this.threads) {
            DispatcherThread minThread = this.findMinLoadThread(Long.MAX_VALUE, null);
            if (minThread != null) {
                return minThread;
            }
            DispatcherThread newThreadIfPossible = this.createNewThreadIfPossible();
            if (newThreadIfPossible != null) {
                newThreadIfPossible.start();
                return newThreadIfPossible;
            }
            throw new RuntimeException("could not assign thread. This is a severe error");
        }
    }

    /**
     * Finds the thread with the smallest load that is strictly below
     * {@code minLoad}, ignoring free slots and {@code dispatcherThread}.
     *
     * @param minLoad          exclusive upper bound for an acceptable load
     * @param dispatcherThread thread to exclude from the search; may be null
     * @return the least-loaded qualifying thread, or {@code null} if none qualifies
     */
    private DispatcherThread findMinLoadThread(long minLoad, DispatcherThread dispatcherThread) {
        synchronized (this.threads) {
            DispatcherThread minThread = null;
            for (DispatcherThread thread : this.threads) {
                if (thread == null || thread == dispatcherThread) {
                    continue;
                }
                long load = thread.getLoadNanos();
                if (load < minLoad) {
                    minLoad = load;
                    minThread = thread;
                }
            }
            return minThread;
        }
    }

    /**
     * Creates a dispatcher thread in the first free slot, if there is one.
     * The thread is stored in the slot but not started.
     *
     * @return the new, not yet started thread, or {@code null} if all slots
     *         are occupied
     */
    private DispatcherThread createNewThreadIfPossible() {
        synchronized (this.threads) {
            for (int i = 0; i < this.threads.length; ++i) {
                if (this.threads[i] == null) {
                    DispatcherThread created = this.createDispatcherThread();
                    this.threads[i] = created;
                    return created;
                }
            }
            return null;
        }
    }

    /**
     * Factory for dispatcher threads; subclasses may override it to supply a
     * different thread type.
     *
     * @return a new, unstarted dispatcher thread bound to this scheduler
     */
    protected DispatcherThread createDispatcherThread() {
        DispatcherThread created = new DispatcherThread(this);
        return created;
    }

    /**
     * Moves work away from {@code dispatcherThread}.
     * <p>
     * If a free slot exists, a new thread is created, handed to
     * {@code dispatcherThread.splitTo(..)} and started. Otherwise the
     * least-loaded other thread with a load below this thread's load is
     * chosen. Actors are moved to it one by one while the receiver's load
     * after the move stays below the sender's load after the move.
     * The whole operation runs under the balance lock.
     *
     * @param dispatcherThread the thread to take load from
     */
    @Override
    public void rebalance(DispatcherThread dispatcherThread) {
        synchronized (this.balanceLock) {
            long load = dispatcherThread.getLoadNanos();
            DispatcherThread fresh = this.createNewThreadIfPossible();
            if (fresh != null) {
                dispatcherThread.splitTo(fresh);
                fresh.start();
                return;
            }
            DispatcherThread target = this.findMinLoadThread(load, dispatcherThread);
            if (target == null) {
                return;
            }
            Actor[] actors = dispatcherThread.getActors();
            long otherLoad = target.getLoadNanos();
            for (Actor actor : actors) {
                // Only move an actor if the receiver stays lighter than the sender afterwards.
                if (otherLoad + actor.__nanos < load - actor.__nanos) {
                    otherLoad += actor.__nanos;
                    load -= actor.__nanos;
                    if (DEBUG_SCHEDULING) {
                        Log.Info(this, "move " + actor.__nanos + " myload " + load + " otherlOad " + otherLoad);
                    }
                    dispatcherThread.removeActorImmediate(actor);
                    target.addActor(actor);
                }
            }
        }
    }

    /**
     * Moves every actor of {@code dispatcherThread} to the least-loaded other
     * thread. Does nothing if no other thread exists. This method only
     * relocates actors; it does not stop the thread itself. Runs under the
     * balance lock.
     *
     * @param dispatcherThread the thread whose actors should be moved away
     */
    @Override
    public void tryStopThread(DispatcherThread dispatcherThread) {
        synchronized (this.balanceLock) {
            Actor[] actors = dispatcherThread.getActors();
            DispatcherThread target = this.findMinLoadThread(Long.MAX_VALUE, dispatcherThread);
            if (target != null) {
                for (Actor actor : actors) {
                    dispatcherThread.removeActorImmediate(actor);
                    target.addActor(actor);
                    if (DEBUG_SCHEDULING) {
                        Log.Info(this, "move for idle " + actor.__nanos + " myload " + dispatcherThread.getLoadNanos() + " actors " + actors.length);
                    }
                }
            }
        }
    }

    /**
     * @return the back-off strategy used by {@link #yield(int)}
     */
    @Override
    public BackOffStrategy getBackoffStrategy() {
        return backOffStrategy;
    }

    /**
     * Invocation handler returned by {@code getInvoker}. Interface calls on
     * the proxy are turned into {@link CallEntry} objects and put on the
     * callback queue of the target actor; they always return {@code null} to
     * the caller. {@code equals}, {@code hashCode} and {@code toString} are
     * answered synchronously.
     */
    class CallbackInvokeHandler
    implements InvocationHandler {
        /** Object the enqueued calls are invoked on; may be null, in which case calls are dropped. */
        final Object target;
        /** Actor whose callback queue receives the calls. */
        final Actor targetActor;

        public CallbackInvokeHandler(Object target, Actor act) {
            this.target = target;
            this.targetActor = act;
        }

        /**
         * Handles a call made on the proxy.
         *
         * @param proxy  the proxy instance the call was made on
         * @param method the invoked method
         * @param args   call arguments, as supplied by the proxy
         * @return the synchronous result for methods declared by
         *         {@code Object}, otherwise {@code null}
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return this.invokeObjectMethod(proxy, method, args);
            }
            if (this.target != null) {
                CallEntry<Object> ce = new CallEntry<Object>(this.target, method, args, Actor.sender.get(), this.targetActor);
                ElasticScheduler.this.put2QueuePolling(this.targetActor.__cbQueue, ce, this.targetActor);
            }
            return null;
        }

        /**
         * Answers equals/hashCode/toString without going through the proxy
         * again. Invoking these on {@code proxy} would be dispatched straight
         * back to this handler and recurse until the stack overflows, so they
         * are invoked on the wrapped target instead. Without a target the
         * proxy's identity is used.
         */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) throws Throwable {
            String name = method.getName();
            if (this.target != null) {
                if ("equals".equals(name) && args[0] == proxy) {
                    // Keep equals reflexive for the proxy itself.
                    return true;
                }
                return method.invoke(this.target, args);
            }
            switch (name) {
                case "equals":
                    return proxy == args[0];
                case "hashCode":
                    return System.identityHashCode(proxy);
                default:
                    return "CallbackInvokeHandler[target=null]";
            }
        }
    }
}

