/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.patterns.processing.dispatchers.local;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.util.ObjectChangeCallback;
import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.SubmissionState;
import com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchController;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchOutcome;
import com.oracle.coherence.patterns.processing.dispatchers.PendingSubmission;
import com.oracle.coherence.patterns.processing.dispatchers.local.LocalExecutorDispatcherMBean;
import com.oracle.coherence.patterns.processing.internal.Environment;
import com.oracle.coherence.patterns.processing.internal.Submission;
import com.oracle.coherence.patterns.processing.internal.SubmissionContent;
import com.oracle.coherence.patterns.processing.internal.SubmissionKeyPair;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A dispatcher that executes {@link Runnable} and {@link Callable} payloads on a
 * fixed-size thread pool created in {@link #onStartup(DispatchController)}.
 *
 * <p>Only {@code numberOfThreads} (plus whatever the superclass writes) is part of the
 * serialized form; the executor, proxy factories, environment and counters are
 * {@code transient} and are (re)established in {@link #onStartup(DispatchController)}.
 */
public class LocalExecutorDispatcher
extends AbstractDispatcher
implements ExternalizableLite,
PortableObject,
LocalExecutorDispatcherMBean {
    private static final Logger logger = Logger.getLogger(LocalExecutorDispatcher.class.getName());

    /** POF property index under which {@link #numberOfThreads} is stored. */
    private static final int NUMBER_OF_THREADS_POF_INDEX = 10;

    /** Thread pool running the accepted submissions; {@code null} until {@link #onStartup} has run. */
    private transient ExecutorService executorService;

    /** Factory for proxies to the {@link Submission} objects being dispatched. */
    private transient ObjectProxyFactory<Submission> submissionProxyFactory;

    /** Factory for proxies to the {@link SubmissionResult} objects being updated. */
    private transient ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

    /*
     * The counters are exposed through the MBean getters, which may be called from a
     * different thread than dispatch(); volatile makes the latest value visible there.
     * The increments themselves are still not atomic (as in the original code).
     * NOTE(review): this assumes dispatch() is not invoked concurrently - confirm.
     */
    private transient volatile int noOfferedSubmissions;
    private transient volatile int noAcceptedSubmissions;

    /** Environment the proxy factories are looked up from. */
    private transient Environment environment;

    /** Size of the fixed thread pool; the only field of this class that is serialized. */
    private int numberOfThreads;

    /**
     * Creates an unconfigured dispatcher.
     * Presumably required for deserialization via {@link ExternalizableLite} / {@link PortableObject}.
     */
    public LocalExecutorDispatcher() {
    }

    /**
     * Creates a dispatcher bound to the given environment.
     *
     * @param environment     source of the {@link Submission} and {@link SubmissionResult} proxy factories
     * @param sName           name passed to the superclass constructor
     * @param numberOfThreads size of the thread pool created on startup
     */
    public LocalExecutorDispatcher(Environment environment, String sName, int numberOfThreads) {
        super(sName);
        this.environment = environment;
        this.submissionResultProxyFactory = (ObjectProxyFactory)environment.getResource(SubmissionResult.class);
        this.submissionProxyFactory = (ObjectProxyFactory)environment.getResource(Submission.class);
        this.numberOfThreads = numberOfThreads;
    }

    /**
     * Offers a pending submission to this dispatcher.
     *
     * <p>Every call counts as an offer. The submission is accepted, and queued on the
     * thread pool as a {@link ProcessRunner}, only when its payload is a
     * {@link Runnable} or a {@link Callable}.
     *
     * @param oPendingProcess the submission being offered
     * @return {@link DispatchOutcome#ACCEPTED} if the payload was queued for execution,
     *         {@link DispatchOutcome#REJECTED} otherwise
     */
    @Override
    public DispatchOutcome dispatch(PendingSubmission oPendingProcess) {
        ++this.noOfferedSubmissions;

        // Fetch the payload once instead of once per instanceof test.
        Object payload = oPendingProcess.getPayload();
        if (!(payload instanceof Runnable) && !(payload instanceof Callable)) {
            return DispatchOutcome.REJECTED;
        }

        Submission remoteSubmission = (Submission)this.submissionProxyFactory.getProxy(oPendingProcess.getSubmissionKey());
        SubmissionKeyPair keyPair = new SubmissionKeyPair(oPendingProcess.getSubmissionKey(), oPendingProcess.getResultIdentifier());
        this.executorService.execute(new ProcessRunner(this.submissionResultProxyFactory, keyPair, oPendingProcess.getResultIdentifier(), remoteSubmission));

        // Counted only after execute() returned normally.
        ++this.noAcceptedSubmissions;
        return DispatchOutcome.ACCEPTED;
    }

    /**
     * Stops the thread pool (if one was started) via {@code shutdownNow()} and then
     * delegates to the superclass.
     *
     * @param dispatchController the controller that is shutting this dispatcher down
     */
    @Override
    public void onShutdown(DispatchController dispatchController) {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        super.onShutdown(dispatchController);
    }

    /**
     * Resolves the {@link Environment} and proxy factories from the controller's
     * resource registry, delegates to the superclass, registers the MBean and finally
     * creates the fixed-size thread pool.
     *
     * @param dispatchController the controller that is starting this dispatcher
     */
    @Override
    public void onStartup(DispatchController dispatchController) {
        this.environment = (Environment)dispatchController.getConfigurableCacheFactory().getResourceRegistry().getResource(Environment.class);
        this.submissionResultProxyFactory = (ObjectProxyFactory)this.environment.getResource(SubmissionResult.class);
        this.submissionProxyFactory = (ObjectProxyFactory)this.environment.getResource(Submission.class);
        super.onStartup(dispatchController);
        this.registerMBean();
        this.executorService = ExecutorServiceFactory.newFixedThreadPool(this.numberOfThreads, (ThreadFactory)ThreadFactories.newThreadFactory(true, "LocalExecutorDispatcher", null));
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Starting with a Thread pool size of {0}", this.numberOfThreads);
        }
    }

    /** @return the name of this dispatcher, as reported by {@code getName()} */
    @Override
    public String getDispatcherName() {
        return this.getName();
    }

    /** @return how many offered submissions were accepted and queued for execution */
    @Override
    public int getSubmissionsAccepted() {
        return this.noAcceptedSubmissions;
    }

    /** @return how many submissions have been offered to {@link #dispatch(PendingSubmission)} */
    @Override
    public int getSubmissionsOffered() {
        return this.noOfferedSubmissions;
    }

    /**
     * Reads the superclass state followed by the thread-pool size.
     *
     * @param in stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readExternal(DataInput in) throws IOException {
        super.readExternal(in);
        this.numberOfThreads = in.readInt();
    }

    /**
     * Writes the superclass state followed by the thread-pool size.
     *
     * @param out stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writeExternal(DataOutput out) throws IOException {
        super.writeExternal(out);
        out.writeInt(this.numberOfThreads);
    }

    /**
     * Reads the superclass state and then the thread-pool size from POF index
     * {@value #NUMBER_OF_THREADS_POF_INDEX}.
     *
     * @param oReader POF reader to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readExternal(PofReader oReader) throws IOException {
        super.readExternal(oReader);
        this.numberOfThreads = oReader.readInt(NUMBER_OF_THREADS_POF_INDEX);
    }

    /**
     * Writes the superclass state and then the thread-pool size at POF index
     * {@value #NUMBER_OF_THREADS_POF_INDEX}.
     *
     * @param oWriter POF writer to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writeExternal(PofWriter oWriter) throws IOException {
        super.writeExternal(oWriter);
        oWriter.writeInt(NUMBER_OF_THREADS_POF_INDEX, this.numberOfThreads);
    }

    /**
     * Executes the payload of a single submission on a pool thread and reports the
     * outcome to the associated {@link SubmissionResult}.
     *
     * <p>While running, the instance is registered as a change callback on the
     * result; a change to {@link SubmissionState#CANCELLED} marks the runner as
     * cancelled and interrupts the executing thread.
     */
    public static class ProcessRunner
    implements Runnable,
    ObjectChangeCallback<SubmissionResult> {
        private static final Logger logger = Logger.getLogger(ProcessRunner.class.getName());

        private final Identifier submissionResultIdentifier;
        private final Submission submission;
        private final SubmissionKeyPair submissionKeyPair;
        private final ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

        /**
         * Thread currently inside {@link #run()}, or {@code null} when not running.
         * Volatile because it is written by the pool thread and read by whichever
         * thread delivers {@link #objectChanged(SubmissionResult)}.
         */
        private volatile Thread executionThread;

        /** Set once a CANCELLED state change has been observed. */
        private volatile transient boolean isCancelled;

        /**
         * @param submissionResultProxyFactory factory used to obtain the result proxy and to (un)register this callback
         * @param submissionKeyPair            key pair identifying the submission; used in log and error messages
         * @param submissionResultIdentifier   identifier the {@link SubmissionResult} proxy is obtained for
         * @param submission                   the submission whose payload is executed
         */
        public ProcessRunner(ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory, SubmissionKeyPair submissionKeyPair, Identifier submissionResultIdentifier, Submission submission) {
            this.submissionResultProxyFactory = submissionResultProxyFactory;
            this.submissionResultIdentifier = submissionResultIdentifier;
            this.submission = submission;
            this.submissionKeyPair = submissionKeyPair;
            this.isCancelled = false;
        }

        /**
         * Registers for change notifications on the result, tries to assign and start
         * the submission for the local cluster member and, if that succeeds, executes
         * the payload. The change callback is always unregistered on exit.
         *
         * <p>NOTE(review): the decompiler reported "Removed try catching itself -
         * possible behaviour change" for this method, so the exception handling of the
         * original source may have differed slightly - verify if the source is available.
         */
        @Override
        public void run() {
            SubmissionResult submissionResult = (SubmissionResult)this.submissionResultProxyFactory.getProxy(this.submissionResultIdentifier);
            try {
                // Publish the thread before registering, so a callback never sees it unset.
                this.executionThread = Thread.currentThread();
                this.submissionResultProxyFactory.registerChangeCallback(submissionResult, this);

                Integer memberId = Integer.valueOf(CacheFactory.getCluster().getLocalMember().getId());
                // Proceed only when assign() yields null and processingStarted() yields true.
                if (submissionResult.assign(memberId) != null || !submissionResult.processingStarted(memberId)) {
                    return;
                }
                this.executeAndReport(submissionResult);
            }
            finally {
                this.submissionResultProxyFactory.unregisterChangeCallback(submissionResult, this);
                // A late cancellation must not interrupt a pool thread that has moved on.
                this.executionThread = null;
            }
        }

        /**
         * Runs the payload and records success or failure on the given result.
         * Any {@link Exception} (including the one raised for an unsupported payload
         * type) is logged and reported through {@code processingFailed}.
         */
        private void executeAndReport(SubmissionResult submissionResult) {
            try {
                Object oResult = null;
                SubmissionContent submissionContent = this.submission.getContent();
                Object payload = submissionContent.getPayload();
                if (payload instanceof Runnable) {
                    ((Runnable)payload).run();
                } else if (payload instanceof Callable) {
                    oResult = ((Callable<?>)payload).call();
                } else {
                    throw new UnsupportedOperationException(String.format("Can't execute %s as it's neither Runnable or Callable", this.submissionKeyPair));
                }

                // Thread.interrupted() also clears the flag, which is evaluated regardless of the log level.
                if (Thread.interrupted() && logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "The Submission {0} was intentionally interrupted (by calling cancelSubmission or shutting down) during execution", new Object[]{payload});
                }

                if (this.isCancelled) {
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Submission {0} was CANCELLED", new Object[]{this.submissionKeyPair});
                    }
                } else if (submissionResult.processingSucceeded(oResult) && logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Executed {0} to produce {1}", new Object[]{this.submissionKeyPair, oResult});
                }
            }
            catch (Exception oException) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "Failed to process {0} due to:\n{1}", new Object[]{this.submissionKeyPair, oException});
                }
                submissionResult.processingFailed(oException);
            }
        }

        /**
         * Reacts to a change of the observed result: when its state is
         * {@link SubmissionState#CANCELLED}, flags this runner as cancelled and
         * interrupts the executing thread, if there still is one.
         *
         * @param object the changed result; ignored when {@code null}
         */
        public void objectChanged(SubmissionResult object) {
            if (object != null && object.getSubmissionState() == SubmissionState.CANCELLED) {
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Received change of state to CANCELLED - interrupting execution of {0}", new Object[]{this.submissionKeyPair});
                }
                this.isCancelled = true;
                // Snapshot the field: run() may clear it concurrently.
                Thread runner = this.executionThread;
                if (runner != null) {
                    runner.interrupt();
                }
            }
        }

        /** No action is taken when the observed object is created. */
        public void objectCreated(SubmissionResult object) {
        }

        /** No action is taken when the observed object is deleted. */
        public void objectDeleted(Object key) {
        }
    }
}

