package com.oracle.coherence.patterns.processing.dispatchers.local;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.util.ObjectChangeCallback;
import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.SubmissionState;
import com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchController;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchOutcome;
import com.oracle.coherence.patterns.processing.dispatchers.PendingSubmission;
import com.oracle.coherence.patterns.processing.internal.Environment;
import com.oracle.coherence.patterns.processing.internal.Submission;
import com.oracle.coherence.patterns.processing.internal.SubmissionContent;
import com.oracle.coherence.patterns.processing.internal.SubmissionKeyPair;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/processing/dispatchers/local/LocalExecutorDispatcher.class */
/**
 * A dispatcher that runs {@link Runnable} and {@link Callable} submission payloads on a
 * fixed-size thread pool created in {@link #onStartup(DispatchController)}.
 *
 * <p>Only {@code numberOfThreads} is serialized (both the {@link ExternalizableLite} and the
 * {@link PortableObject} forms); the executor, proxy factories, environment and counters are
 * transient and are (re)established in {@link #onStartup(DispatchController)}.
 */
public class LocalExecutorDispatcher extends AbstractDispatcher implements ExternalizableLite, PortableObject, LocalExecutorDispatcherMBean {
    private static final Logger logger = Logger.getLogger(LocalExecutorDispatcher.class.getName());

    /**
     * POF property index under which {@code numberOfThreads} is written and read.
     * NOTE(review): presumably chosen to leave room for the superclass' own indices - confirm.
     */
    private static final int POF_INDEX_NUMBER_OF_THREADS = 10;

    private transient ExecutorService executorService;
    private transient ObjectProxyFactory<Submission> submissionProxyFactory;
    private transient ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

    // Counters are incremented in dispatch() and read through the MBean getters; atomics keep
    // the increments and the reads safe if those calls arrive on different threads.
    private final transient AtomicInteger noOfferedSubmissions = new AtomicInteger();
    private final transient AtomicInteger noAcceptedSubmissions = new AtomicInteger();

    private transient Environment environment;
    private int numberOfThreads;

    /**
     * Executes a single submission on a pool thread and records the outcome on the
     * {@link SubmissionResult} proxy. While running it listens for changes of the result and
     * interrupts the executing thread when the result's state becomes
     * {@link SubmissionState#CANCELLED}.
     */
    public static class ProcessRunner implements Runnable, ObjectChangeCallback<SubmissionResult> {
        private static final Logger logger = Logger.getLogger(ProcessRunner.class.getName());

        private final Identifier submissionResultIdentifier;
        private final Submission submission;
        private final SubmissionKeyPair submissionKeyPair;
        private final ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

        /**
         * Thread currently executing {@link #run()}, or {@code null} when not running.
         * Volatile because it is written by the pool thread and read by
         * {@link #objectChanged(SubmissionResult)}, which may run on a different thread.
         */
        private volatile Thread executionThread;

        /** Set by {@link #objectChanged(SubmissionResult)} once a cancellation was observed. */
        private volatile boolean isCancelled = false;

        /**
         * @param objectProxyFactory factory used to obtain the {@link SubmissionResult} proxy and
         *                           to (un)register this runner as its change callback
         * @param submissionKeyPair  key pair identifying the submission; used in log messages
         * @param identifier         identifier of the {@link SubmissionResult} to update
         * @param submission         the submission whose payload is to be executed
         */
        public ProcessRunner(ObjectProxyFactory<SubmissionResult> objectProxyFactory, SubmissionKeyPair submissionKeyPair, Identifier identifier, Submission submission) {
            this.submissionResultProxyFactory = objectProxyFactory;
            this.submissionResultIdentifier = identifier;
            this.submission = submission;
            this.submissionKeyPair = submissionKeyPair;
        }

