/*
 * Decompiled with CFR 0.152.
 */
package io.nflow.engine.internal.executor;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.exception.DispatcherExceptionAnalyzer;
import io.nflow.engine.exception.DispatcherExceptionHandling;
import io.nflow.engine.internal.dao.ExecutorDao;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.internal.executor.WorkflowInstanceExecutor;
import io.nflow.engine.internal.executor.WorkflowStateProcessorFactory;
import io.nflow.engine.internal.util.NflowLogger;
import io.nflow.engine.internal.util.PeriodicLogger;
import io.nflow.engine.service.WorkflowDefinitionService;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * Main dispatcher loop of the executor node.
 *
 * <p>While running, each iteration waits for free space in the executor queue, ticks the
 * {@link ExecutorDao}, asks the {@link WorkflowInstanceDao} for the next batch of workflow
 * instance ids and submits one state processor per id to the {@link WorkflowInstanceExecutor}.
 *
 * <p>The control flags ({@code shutdownRequested}, {@code running}, {@code paused}) are
 * {@code volatile} so that {@link #shutdown()}, {@link #pause()} and {@link #resume()} can be
 * called from threads other than the one executing {@link #run()}.
 */
@Component
@SuppressFBWarnings(value={"MDM_RANDOM_SEED"}, justification="rand does not need to be secure")
public class WorkflowDispatcher
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(WorkflowDispatcher.class);
    private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60);

    /** Set by {@link #shutdown()}; makes the loop in {@link #run()} exit. */
    private volatile boolean shutdownRequested;
    /** True from the moment workflow definitions are post-processed until shutdown cleanup is done. */
    private volatile boolean running;
    /** While true, the loop only sleeps and does not poll or dispatch. */
    private volatile boolean paused;
    /** Released at the very end of {@link #run()}; {@link #shutdown()} blocks on it. */
    private final CountDownLatch shutdownDone = new CountDownLatch(1);

    private final WorkflowInstanceExecutor executor;
    private final WorkflowInstanceDao workflowInstances;
    private final WorkflowStateProcessorFactory stateProcessorFactory;
    private final WorkflowDefinitionService workflowDefinitions;
    private final ExecutorDao executorDao;
    private final DispatcherExceptionAnalyzer exceptionAnalyzer;
    private final NflowLogger nflowLogger;
    private final long sleepTimeMillis;
    private final int stuckThreadThresholdSeconds;
    private final Random rand = new Random();

    /**
     * Creates the dispatcher and validates the data source configuration.
     *
     * @param executor thread pool wrapper that runs the state processors
     * @param workflowInstances DAO used for polling and for recovering workflow instances
     * @param stateProcessorFactory creates the processor submitted for each polled instance id
     * @param workflowDefinitions service whose definitions are post-processed when the loop starts
     * @param executorDao DAO used for ticking, shutdown marking and configuration checks
     * @param exceptionAnalyzer decides how an exception thrown inside the loop is logged and
     *        whether the loop sleeps before retrying
     * @param nflowLogger helper used to log at a level chosen at runtime
     * @param env source of the required properties {@code nflow.dispatcher.sleep.ms} and
     *        {@code nflow.executor.stuckThreadThreshold.seconds}
     * @throws BeanCreationException if transaction support or auto commit is not enabled
     */
    @Inject
    @SuppressFBWarnings(value={"WEM_WEAK_EXCEPTION_MESSAGING"}, justification="Transaction support exception message is fine")
    public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances, WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao, DispatcherExceptionAnalyzer exceptionAnalyzer, NflowLogger nflowLogger, Environment env) {
        this.executor = executor;
        this.workflowInstances = workflowInstances;
        this.stateProcessorFactory = stateProcessorFactory;
        this.workflowDefinitions = workflowDefinitions;
        this.executorDao = executorDao;
        this.exceptionAnalyzer = exceptionAnalyzer;
        this.nflowLogger = nflowLogger;
        this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
        this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
        if (!executorDao.isTransactionSupportEnabled()) {
            throw new BeanCreationException("Transaction support must be enabled");
        }
        if (!executorDao.isAutoCommitEnabled()) {
            throw new BeanCreationException("DataSource must have auto commit enabled");
        }
    }

    /**
     * Runs the dispatcher loop until {@link #shutdown()} is requested.
     *
     * <p>Exceptions thrown by a single iteration are passed to the
     * {@link DispatcherExceptionAnalyzer} and the loop continues. On exit, normal or not, the
     * thread pool is shut down, the executor is marked as shut down, and threads waiting in
     * {@link #shutdown()} are released.
     */
    @Override
    public void run() {
        logger.info("Dispatcher started.");
        try {
            workflowDefinitions.postProcessWorkflowDefinitions();
            running = true;
            while (!shutdownRequested) {
                if (paused) {
                    sleep(false);
                    continue;
                }
                try {
                    executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil());
                    if (shutdownRequested) {
                        // Shutdown was requested while waiting for queue space: skip polling
                        // and let the loop condition end the loop.
                        continue;
                    }
                    if (executorDao.tick()) {
                        workflowInstances.recoverWorkflowInstancesFromDeadNodes();
                    }
                    int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors();
                    if (potentiallyStuckProcessors > 0) {
                        periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds);
                    }
                    dispatch(getNextInstanceIds());
                } catch (Exception e) {
                    handleDispatchException(e);
                }
            }
        } finally {
            try {
                shutdownPool();
                executorDao.markShutdown();
            } finally {
                // Must run even if markShutdown() throws; otherwise running stays true and
                // shutdown() would wait on the latch forever.
                running = false;
                logger.info("Shutdown completed.");
                shutdownDone.countDown();
            }
        }
    }

    /**
     * Logs an exception thrown by one loop iteration and optionally sleeps before the retry, as
     * instructed by the {@link DispatcherExceptionAnalyzer}.
     */
    private void handleDispatchException(Exception e) {
        DispatcherExceptionHandling handling = exceptionAnalyzer.analyzeSafely(e);
        if (handling.log) {
            if (handling.logStackTrace) {
                String message = "Exception in executing dispatcher - retrying"
                        + (handling.sleep ? " after sleep period" : "")
                        + " ({})";
                // The exception is passed as the last argument so the stack trace is logged too.
                nflowLogger.log(logger, handling.logLevel, message, e.getMessage(), e);
            } else {
                nflowLogger.log(logger, handling.logLevel, e.getMessage(), new Object[0]);
            }
        }
        if (handling.sleep) {
            sleep(handling.randomizeSleep);
        }
    }

    /**
     * Requests the loop to stop and blocks until {@link #run()} has finished its cleanup.
     *
     * <p>Does nothing except log if the dispatcher is not currently running. Note that
     * {@code running} only becomes true after workflow definitions have been post-processed in
     * {@link #run()}, so a call made before that point does not stop the dispatcher.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored.
     */
    public void shutdown() {
        if (!running) {
            logger.info("Dispatcher was not started or was already shut down.");
            return;
        }
        if (!shutdownRequested) {
            logger.info("Initiating shutdown.");
            shutdownRequested = true;
        }
        try {
            shutdownDone.await();
        } catch (InterruptedException e) {
            logger.warn("Shutdown interrupted.");
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /** Makes the loop stop polling and dispatching until {@link #resume()} is called. */
    public void pause() {
        paused = true;
        logger.info("Dispatcher paused.");
    }

    /** Lets a paused loop continue polling and dispatching. */
    public void resume() {
        paused = false;
        logger.info("Dispatcher resumed.");
    }

    /** @return true if {@link #pause()} has been called without a later {@link #resume()} */
    public boolean isPaused() {
        return paused;
    }

    /** @return true while the loop in {@link #run()} is active or its cleanup is still in progress */
    public boolean isRunning() {
        return running;
    }

    /** Shuts down the executor thread pool; failures are logged and not propagated. */
    private void shutdownPool() {
        try {
            executor.shutdown();
        } catch (Exception e) {
            logger.error("Error in shutting down thread pool.", e);
        }
    }

    /**
     * Submits one state processor per instance id to the executor, or sleeps for the configured
     * period when the list is empty.
     */
    private void dispatch(List<Long> nextInstanceIds) {
        if (nextInstanceIds.isEmpty()) {
            logger.debug("Found no workflow instances, sleeping.");
            sleep(false);
            return;
        }
        logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size());
        for (Long instanceId : nextInstanceIds) {
            // The supplier lets the processor observe a shutdown requested after it was created.
            executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested));
        }
    }

    /** Polls as many workflow instance ids as the executor queue currently has room for. */
    private List<Long> getNextInstanceIds() {
        int nextBatchSize = executor.getQueueRemainingCapacity();
        logger.debug("Polling next {} workflow instances.", nextBatchSize);
        return workflowInstances.pollNextWorkflowInstanceIds(nextBatchSize);
    }

    /**
     * Sleeps for the configured dispatcher sleep time.
     *
     * @param randomize if true, sleep for a random fraction of the configured time instead of
     *        the full time
     */
    @SuppressFBWarnings(value={"MDM_THREAD_YIELD"}, justification="Intentionally masking race condition")
    private void sleep(boolean randomize) {
        long millis = randomize ? (long) (sleepTimeMillis * rand.nextFloat()) : sleepTimeMillis;
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Deliberately not re-interrupting: an interrupt only cuts this sleep short and the
            // loop then re-checks shutdownRequested. Restoring the flag would make every later
            // sleep throw immediately and turn the loop into a busy spin.
        }
    }
}

