/*
 * Decompiled with CFR 0.152.
 */
package net.sodacan.core.scheduler;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import net.sodacan.core.ActorGroup;
import net.sodacan.core.ActorId;
import net.sodacan.core.Config;
import net.sodacan.core.Message;
import net.sodacan.core.Scheduler;
import net.sodacan.core.actor.ActorEntry;
import net.sodacan.core.message.Evict;

/**
 * Default {@link Scheduler} implementation.
 *
 * <p>Each target {@link ActorId} is mapped to an {@link ActorEntry}. Entries are created on
 * demand, submitted to a cached thread pool, and receive messages through
 * {@link ActorEntry#queueMessage}. The scheduler also keeps a handful of counters
 * (message count/load, eviction count, accumulated backpressure sleep time, maximum
 * observed queue depth) that are exposed through the {@link Scheduler} getters.
 */
public class DefaultScheduler implements Scheduler, Closeable {
    private final Config config;
    protected ActorGroup actorGroup;
    private final AtomicInteger messageCount = new AtomicInteger(0);
    private final AtomicInteger messageLoad = new AtomicInteger(0);
    private final AtomicInteger maxMessageLoad = new AtomicInteger(0);
    private final AtomicInteger evictionCount = new AtomicInteger(0);
    private final AtomicInteger totalSleepTime = new AtomicInteger(0);
    private final AtomicInteger maxThreadQueueDepth = new AtomicInteger(0);
    // NOTE(review): not referenced anywhere in this class; kept so the field layout is unchanged.
    private final ReentrantLock newMessageLock = new ReentrantLock();
    private final ThreadPoolExecutor pool;
    private final Map<ActorId, ActorEntry> actorEntries = new ConcurrentHashMap<ActorId, ActorEntry>();

    /**
     * Creates a scheduler backed by a cached thread pool.
     *
     * @param config configuration consulted for backpressure, eviction and shutdown settings
     */
    public DefaultScheduler(Config config) {
        this.config = config;
        this.pool = (ThreadPoolExecutor)Executors.newCachedThreadPool();
    }

    /** @return the actor group previously supplied via {@link #setActorGroup}, or {@code null} */
    @Override
    public ActorGroup getActorGroup() {
        return this.actorGroup;
    }

    /** @param actorGroup the actor group this scheduler is associated with */
    @Override
    public void setActorGroup(ActorGroup actorGroup) {
        this.actorGroup = actorGroup;
    }

    /**
     * Queues a message, waiting for backpressure to clear first.
     * Equivalent to {@code addMessage(message, false)}.
     */
    @Override
    public boolean addMessage(Message message) {
        return this.addMessage(message, false);
    }

