package com.oracle.coherence.patterns.processing.taskprocessor;

import com.oracle.coherence.common.threading.ExecutorListener;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ObservableExecutor;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.internal.Submission;
import com.oracle.coherence.patterns.processing.internal.SubmissionKeyPair;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediator;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediatorKey;
import com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.ExternalizableHelper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/processing/taskprocessor/DefaultTaskProcessor.class */
/**
 * A task processor that hands each submitted task to a fixed-size thread pool.
 *
 * <p>The pool is created in {@link #onStartup} and torn down in {@link #onShutdown}. Every
 * {@link TaskRunner} handed to the pool is also recorded in an internal queue so that
 * {@link #onShutdown} can ask it to interrupt its execution. Entries are removed again through the
 * {@link ExecutorListener#afterExecute} callback, which is only registered when the pool is an
 * {@link ObservableExecutor}.
 *
 * <p>Only the configured thread count is written by the serialization methods of this class; the
 * remaining state is delegated to the superclass.
 */
public class DefaultTaskProcessor extends AbstractTaskProcessor implements ExternalizableLite, PortableObject, ExecutorListener {
    private static final Logger logger = Logger.getLogger(DefaultTaskProcessor.class.getName());

    /**
     * POF property index under which {@link #numberOfThreads} is stored.
     * NOTE(review): presumably the lower indices are reserved for the superclass - confirm.
     */
    private static final int POF_INDEX_NUMBER_OF_THREADS = 10;

    /** Size of the thread pool created in {@link #onStartup}. */
    private int numberOfThreads;

    /** The pool that runs the tasks; {@code null} until {@link #onStartup} has been called. */
    private transient ExecutorService executorService;

    /**
     * Runnables that were handed to the pool and have not yet been reported as finished.
     * Deliberately left non-final so that the modifiers of this instance field stay unchanged.
     */
    private ConcurrentLinkedQueue<Runnable> currentlyExecuting;

    /**
     * Creates an unconfigured processor; the thread count is expected to be filled in by one of
     * the {@code readExternal} methods.
     */
    public DefaultTaskProcessor() {
        this.currentlyExecuting = new ConcurrentLinkedQueue<>();
    }

    /**
     * Creates a processor with the given name and pool size.
     *
     * @param str the second argument passed to the superclass constructor (the first is
     *            {@code null}); presumably the display name - confirm against
     *            {@code AbstractTaskProcessor}
     * @param i   the number of threads of the pool created in {@link #onStartup}
     */
    public DefaultTaskProcessor(String str, int i) {
        super(null, str);
        this.currentlyExecuting = new ConcurrentLinkedQueue<>();
        this.numberOfThreads = i;
    }