        /**
         * Claims the submission for the local cluster member and, if both
         * {@code assign} and {@code processingStarted} succeed, executes its payload.
         * The change callback is always unregistered on exit.
         */
        @Override // java.lang.Runnable
        public void run() {
            SubmissionResult proxy = this.submissionResultProxyFactory.getProxy(this.submissionResultIdentifier);
            try {
                // Publish the thread before registering the callback so that a cancellation
                // notification always has a thread to interrupt.
                this.executionThread = Thread.currentThread();
                this.submissionResultProxyFactory.registerChangeCallback(proxy, this);
                Integer memberId = Integer.valueOf(CacheFactory.getCluster().getLocalMember().getId());
                if (proxy.assign(memberId) == null && proxy.processingStarted(memberId)) {
                    executePayload(proxy);
                }
            } finally {
                this.submissionResultProxyFactory.unregisterChangeCallback(proxy, this);
                // The pool thread is about to be reused; make sure a late notification cannot
                // interrupt whatever it runs next.
                this.executionThread = null;
            }
        }

        /**
         * Runs the payload and reports success or failure on the given result proxy.
         * Any {@link Exception} raised by the payload is logged and reported via
         * {@code processingFailed}.
         */
        private void executePayload(SubmissionResult proxy) {
            try {
                SubmissionContent content = this.submission.getContent();
                Object payload = content.getPayload();
                Object outcome;
                if (payload instanceof Runnable) {
                    ((Runnable) payload).run();
                    outcome = null;
                } else if (payload instanceof Callable) {
                    outcome = ((Callable<?>) payload).call();
                } else {
                    throw new UnsupportedOperationException(String.format("Can't execute %s as it's neither Runnable or Callable", this.submissionKeyPair));
                }
                // Thread.interrupted() is evaluated first on purpose: it clears the interrupt
                // flag regardless of whether the warning ends up being logged.
                if (Thread.interrupted() && logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "The Submission {0} was intentionally interrupted (by calling cancelSubmission or shutting down) during execution", new Object[]{payload});
                }
                if (this.isCancelled) {
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Submission {0} was CANCELLED", new Object[]{this.submissionKeyPair});
                    }
                } else if (proxy.processingSucceeded(outcome) && logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Executed {0} to produce {1}", new Object[]{this.submissionKeyPair, outcome});
                }
            } catch (Exception e) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "Failed to process {0} due to:\n{1}", new Object[]{this.submissionKeyPair, e});
                }
                proxy.processingFailed(e);
            }
        }

        /**
         * Reacts to a change of the watched result: when its state is
         * {@link SubmissionState#CANCELLED} the runner is flagged as cancelled and the
         * executing thread (if any) is interrupted. All other changes are ignored.
         */
        @Override // com.oracle.coherence.common.util.ObjectChangeCallback
        public void objectChanged(SubmissionResult submissionResult) {
            if (submissionResult == null || submissionResult.getSubmissionState() != SubmissionState.CANCELLED) {
                return;
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Received change of state to CANCELLED - interrupting execution of {0}", new Object[]{this.submissionKeyPair});
            }
            this.isCancelled = true;
            // Read once into a local: the field may be cleared concurrently by run().
            Thread runner = this.executionThread;
            if (runner != null) {
                runner.interrupt();
            }
        }

        /** No action is taken when the watched result is created. */
        @Override // com.oracle.coherence.common.util.ObjectChangeCallback
        public void objectCreated(SubmissionResult submissionResult) {
        }

        /** No action is taken when the watched result is deleted. */
        @Override // com.oracle.coherence.common.util.ObjectChangeCallback
        public void objectDeleted(Object obj) {
        }
    }

    /** No-argument constructor; leaves all fields at their defaults. */
    public LocalExecutorDispatcher() {
    }

    /**
     * @param environment source of the {@link Submission} and {@link SubmissionResult} proxy factories
     * @param str         dispatcher name, passed to the superclass constructor
     * @param i           size of the thread pool created in {@link #onStartup(DispatchController)}
     */
    public LocalExecutorDispatcher(Environment environment, String str, int i) {
        super(str);
        bindEnvironment(environment);
        this.numberOfThreads = i;
    }

    /** Stores the environment and looks up both proxy factories from it. */
    private void bindEnvironment(Environment environment) {
        this.environment = environment;
        this.submissionResultProxyFactory = (ObjectProxyFactory) environment.getResource(SubmissionResult.class);
        this.submissionProxyFactory = (ObjectProxyFactory) environment.getResource(Submission.class);
    }

    /**
     * Offers a submission to this dispatcher.
     *
     * @param pendingSubmission the submission to dispatch
     * @return {@link DispatchOutcome#REJECTED} when the payload is neither {@link Runnable} nor
     *         {@link Callable}; otherwise {@link DispatchOutcome#ACCEPTED} after the work has
     *         been handed to the executor
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.Dispatcher
    public DispatchOutcome dispatch(PendingSubmission pendingSubmission) {
        this.noOfferedSubmissions.incrementAndGet();
        Object payload = pendingSubmission.getPayload();
        if (!(payload instanceof Runnable) && !(payload instanceof Callable)) {
            return DispatchOutcome.REJECTED;
        }
        SubmissionKeyPair keyPair = new SubmissionKeyPair(pendingSubmission.getSubmissionKey(), pendingSubmission.getResultIdentifier());
        Submission submission = this.submissionProxyFactory.getProxy(pendingSubmission.getSubmissionKey());
        this.executorService.execute(new ProcessRunner(this.submissionResultProxyFactory, keyPair, pendingSubmission.getResultIdentifier(), submission));
        this.noAcceptedSubmissions.incrementAndGet();
        return DispatchOutcome.ACCEPTED;
    }

    /**
     * Stops the executor (if one was created) via {@code shutdownNow()} and then delegates to
     * the superclass.
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher, com.oracle.coherence.patterns.processing.dispatchers.Dispatcher
    public void onShutdown(DispatchController dispatchController) {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        super.onShutdown(dispatchController);
    }

    /**
     * Resolves the {@link Environment} from the controller's resource registry, looks up the
     * proxy factories, delegates to the superclass, registers the MBean and finally creates the
     * fixed-size thread pool.
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher, com.oracle.coherence.patterns.processing.dispatchers.Dispatcher
    public void onStartup(DispatchController dispatchController) {
        bindEnvironment((Environment) dispatchController.getConfigurableCacheFactory().getResourceRegistry().getResource(Environment.class));
        super.onStartup(dispatchController);
        registerMBean();
        // NOTE(review): the leading 'true' presumably requests daemon threads - confirm against ThreadFactories.
        this.executorService = ExecutorServiceFactory.newFixedThreadPool(this.numberOfThreads, ThreadFactories.newThreadFactory(true, "LocalExecutorDispatcher", null));
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Starting with a Thread pool size of {0}", Integer.valueOf(this.numberOfThreads));
        }
    }

    /** @return the name of this dispatcher, as reported by {@code getName()} */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.local.LocalExecutorDispatcherMBean
    public String getDispatcherName() {
        return getName();
    }

    /** @return the number of submissions handed to the executor by {@link #dispatch(PendingSubmission)} */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.local.LocalExecutorDispatcherMBean
    public int getSubmissionsAccepted() {
        return this.noAcceptedSubmissions.get();
    }

    /** @return the number of times {@link #dispatch(PendingSubmission)} has been invoked */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.local.LocalExecutorDispatcherMBean
    public int getSubmissionsOffered() {
        return this.noOfferedSubmissions.get();
    }

    /** Reads the superclass state followed by {@code numberOfThreads}. */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher
    public void readExternal(DataInput dataInput) throws IOException {
        super.readExternal(dataInput);
        this.numberOfThreads = dataInput.readInt();
    }

    /** Writes the superclass state followed by {@code numberOfThreads}. */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher
    public void writeExternal(DataOutput dataOutput) throws IOException {
        super.writeExternal(dataOutput);
        dataOutput.writeInt(this.numberOfThreads);
    }

    /** Reads the superclass state, then {@code numberOfThreads} from its POF index. */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher
    public void readExternal(PofReader pofReader) throws IOException {
        super.readExternal(pofReader);
        this.numberOfThreads = pofReader.readInt(POF_INDEX_NUMBER_OF_THREADS);
    }

    /** Writes the superclass state, then {@code numberOfThreads} at its POF index. */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher
    public void writeExternal(PofWriter pofWriter) throws IOException {
        super.writeExternal(pofWriter);
        pofWriter.writeInt(POF_INDEX_NUMBER_OF_THREADS, this.numberOfThreads);
    }
}
