/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.patterns.processing.taskprocessor;

import com.oracle.coherence.common.threading.ExecutorListener;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ObservableExecutor;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.internal.Submission;
import com.oracle.coherence.patterns.processing.internal.SubmissionKeyPair;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediator;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediatorKey;
import com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor;
import com.oracle.coherence.patterns.processing.taskprocessor.ClientLeaseMaintainer;
import com.oracle.coherence.patterns.processing.taskprocessor.TaskRunner;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.ExternalizableHelper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DefaultTaskProcessor
extends AbstractTaskProcessor
implements ExternalizableLite,
PortableObject,
ExecutorListener {
    private static Logger logger = Logger.getLogger(DefaultTaskProcessor.class.getName());
    private int numberOfThreads;
    private transient ExecutorService executorService;
    private ConcurrentLinkedQueue<Runnable> currentlyExecuting = new ConcurrentLinkedQueue();

    public DefaultTaskProcessor() {
    }

    public DefaultTaskProcessor(String name, int numberOfThreads) {
        super(null, name);
        this.numberOfThreads = numberOfThreads;
    }

    @Override
    public void onStartup(TaskProcessorMediator taskProcessorMediator, TaskProcessorMediatorKey taskProcessorMediatorKey, ObjectProxyFactory<Submission> submissionProxyFactory, ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory, ObjectProxyFactory<TaskProcessorMediator> taskProcessorMediatorProxyFactory, ClientLeaseMaintainer clientLeaseMaintainer) {
        this.executorService = ExecutorServiceFactory.newFixedThreadPool((int)this.numberOfThreads, (ThreadFactory)ThreadFactories.newThreadFactory((boolean)true, (String)("DefaultTaskProcessor-" + taskProcessorMediatorKey), null));
        if (this.executorService instanceof ObservableExecutor) {
            ((ObservableExecutor)this.executorService).setCallback((ExecutorListener)this);
        }
        super.onStartup(taskProcessorMediator, taskProcessorMediatorKey, submissionProxyFactory, submissionResultProxyFactory, taskProcessorMediatorProxyFactory, clientLeaseMaintainer);
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Starting TaskProcessor {0} with {1} thread(s)", new Object[]{this.getDisplayName(), this.numberOfThreads});
        }
    }

    @Override
    public void onShutdown() {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Shutting down TaskProcessor {0}", this);
        }
        super.onShutdown();
        for (Runnable r : this.currentlyExecuting) {
            if (!(r instanceof TaskRunner)) continue;
            ((TaskRunner)r).interruptExecution();
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    @Override
    public void executeTask(SubmissionKeyPair taskToExecute) {
        if (taskToExecute != null) {
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "TaskProcessor {0} is executing {1}", new Object[]{this, taskToExecute});
            }
            Submission submission = (Submission)this.getSubmissionProxyFactory().getProxy((Object)taskToExecute.getKey());
            TaskRunner task = new TaskRunner(taskToExecute, this.getTaskProcessorMediator(), submission, taskToExecute.getResultId(), this.getSubmissionResultProxyFactory(), this.getTaskProcessorKey());
            this.currentlyExecuting.add(task);
            this.executorService.execute(task);
        }
    }

    @Override
    public String toString() {
        return "DefaultTaskProcessor [numberOfThreads=" + this.numberOfThreads + ", " + (this.getDisplayName() != null ? "getDisplayName()=" + this.getDisplayName() + ", " : "") + (this.getTaskProcessorKey() != null ? "getTaskProcessorKey()=" + this.getTaskProcessorKey() : "") + "]";
    }

    public void afterExecute(Runnable runnable, Throwable t) {
        this.currentlyExecuting.remove(runnable);
    }

    public void beforeExecute(Runnable runnable) {
    }

    @Override
    public void readExternal(DataInput in) throws IOException {
        super.readExternal(in);
        this.numberOfThreads = ExternalizableHelper.readInt((DataInput)in);
    }

    @Override
    public void writeExternal(DataOutput out) throws IOException {
        super.writeExternal(out);
        ExternalizableHelper.writeInt((DataOutput)out, (int)this.numberOfThreads);
    }

    @Override
    public void readExternal(PofReader reader) throws IOException {
        super.readExternal(reader);
        this.numberOfThreads = reader.readInt(10);
    }

    @Override
    public void writeExternal(PofWriter writer) throws IOException {
        super.writeExternal(writer);
        writer.writeInt(10, this.numberOfThreads);
    }
}