    /**
     * Creates the thread pool, registers this object as its listener when the pool supports it,
     * and then delegates to the superclass.
     */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor, com.oracle.coherence.patterns.processing.task.TaskProcessor
    public void onStartup(TaskProcessorMediator taskProcessorMediator, TaskProcessorMediatorKey taskProcessorMediatorKey, ObjectProxyFactory<Submission> objectProxyFactory, ObjectProxyFactory<SubmissionResult> objectProxyFactory2, ObjectProxyFactory<TaskProcessorMediator> objectProxyFactory3, ClientLeaseMaintainer clientLeaseMaintainer) {
        this.executorService = ExecutorServiceFactory.newFixedThreadPool(this.numberOfThreads, ThreadFactories.newThreadFactory(true, "DefaultTaskProcessor-" + taskProcessorMediatorKey, null));
        // Without this callback afterExecute() is never invoked, so tracked runnables are only
        // released again by onShutdown().
        if (this.executorService instanceof ObservableExecutor) {
            ((ObservableExecutor) this.executorService).setCallback(this);
        }
        super.onStartup(taskProcessorMediator, taskProcessorMediatorKey, objectProxyFactory, objectProxyFactory2, objectProxyFactory3, clientLeaseMaintainer);
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Starting TaskProcessor {0} with {1} thread(s)", new Object[]{getDisplayName(), Integer.valueOf(this.numberOfThreads)});
        }
    }

    /**
     * Delegates to the superclass, asks every tracked {@link TaskRunner} to interrupt its
     * execution, shuts the pool down and forgets all tracked runnables.
     */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor, com.oracle.coherence.patterns.processing.task.TaskProcessor
    public void onShutdown() {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Shutting down TaskProcessor {0}", this);
        }
        super.onShutdown();
        for (Runnable tracked : this.currentlyExecuting) {
            if (tracked instanceof TaskRunner) {
                ((TaskRunner) tracked).interruptExecution();
            }
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        // Tasks that were still waiting in the pool's queue will never run and therefore never
        // reach afterExecute(); drop them here so they do not linger after shutdown.
        this.currentlyExecuting.clear();
    }

    /**
     * Wraps the submission in a {@link TaskRunner} and hands it to the thread pool.
     * A {@code null} argument is ignored.
     *
     * @param submissionKeyPair identifies the submission to run and its result
     * @throws RuntimeException whatever the pool throws when it does not accept the task; the
     *                          task is not tracked in that case
     */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public void executeTask(SubmissionKeyPair submissionKeyPair) {
        if (submissionKeyPair == null) {
            return;
        }
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "TaskProcessor {0} is executing {1}", new Object[]{this, submissionKeyPair});
        }
        TaskRunner taskRunner = new TaskRunner(submissionKeyPair, getTaskProcessorMediator(), getSubmissionProxyFactory().getProxy(submissionKeyPair.getKey()), submissionKeyPair.getResultId(), getSubmissionResultProxyFactory(), getTaskProcessorKey());
        // Track before submitting so a fast-finishing task cannot reach afterExecute() first.
        this.currentlyExecuting.add(taskRunner);
        try {
            this.executorService.execute(taskRunner);
        } catch (RuntimeException e) {
            // The pool did not take the task (e.g. it was rejected, or the pool was never
            // started), so no afterExecute() will follow: undo the tracking and rethrow unchanged.
            this.currentlyExecuting.remove(taskRunner);
            throw e;
        }
    }

    /**
     * Returns a description containing the thread count and, when they are non-null, the display
     * name and the task processor key.
     */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public String toString() {
        return "DefaultTaskProcessor [numberOfThreads=" + this.numberOfThreads + ", " + (getDisplayName() != null ? "getDisplayName()=" + getDisplayName() + ", " : "") + (getTaskProcessorKey() != null ? "getTaskProcessorKey()=" + getTaskProcessorKey() : "") + "]";
    }

    /**
     * Stops tracking the given runnable.
     *
     * @param runnable the runnable reported by the executor
     * @param th       ignored
     */
    @Override // com.oracle.coherence.common.threading.ExecutorListener
    public void afterExecute(Runnable runnable, Throwable th) {
        this.currentlyExecuting.remove(runnable);
    }

    /** Does nothing; tracking already happens in {@link #executeTask}. */
    @Override // com.oracle.coherence.common.threading.ExecutorListener
    public void beforeExecute(Runnable runnable) {
    }

    /** Reads the superclass state followed by the thread count. */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public void readExternal(DataInput dataInput) throws IOException {
        super.readExternal(dataInput);
        this.numberOfThreads = ExternalizableHelper.readInt(dataInput);
    }

    /** Writes the superclass state followed by the thread count. */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public void writeExternal(DataOutput dataOutput) throws IOException {
        super.writeExternal(dataOutput);
        ExternalizableHelper.writeInt(dataOutput, this.numberOfThreads);
    }

    /** Reads the superclass state and then the thread count from its POF index. */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public void readExternal(PofReader pofReader) throws IOException {
        super.readExternal(pofReader);
        this.numberOfThreads = pofReader.readInt(POF_INDEX_NUMBER_OF_THREADS);
    }

    /** Writes the superclass state and then the thread count at its POF index. */
    @Override // com.oracle.coherence.patterns.processing.task.AbstractTaskProcessor
    public void writeExternal(PofWriter pofWriter) throws IOException {
        super.writeExternal(pofWriter);
        pofWriter.writeInt(POF_INDEX_NUMBER_OF_THREADS, this.numberOfThreads);
    }
}