    /**
     * Blocks while the current message load is at or above the configured backpressure limit,
     * sleeping for the configured wait interval between checks and adding each completed
     * sleep to the total sleep time.
     *
     * <p>The wait itself is not cancelled by an interrupt (it never was); instead the interrupt
     * is remembered and the thread's interrupt status is restored before returning, so callers
     * further up the stack can still observe it.
     *
     * @return always {@code true}
     */
    protected boolean backpressureWait() {
        boolean interrupted = false;
        while (this.getMessageLoad() >= this.config.getBackpressureLimit()) {
            try {
                Thread.sleep(this.config.getBackpressureWaitMs());
                this.incrementSleepTime();
            }
            catch (InterruptedException e) {
                // Thread.sleep cleared the flag when it threw; keep waiting and restore it below.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Queues an {@link Evict} message on the given entry and removes the entry from the
     * scheduler's map.
     *
     * @param actorEntry the entry to evict; must not be {@code null}
     */
    protected void evict(ActorEntry actorEntry) {
        actorEntry.queueMessage(new Evict(actorEntry.getActorId()));
        this.actorEntries.remove(actorEntry.getActorId());
    }

    /**
     * Evicts up to {@code config.getEviction()} entries, lowest {@link ActorEntry#getUsage()}
     * first, once the number of entries reaches
     * {@code config.getActorGroupThreads() + config.getEviction()}. Does nothing below that
     * threshold.
     */
    protected void makeRoomInActorEntries() {
        int eviction = this.config.getEviction();
        if (this.actorEntries.size() < this.config.getActorGroupThreads() + eviction) {
            return;
        }
        // A zero-length seed array makes toArray size the snapshot itself. A pre-sized array
        // would be left null-padded if the map shrank concurrently (evictAll is unsynchronized).
        ActorEntry[] candidates = this.actorEntries.values().toArray(new ActorEntry[0]);
        Arrays.sort(candidates, Comparator.comparingLong(ActorEntry::getUsage));
        // Never index past the snapshot, even if it ended up shorter than the eviction count.
        int limit = Math.min(eviction, candidates.length);
        for (int i = 0; i < limit; i++) {
            this.evict(candidates[i]);
        }
    }

    /**
     * Returns the entry for {@code actorId}, creating, registering and submitting a new one
     * to the thread pool if none is registered.
     *
     * <p>The map is re-checked under this method's monitor: {@link #addMessage(Message, boolean)}
     * performs its lookup without a lock, so two threads can both miss and both call this
     * method; without the re-check the second call would replace (and orphan) the entry created
     * by the first.
     *
     * <p>Failures are reported with {@code printStackTrace()} and not rethrown, as before.
     *
     * @param actorId id of the actor needing an entry
     * @return the existing or newly created entry; {@code null} if creation failed before an
     *         entry could be constructed
     */
    protected synchronized ActorEntry addActorEntry(ActorId actorId) {
        ActorEntry existing = this.actorEntries.get(actorId);
        if (existing != null) {
            return existing;
        }
        ActorEntry actorEntry = null;
        try {
            this.makeRoomInActorEntries();
            actorEntry = new ActorEntry(this.config, this, actorId);
            this.actorEntries.put(actorId, actorEntry);
            this.pool.execute(actorEntry);
        }
        catch (Exception e1) {
            e1.printStackTrace();
        }
        return actorEntry;
    }

    /**
     * Queues a message on the entry for its target actor, creating the entry if needed, and
     * records the largest queue depth seen.
     *
     * @param message the message to deliver; its target actor id must be non-null
     * @param force   when {@code true}, skip the backpressure wait
     * @return always {@code true}
     * @throws RuntimeException if the message has no target actor id, or if no entry could be
     *                          created for the target
     */
    @Override
    public boolean addMessage(Message message, boolean force) {
        if (!force) {
            this.backpressureWait();
        }
        ActorId targetActorId = message.getTargetActorId();
        if (targetActorId == null) {
            throw new RuntimeException("In scheduler, Message has no targetActorId");
        }
        ActorEntry actorEntry = this.actorEntries.get(targetActorId);
        if (actorEntry == null) {
            actorEntry = this.addActorEntry(targetActorId);
        }
        if (actorEntry == null) {
            // addActorEntry reports failures itself and returns null; fail with context
            // instead of an anonymous NullPointerException on the next line.
            throw new RuntimeException("In scheduler, unable to create ActorEntry for " + String.valueOf(targetActorId));
        }
        actorEntry.queueMessage(message);
        // Atomic max: a separate get()/set() pair lets a racing thread overwrite a higher
        // maximum with a lower one.
        this.maxThreadQueueDepth.accumulateAndGet(actorEntry.getQueueSize(), Math::max);
        return true;
    }

    /**
     * Prints every entry whose queue is non-empty and resubmits that entry to the thread pool.
     */
    public void auditActorEntries() {
        for (Map.Entry<ActorId, ActorEntry> entry : this.actorEntries.entrySet()) {
            int cnt = entry.getValue().getQueueSize();
            if (cnt <= 0) {
                continue;
            }
            System.out.println(String.valueOf(entry.getKey()) + " queue size=" + cnt);
            this.pool.execute(entry.getValue());
        }
    }

    /** Evicts every entry present in a snapshot of the current entry map. */
    @Override
    public void evictAll() {
        // Zero-length seed array: see makeRoomInActorEntries for why a pre-sized one is unsafe.
        for (ActorEntry actorEntry : this.actorEntries.values().toArray(new ActorEntry[0])) {
            this.evict(actorEntry);
        }
    }

    /**
     * Polls until the message load reaches zero, printing progress and sleeping for the
     * configured shutdown wait between checks. If interrupted, stops waiting and restores the
     * thread's interrupt status.
     */
    @Override
    public void waitForMessagesToFinish() {
        try {
            while (this.getMessageLoad() != 0) {
                System.out.println("Shutdown: Waiting for " + this.getMessageLoad() + " messages to finish...");
                Thread.sleep(this.config.getShutdownWaitMs());
            }
        }
        catch (InterruptedException e) {
            // Stop waiting, but leave the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** @return the configuration supplied at construction */
    @Override
    public Config getConfig() {
        return this.config;
    }

    /** Increments the message load and raises the recorded maximum load if exceeded. */
    @Override
    public void increaseMessageLoad() {
        int load = this.messageLoad.incrementAndGet();
        // Atomic max, for the same reason as in addMessage.
        this.maxMessageLoad.accumulateAndGet(load, Math::max);
    }

    /** Decrements the message load. */
    @Override
    public void reduceMessageLoad() {
        this.messageLoad.decrementAndGet();
    }

    /** @return the current message load */
    @Override
    public int getMessageLoad() {
        return this.messageLoad.get();
    }

    /** @return the highest message load recorded by {@link #increaseMessageLoad()} */
    @Override
    public int getMaxMessageLoad() {
        return this.maxMessageLoad.get();
    }

    /** Adds one configured backpressure wait interval to the total sleep time. */
    @Override
    public void incrementSleepTime() {
        this.totalSleepTime.addAndGet(this.getConfig().getBackpressureWaitMs());
    }

    /** Increments the message counter. */
    @Override
    public void increaseMessageCount() {
        this.messageCount.incrementAndGet();
    }

    /** @return the value of the message counter */
    @Override
    public int getMessageCount() {
        return this.messageCount.get();
    }

    /** @return the accumulated backpressure sleep time */
    @Override
    public int getSleepTime() {
        return this.totalSleepTime.get();
    }

    /** @return the number of entries currently registered */
    @Override
    public int getActorCount() {
        return this.actorEntries.size();
    }

    /** @return the largest entry queue size observed by {@link #addMessage(Message, boolean)} */
    @Override
    public int getMaxThreadQueueDepth() {
        return this.maxThreadQueueDepth.get();
    }

    /** Increments the eviction counter. */
    @Override
    public void increaseEvictionCount() {
        this.evictionCount.incrementAndGet();
    }

    /** @return the value of the eviction counter */
    @Override
    public int getEvictionCount() {
        return this.evictionCount.get();
    }

    /**
     * @return the pool's active thread count ({@link ThreadPoolExecutor#getActiveCount()}),
     *         which is an approximation of the threads currently executing tasks, not the
     *         pool's total size
     */
    @Override
    public int getThreadPoolSize() {
        return this.pool.getActiveCount();
    }

    /**
     * Evicts all entries, waits for the message load to drain, then shuts the pool down and
     * waits up to one second for it to terminate.
     *
     * @throws RuntimeException wrapping any failure during shutdown; if the cause is an
     *                          interrupt, the thread's interrupt status is restored first
     */
    @Override
    public void close() {
        try {
            this.evictAll();
            this.waitForMessagesToFinish();
            // shutdown() must come first: awaitTermination only returns early once a shutdown
            // has been requested, otherwise it simply burns the whole timeout.
            this.pool.shutdown();
            this.pool.awaitTermination(1L, TimeUnit.SECONDS);
            System.out.println("Schduler orderly Shutdown complete for " + String.valueOf(this.getActorGroup()));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error closing Scheduler for ActorGroup ", e);
        }
        catch (Exception e) {
            throw new RuntimeException("Error closing Scheduler for ActorGroup ", e);
        }
    }
}

