/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.patterns.processing.dispatchers.task;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.dispatchers.AbstractDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchController;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchOutcome;
import com.oracle.coherence.patterns.processing.dispatchers.PendingSubmission;
import com.oracle.coherence.patterns.processing.dispatchers.task.DefaultTaskDispatcherMBean;
import com.oracle.coherence.patterns.processing.dispatchers.task.TaskDispatchPolicy;
import com.oracle.coherence.patterns.processing.internal.Environment;
import com.oracle.coherence.patterns.processing.internal.Submission;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.oracle.coherence.patterns.processing.internal.task.DefaultTaskProcessorMediator;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediator;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediatorKey;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorMediatorProxyMBean;
import com.oracle.coherence.patterns.processing.internal.task.TaskProcessorStateEnum;
import com.oracle.coherence.patterns.processing.task.Task;
import com.oracle.coherence.patterns.processing.task.TaskProcessor;
import com.oracle.coherence.patterns.processing.task.TaskProcessorDefinition;
import com.oracle.coherence.patterns.processing.task.TaskProcessorType;
import com.oracle.coherence.patterns.processing.taskprocessor.ClientLeaseMaintainer;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Dispatcher that offers {@link Task} payloads to task processors.
 *
 * <p>On startup the dispatcher mirrors two caches into local maps: the
 * task-processor-mediator cache (only mediators whose state is
 * {@link TaskProcessorStateEnum#ACTIVE} are tracked) and the
 * task-processor-definition cache. Both mirrors are kept current by map
 * listeners. A {@link TaskDispatchPolicy} picks the candidate mediators for
 * each task from those mirrors.
 *
 * <p>Only the dispatch policy (plus whatever the superclass writes) is
 * serialized; every other field is rebuilt in {@link #onStartup}.
 */
public class DefaultTaskDispatcher
extends AbstractDispatcher
implements ExternalizableLite,
PortableObject,
DefaultTaskDispatcherMBean {
    /** Maximum number of times the dispatch policy is consulted for a single task. */
    private static final int NO_SELECTION_RETRIES = 3;

    /** Name of the cache holding the task processor mediators. */
    private static final String MEDIATOR_CACHE_NAME = "coherence.patterns.processing.taskprocessormediator";

    /** Name of the cache holding the task processor definitions. */
    private static final String DEFINITION_CACHE_NAME = "coherence.patterns.processing.taskprocessordefinitions";

    /** POF property index used for the dispatch policy. */
    private static final int POF_INDEX_DISPATCH_POLICY = 100;

    private static final Logger logger = Logger.getLogger(DefaultTaskDispatcher.class.getName());

    /** Handed to every locally started {@link TaskProcessor}; may be unset (null). */
    private static ClientLeaseMaintainer clientLeaseMaintainer;

    private transient NamedCache taskProcessorMediatorCache;

    /** Local mirror of the mediator cache, restricted to ACTIVE mediators. */
    private final ConcurrentHashMap<TaskProcessorMediatorKey, TaskProcessorMediator> taskProcessorMediators =
            new ConcurrentHashMap<TaskProcessorMediatorKey, TaskProcessorMediator>();

    private TaskDispatchPolicy taskDispatchPolicy;
    private transient NamedCache taskProcessorDefinitionCache;

    /** Local mirror of the definition cache, keyed by definition identifier. */
    private final transient ConcurrentHashMap<Identifier, TaskProcessorDefinition> taskProcessorDefinitionList =
            new ConcurrentHashMap<Identifier, TaskProcessorDefinition>();

    /** Processors started by this dispatcher; each is shut down in {@link #onShutdown}. */
    private final transient LinkedBlockingQueue<TaskProcessor> localTaskProcessors =
            new LinkedBlockingQueue<TaskProcessor>();

    private transient MapListener taskProcessorMediatorListener;
    private transient MapListener taskProcessorDefinitionListener;
    private transient ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;
    private transient ObjectProxyFactory<Submission> submissionProxyFactory;
    private transient ObjectProxyFactory<TaskProcessorMediator> taskProcessorMediatorFactory;
    private transient ObjectProxyFactory<TaskProcessorMediatorProxyMBean> taskProcessorMediatorMBeanFactory;

    /** Number of calls to {@link #dispatch}; updated without synchronization. */
    private transient int noOfferedSubmissions;

    /** Number of submissions {@link #dispatch} accepted; updated without synchronization. */
    private transient int noAcceptedSubmissions;

    private transient Environment environment;

    /**
     * No-argument constructor, needed so the class can be instantiated
     * before {@code readExternal} populates it.
     */
    public DefaultTaskDispatcher() {
    }

    /**
     * Creates a named dispatcher that selects processors with the given policy.
     *
     * @param environment        the environment to remember (replaced again during startup)
     * @param name               the dispatcher name passed to the superclass
     * @param taskdispatchpolicy the policy used to select candidate task processors
     */
    public DefaultTaskDispatcher(Environment environment, String name, TaskDispatchPolicy taskdispatchpolicy) {
        super(name);
        this.environment = environment;
        this.taskDispatchPolicy = taskdispatchpolicy;
    }

    /**
     * Creates a named dispatcher with explicitly supplied proxy factories.
     *
     * <p>Note that {@link #onStartup} re-resolves all three factories from the
     * environment, overwriting the ones given here.
     *
     * @param environment                  the environment to remember
     * @param name                         the dispatcher name
     * @param taskdispatchpolicy           the policy used to select candidate task processors
     * @param taskProcessorMediatorFactory factory for mediator proxies
     * @param submissionResultProxyFactory factory for submission result proxies
     * @param submissionProxyFactory       factory for submission proxies
     */
    public DefaultTaskDispatcher(Environment environment, String name, TaskDispatchPolicy taskdispatchpolicy, ObjectProxyFactory<TaskProcessorMediator> taskProcessorMediatorFactory, ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory, ObjectProxyFactory<Submission> submissionProxyFactory) {
        this(environment, name, taskdispatchpolicy);
        this.taskProcessorMediatorFactory = taskProcessorMediatorFactory;
        this.submissionProxyFactory = submissionProxyFactory;
        this.submissionResultProxyFactory = submissionResultProxyFactory;
    }

    /**
     * Sets the lease maintainer that is passed to every task processor started
     * by any dispatcher instance.
     *
     * @param clientLeaseMaintainer the lease maintainer to share
     */
    public static void setClientLeaseMaintainer(ClientLeaseMaintainer clientLeaseMaintainer) {
        DefaultTaskDispatcher.clientLeaseMaintainer = clientLeaseMaintainer;
    }

    /**
     * Detaches the cache listeners, shuts down every locally started task
     * processor and then delegates to the superclass.
     *
     * @param dispatchController the controller that is shutting this dispatcher down
     */
    @Override
    public void onShutdown(DispatchController dispatchController) {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Shutting down DefaultTaskDispatcher");
        }
        // The caches and listeners only exist once onStartup has run; guard so
        // that shutting down a dispatcher that never started does not throw.
        if (this.taskProcessorMediatorCache != null && this.taskProcessorMediatorListener != null) {
            this.taskProcessorMediatorCache.removeMapListener(this.taskProcessorMediatorListener);
        }
        if (this.taskProcessorDefinitionCache != null && this.taskProcessorDefinitionListener != null) {
            this.taskProcessorDefinitionCache.removeMapListener(this.taskProcessorDefinitionListener);
        }
        for (TaskProcessor processor : this.localTaskProcessors) {
            processor.onShutdown();
        }
        super.onShutdown(dispatchController);
    }

    /**
     * Starts the superclass, resolves resources and caches from the
     * controller's cache factory, and registers the MBean.
     *
     * @param dispatchController the controller that is starting this dispatcher
     */
    @Override
    public void onStartup(DispatchController dispatchController) {
        super.onStartup(dispatchController);
        this.initialize(dispatchController.getConfigurableCacheFactory());
        this.registerMBean();
    }

    /**
     * Offers a pending submission to a task processor.
     *
     * @param oPendingProcess the submission to dispatch
     * @return {@link DispatchOutcome#ACCEPTED} when the payload is a {@link Task}
     *         and a processor took it, otherwise {@link DispatchOutcome#REJECTED}
     */
    @Override
    public DispatchOutcome dispatch(PendingSubmission oPendingProcess) {
        ++this.noOfferedSubmissions;
        Object payload = oPendingProcess.getPayload();
        if (!(payload instanceof Task)) {
            // Non-task payloads are not handled by this dispatcher.
            return DispatchOutcome.REJECTED;
        }
        boolean accepted = this.dispatchTask(oPendingProcess, (Task)payload);
        if (accepted) {
            ++this.noAcceptedSubmissions;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, accepted ? "Accepting submission {0}" : "Rejecting submission {0}", oPendingProcess.getSubmissionKey());
        }
        return accepted ? DispatchOutcome.ACCEPTED : DispatchOutcome.REJECTED;
    }

    /**
     * Asks the dispatch policy for candidate processors and offers the task to
     * them, consulting the policy at most {@link #NO_SELECTION_RETRIES} times.
     *
     * @param pendingSubmission the submission carrying the task
     * @param task              the task to dispatch; {@code null} is rejected with a warning
     * @return {@code true} if some task processor accepted the task
     */
    protected boolean dispatchTask(PendingSubmission pendingSubmission, Task task) {
        if (task == null) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "The Task was null, for the Id:{0}", pendingSubmission.getResultIdentifier());
            }
            return false;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Dispatching task {0}, key {1}", new Object[]{task.toString(), pendingSubmission.getSubmissionKey()});
        }
        boolean accepted = false;
        for (int attempt = 1; !accepted && attempt <= NO_SELECTION_RETRIES; ++attempt) {
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "Selecting TaskProcessor for id {0} - try {1}", new Object[]{pendingSubmission.getResultIdentifier(), attempt});
            }
            Map<TaskProcessorMediatorKey, TaskProcessorMediator> selectedSet = this.taskDispatchPolicy.selectTaskProcessorSet(task, pendingSubmission.getSubmissionConfiguration(), this.taskProcessorMediators, this.taskProcessorDefinitionList);
            if (selectedSet == null) {
                if (logger.isLoggable(Level.SEVERE)) {
                    logger.log(Level.SEVERE, "The TaskDispatchPolicy returned no TaskProcessorMediators");
                }
                continue;
            }
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "Dispatching and selecting TaskProcessor from this set: {0}", selectedSet);
            }
            accepted = this.offerToSelectedProcessors(pendingSubmission, selectedSet);
        }
        return accepted;
    }

    /**
     * Walks the selected mediators in key-set order and offers the submission
     * to each until one accepts it.
     *
     * <p>For every candidate the submission result is first assigned to that
     * candidate. If {@code assign} reports an already assigned owner, the task
     * is offered to that owner instead of the candidate.
     *
     * @return {@code true} as soon as an offer is accepted, {@code false} if
     *         every candidate declined (or the set was empty)
     */
    private boolean offerToSelectedProcessors(PendingSubmission pendingSubmission, Map<TaskProcessorMediatorKey, TaskProcessorMediator> selectedSet) {
        for (TaskProcessorMediatorKey taskProcessorKey : selectedSet.keySet()) {
            TaskProcessorMediator candidateProxy = (TaskProcessorMediator)this.taskProcessorMediatorFactory.getProxy((Object)taskProcessorKey);
            SubmissionResult submissionResult = (SubmissionResult)this.submissionResultProxyFactory.getProxy((Object)pendingSubmission.getResultIdentifier());
            Object alreadyAssignedOwner = submissionResult.assign(taskProcessorKey);
            boolean freshlyAssigned = alreadyAssignedOwner == null;

            TaskProcessorMediator target;
            if (freshlyAssigned) {
                target = candidateProxy;
            } else {
                if (logger.isLoggable(Level.INFO)) {
                    logger.log(Level.INFO, "Task {0} with result id {1} was already assigned.", new Object[]{pendingSubmission.getSubmissionKey(), pendingSubmission.getResultIdentifier()});
                }
                target = (TaskProcessorMediator)this.taskProcessorMediatorFactory.getProxy(alreadyAssignedOwner);
            }

            if (target.offerTask(pendingSubmission.getSubmissionKey(), pendingSubmission.getResultIdentifier())) {
                if (freshlyAssigned && logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "TaskProcessor {0} accepted submission {1}", new Object[]{taskProcessorKey, pendingSubmission.getSubmissionKey()});
                }
                return true;
            }
            // The offer was declined: put the result back into retry state
            // before moving on to the next candidate.
            this.setSubmissionToRetryStatus(pendingSubmission, taskProcessorKey, submissionResult);
        }
        return false;
    }

    /**
     * Calls {@code retry()} on the submission result after a declined offer
     * and logs whether that call succeeded.
     */
    private void setSubmissionToRetryStatus(PendingSubmission pendingSubmission, TaskProcessorMediatorKey taskProcessorKey, SubmissionResult submissionResult) {
        if (submissionResult.retry()) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "TaskProcessor {0} rejected submission {1}", new Object[]{taskProcessorKey, pendingSubmission.getSubmissionKey()});
            }
            return;
        }
        if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "TaskProcessor {0} rejected submission {1}, failed to restore owner", new Object[]{taskProcessorKey, pendingSubmission.getSubmissionKey()});
        }
    }

    /**
     * Resolves the environment and proxy factories from the cache factory's
     * resource registry, obtains both caches and installs their listeners.
     *
     * <p>Any environment or factories supplied through a constructor are
     * overwritten here.
     */
    // The environment lookups are cast to the raw factory type, exactly as the
    // original code did; the element types are implied by the lookup class.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void initialize(ConfigurableCacheFactory ccf) {
        this.environment = (Environment)ccf.getResourceRegistry().getResource(Environment.class);
        this.submissionResultProxyFactory = (ObjectProxyFactory)this.environment.getResource(SubmissionResult.class);
        this.submissionProxyFactory = (ObjectProxyFactory)this.environment.getResource(Submission.class);
        this.taskProcessorMediatorFactory = (ObjectProxyFactory)this.environment.getResource(TaskProcessorMediator.class);
        this.taskProcessorMediatorCache = ccf.ensureCache(MEDIATOR_CACHE_NAME, null);
        this.taskProcessorMediatorMBeanFactory = new ObjectProxyFactory<TaskProcessorMediatorProxyMBean>(MEDIATOR_CACHE_NAME, TaskProcessorMediatorProxyMBean.class);
        this.initializeTaskProcessorMediatorListener();
        this.taskProcessorDefinitionCache = ccf.ensureCache(DEFINITION_CACHE_NAME, null);
        this.initializeTaskProcessorDefinitionListener(ccf);
    }

    /**
     * Installs the listener that mirrors the definition cache and then
     * registers every definition already present in the cache.
     *
     * <p>NOTE(review): the listener is attached before the existing values are
     * read, so a definition inserted in between may be handled twice - confirm
     * that task processors tolerate a repeated {@code onStartup}.
     */
    private void initializeTaskProcessorDefinitionListener(final ConfigurableCacheFactory oCCFactory) {
        this.taskProcessorDefinitionListener = new MultiplexingMapListener(){

            @Override
            protected void onMapEvent(MapEvent mapEvent) {
                switch (mapEvent.getId()) {
                    case MapEvent.ENTRY_INSERTED: {
                        TaskProcessorDefinition inserted = (TaskProcessorDefinition)mapEvent.getNewValue();
                        try {
                            DefaultTaskDispatcher.this.taskProcessorDefinitionList.put(inserted.getIdentifier(), inserted);
                            if (logger.isLoggable(Level.INFO)) {
                                logger.log(Level.INFO, "Registering TaskProcessorDefinition {0}", inserted.toString());
                            }
                            DefaultTaskDispatcher.this.handleTaskProcessorDefinition(oCCFactory, inserted);
                        }
                        catch (Throwable e) {
                            if (logger.isLoggable(Level.SEVERE)) {
                                logger.log(Level.SEVERE, "Could not handle TaskProcessorDefinition {0} exception {1}", new Object[]{inserted, e});
                            }
                        }
                        break;
                    }
                    case MapEvent.ENTRY_UPDATED: {
                        // Updates are only logged; the local mirror keeps the earlier value.
                        TaskProcessorDefinition updated = (TaskProcessorDefinition)mapEvent.getNewValue();
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "TaskProcessorDefinition {0} updated.", updated.toString());
                        }
                        break;
                    }
                    case MapEvent.ENTRY_DELETED: {
                        TaskProcessorDefinition removed = (TaskProcessorDefinition)mapEvent.getOldValue();
                        DefaultTaskDispatcher.this.taskProcessorDefinitionList.remove(removed.getIdentifier());
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "Removing TaskProcessorDefinition {0}", removed.toString());
                        }
                        break;
                    }
                    default:
                        break;
                }
            }
        };
        this.taskProcessorDefinitionCache.addMapListener(this.taskProcessorDefinitionListener);
        // Iterate as Object and cast: the cache is used as a raw type here.
        for (Object cachedValue : this.taskProcessorDefinitionCache.values()) {
            TaskProcessorDefinition definition = (TaskProcessorDefinition)cachedValue;
            this.taskProcessorDefinitionList.put(definition.getIdentifier(), definition);
            try {
                this.handleTaskProcessorDefinition(oCCFactory, definition);
            }
            catch (Throwable e) {
                if (logger.isLoggable(Level.SEVERE)) {
                    logger.log(Level.SEVERE, "Could not handle TaskProcessorDefinition {0} exception {1}", new Object[]{definition, e});
                }
            }
        }
    }

    /**
     * Installs the listener that mirrors ACTIVE mediators into
     * {@link #taskProcessorMediators} and then loads the mediators already
     * present in the cache.
     */
    private void initializeTaskProcessorMediatorListener() {
        this.taskProcessorMediatorListener = new MultiplexingMapListener(){

            @Override
            protected void onMapEvent(MapEvent mapEvent) {
                TaskProcessorMediatorKey mediatorKey = (TaskProcessorMediatorKey)mapEvent.getKey();
                if (mediatorKey == null) {
                    // ConcurrentHashMap rejects null keys, so an event without
                    // a key cannot be mirrored; ignore it for every event type.
                    return;
                }
                switch (mapEvent.getId()) {
                    case MapEvent.ENTRY_INSERTED: {
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "TaskProcessorMediator inserted {0}", mediatorKey.toString());
                        }
                        if (DefaultTaskDispatcher.this.taskProcessorMediators.containsKey(mediatorKey)) {
                            if (logger.isLoggable(Level.SEVERE)) {
                                logger.log(Level.SEVERE, "TaskProcessorMediator can't be inserted twice: {0}", mediatorKey.toString());
                            }
                        } else {
                            TaskProcessorMediator inserted = (TaskProcessorMediator)mapEvent.getNewValue();
                            if (inserted.getProcessorState() == TaskProcessorStateEnum.ACTIVE) {
                                DefaultTaskDispatcher.this.taskProcessorMediators.put(mediatorKey, inserted);
                            }
                        }
                        break;
                    }
                    case MapEvent.ENTRY_UPDATED: {
                        // Track the mediator only while it is ACTIVE.
                        TaskProcessorMediator updated = (TaskProcessorMediator)mapEvent.getNewValue();
                        if (updated.getProcessorState() == TaskProcessorStateEnum.ACTIVE) {
                            DefaultTaskDispatcher.this.taskProcessorMediators.put(mediatorKey, updated);
                        } else {
                            DefaultTaskDispatcher.this.taskProcessorMediators.remove(mediatorKey);
                        }
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "TaskProcessorMediator updated: {0}", mediatorKey.toString());
                        }
                        break;
                    }
                    case MapEvent.ENTRY_DELETED: {
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "TaskProcessorMediator deleted: {0}", mediatorKey.toString());
                        }
                        DefaultTaskDispatcher.this.taskProcessorMediators.remove(mediatorKey);
                        break;
                    }
                    default:
                        break;
                }
            }
        };
        this.taskProcessorMediatorCache.addMapListener(this.taskProcessorMediatorListener);
        // Iterate as Object and cast: the cache is used as a raw type here.
        for (Object rawEntry : this.taskProcessorMediatorCache.entrySet()) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>)rawEntry;
            TaskProcessorMediator mediator = (TaskProcessorMediator)entry.getValue();
            if (mediator.getProcessorState() == TaskProcessorStateEnum.ACTIVE) {
                this.taskProcessorMediators.put((TaskProcessorMediatorKey)entry.getKey(), mediator);
            }
        }
    }

    /**
     * Creates (if absent) the mediator for a GRID definition on this member and
     * starts the definition's task processor.
     *
     * <p>Nothing happens unless the definition is of type
     * {@link TaskProcessorType#GRID} and the mediator cache's service is a
     * {@link DistributedCacheService} with local storage enabled.
     *
     * @throws Throwable whatever the mediator creation or processor startup throws
     */
    private void handleTaskProcessorDefinition(ConfigurableCacheFactory oCCFactory, TaskProcessorDefinition oTaskProcessorDefinition) throws Throwable {
        if (oTaskProcessorDefinition.getTaskProcessorType() != TaskProcessorType.GRID) {
            return;
        }
        CacheService cs = this.taskProcessorMediatorCache.getCacheService();
        if (!(cs instanceof DistributedCacheService) || !((DistributedCacheService)cs).isLocalStorageEnabled()) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "HandleTaskProcessorDefinition: {0}", oTaskProcessorDefinition.toString());
        }
        Member member = CacheFactory.getCluster().getLocalMember();
        TaskProcessorMediatorKey key = new TaskProcessorMediatorKey(oTaskProcessorDefinition.getIdentifier(), member.getId(), member.getUid());
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Created key for TaskProcessorMediator: {0}", key.toString());
        }
        TaskProcessorMediator tps = (TaskProcessorMediator)this.taskProcessorMediatorFactory.createRemoteObjectIfNotExists((Object)key, DefaultTaskProcessorMediator.class, new Object[]{key, oTaskProcessorDefinition.getAttributeMap()});

        // Describe the hosting member on the mediator; look the member up once.
        Member localMember = CacheFactory.ensureCluster().getLocalMember();
        tps.setAttribute("machinename", localMember.getMachineName());
        tps.setAttribute("hostname", localMember.getAddress().getHostName());
        tps.setAttribute("rackname", localMember.getRackName());
        tps.setAttribute("membername", localMember.getMemberName());
        tps.setAttribute("processname", localMember.getProcessName());
        tps.setAttribute("rolename", localMember.getRoleName());
        // Only GRID definitions get past the guard at the top of this method.
        tps.setAttribute("type", "grid");

        TaskProcessor processor = oTaskProcessorDefinition.getTaskProcessor();
        if (processor != null) {
            // Remember the processor first so onShutdown can stop it later.
            this.localTaskProcessors.add(processor);
            processor.onStartup(tps, key, this.submissionProxyFactory, this.submissionResultProxyFactory, this.taskProcessorMediatorFactory, clientLeaseMaintainer);
        } else if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "The attached processor was null");
        }
    }

    /**
     * @return the name of this dispatcher
     */
    @Override
    public String getDispatcherName() {
        return this.getName();
    }

    /**
     * @return the number of submissions {@link #dispatch} has accepted
     */
    @Override
    public int getSubmissionsAccepted() {
        return this.noAcceptedSubmissions;
    }

    /**
     * @return the number of submissions offered to {@link #dispatch}
     */
    @Override
    public int getSubmissionsOffered() {
        return this.noOfferedSubmissions;
    }

    /**
     * Reads the superclass state followed by the dispatch policy.
     */
    @Override
    public void readExternal(DataInput in) throws IOException {
        super.readExternal(in);
        this.taskDispatchPolicy = (TaskDispatchPolicy)ExternalizableHelper.readObject(in);
    }

    /**
     * Writes the superclass state followed by the dispatch policy.
     */
    @Override
    public void writeExternal(DataOutput out) throws IOException {
        super.writeExternal(out);
        ExternalizableHelper.writeObject(out, this.taskDispatchPolicy);
    }

    /**
     * Reads the superclass state and then the dispatch policy from POF index
     * {@link #POF_INDEX_DISPATCH_POLICY}.
     */
    @Override
    public void readExternal(PofReader reader) throws IOException {
        super.readExternal(reader);
        this.taskDispatchPolicy = (TaskDispatchPolicy)reader.readObject(POF_INDEX_DISPATCH_POLICY);
    }

    /**
     * Writes the superclass state and then the dispatch policy at POF index
     * {@link #POF_INDEX_DISPATCH_POLICY}.
     */
    @Override
    public void writeExternal(PofWriter writer) throws IOException {
        super.writeExternal(writer);
        writer.writeObject(POF_INDEX_DISPATCH_POLICY, this.taskDispatchPolicy);
    }
}

