/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.patterns.processing.internal;

import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.DispatcherFilter;
import com.oracle.coherence.patterns.processing.SubmissionState;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchController;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchOutcome;
import com.oracle.coherence.patterns.processing.dispatchers.Dispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.PendingSubmission;
import com.oracle.coherence.patterns.processing.exceptions.NoDispatcherForSubmissionException;
import com.oracle.coherence.patterns.processing.internal.DefaultPendingSubmission;
import com.oracle.coherence.patterns.processing.internal.DefaultSubmission;
import com.oracle.coherence.patterns.processing.internal.Environment;
import com.oracle.coherence.patterns.processing.internal.SubmissionResult;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.DelayQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DefaultDispatchController
implements DispatchController {
    private static Logger logger = Logger.getLogger(DefaultDispatchController.class.getName());
    private final TreeMap<Integer, Dispatcher> mapDispatcherList;
    private DelayQueue<PendingSubmission> queuePendingSubmissions;
    private Thread dispatcherThread;
    private ConfigurableCacheFactory ccFactory;
    MapListener dispatcherCacheListener;
    NamedCache dispatcherCache;
    private ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

    public DefaultDispatchController(ConfigurableCacheFactory ccFactory, ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory) {
        DefaultSubmission.setDispatchController(this);
        this.ccFactory = ccFactory;
        this.submissionResultProxyFactory = submissionResultProxyFactory;
        this.mapDispatcherList = new TreeMap();
        this.queuePendingSubmissions = new DelayQueue();
    }

    public void onDependenciesSatisfied(Environment environment) {
        this.dispatcherCache = this.ccFactory.ensureCache("coherence.patterns.processing.dispatchers", null);
        this.dispatcherThread = new Thread((Runnable)this, "DefaultDispatchController.dispatcherThread");
        this.dispatcherThread.start();
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Starting Default Dispatch Controller");
        }
    }

    @Override
    public void accept(PendingSubmission oPendingSubmission) {
        boolean result;
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Accepting submission {0} resultid {1}", new Object[]{oPendingSubmission.getSubmissionKey(), oPendingSubmission.getResultIdentifier()});
        }
        if (!(result = this.queuePendingSubmissions.offer(oPendingSubmission)) && logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "Default Dispatch Controller did not accept {0}", oPendingSubmission.toString());
        }
    }

    @Override
    public void acceptTransferredSubmission(DefaultPendingSubmission defaultPendingSubmission) {
        boolean result;
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Accepting TRANSFERRED submission {0} resultid {1}", new Object[]{defaultPendingSubmission.getSubmissionKey(), defaultPendingSubmission.getResultIdentifier()});
        }
        if (!(result = this.queuePendingSubmissions.offer(defaultPendingSubmission)) && logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "Default Dispatch Controller did not accept {0}", defaultPendingSubmission.toString());
        }
    }

    @Override
    public void discard(PendingSubmission oProcess) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Discarding pending Submission {0}", oProcess);
        }
        this.queuePendingSubmissions.remove(oProcess);
    }

    @Override
    public void onDispatcherUpdate(Dispatcher dispatcher) {
    }

    @Override
    public ConfigurableCacheFactory getConfigurableCacheFactory() {
        return this.ccFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        TreeMap<Integer, Dispatcher> treeMap = this.mapDispatcherList;
        synchronized (treeMap) {
            for (Map.Entry entry : this.dispatcherCache.entrySet()) {
                Dispatcher dispatcher = (Dispatcher)entry.getValue();
                this.mapDispatcherList.put((Integer)entry.getKey(), dispatcher);
                dispatcher.onStartup(this);
            }
        }
        this.initializeDispatcherCacheListener(this.dispatcherCache);
        while (true) {
            try {
                while (true) {
                    DefaultPendingSubmission oPendingSubmission = (DefaultPendingSubmission)this.queuePendingSubmissions.take();
                    this.dispatchSubmission(oPendingSubmission);
                }
            }
            catch (InterruptedException oException) {
                if (!logger.isLoggable(Level.FINER)) break;
                logger.log(Level.FINER, "Dispatcher thread interrupted.");
            }
            catch (Throwable t) {
                if (!logger.isLoggable(Level.SEVERE)) continue;
                logger.log(Level.SEVERE, "Dispatcher thread caught exception {0}", t);
                continue;
            }
            break;
        }
    }

    private void initializeDispatcherCacheListener(NamedCache dispatcherCache) {
        this.dispatcherCacheListener = new MultiplexingMapListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            protected void onMapEvent(MapEvent mapEvent) {
                if (mapEvent.getId() == 1) {
                    Dispatcher dispatcher = (Dispatcher)mapEvent.getNewValue();
                    if (logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, "Starting dispatcher:", dispatcher.getName());
                    }
                    TreeMap treeMap = DefaultDispatchController.this.mapDispatcherList;
                    synchronized (treeMap) {
                        DefaultDispatchController.this.mapDispatcherList.put((Integer)mapEvent.getKey(), dispatcher);
                    }
                    dispatcher.onStartup(DefaultDispatchController.this);
                } else if (mapEvent.getId() == 2) {
                    Dispatcher dispatcher = (Dispatcher)mapEvent.getNewValue();
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.log(Level.WARNING, "Dispatchers are immutable and can't be updated", dispatcher.getName());
                    }
                } else if (mapEvent.getId() == 3) {
                    TreeMap treeMap = DefaultDispatchController.this.mapDispatcherList;
                    synchronized (treeMap) {
                        Dispatcher dispatcher = (Dispatcher)mapEvent.getOldValue();
                        if (dispatcher != null) {
                            if (logger.isLoggable(Level.INFO)) {
                                logger.log(Level.INFO, "Shutting down dispatcher {0}", dispatcher.getName());
                            }
                            dispatcher.onShutdown(DefaultDispatchController.this);
                            DefaultDispatchController.this.mapDispatcherList.remove(mapEvent.getKey());
                        }
                    }
                }
            }
        };
        dispatcherCache.addMapListener(this.dispatcherCacheListener);
    }

    @Override
    public SubmissionState getSubmissionState(Object resultId) {
        SubmissionResult submissionResult = (SubmissionResult)this.submissionResultProxyFactory.getProxy(resultId);
        return submissionResult.getSubmissionState();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void dispatchSubmission(DefaultPendingSubmission oPendingSubmission) {
        block18: {
            try {
                Object payLoad = oPendingSubmission.getPayload();
                if (payLoad != null) {
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Dispatching submission resultid {1}, submission key {0} payload {2}", new Object[]{oPendingSubmission.getSubmissionKey(), oPendingSubmission.getResultIdentifier(), payLoad});
                    }
                    DispatchOutcome oResult = DispatchOutcome.REJECTED;
                    boolean fFirst = true;
                    TreeMap<Integer, Dispatcher> treeMap = this.mapDispatcherList;
                    synchronized (treeMap) {
                        Dispatcher dispatcher;
                        Iterator<Dispatcher> i$ = this.mapDispatcherList.values().iterator();
                        while (i$.hasNext() && !((oResult = this.tryDispatchSubmissionToDispatcher(oPendingSubmission, dispatcher = i$.next())) instanceof DispatchOutcome.Accepted)) {
                            if (oResult == DispatchOutcome.CONTINUE || !(oResult instanceof DispatchOutcome.RetryLater)) continue;
                            DispatchOutcome.RetryLater laterResult = (DispatchOutcome.RetryLater)oResult;
                            long delay = laterResult.getDelay();
                            oPendingSubmission.setDelay(delay);
                        }
                    }
                    if (oResult instanceof DispatchOutcome.RetryLater) {
                        if (oResult != DispatchOutcome.CONTINUE) {
                            this.queuePendingSubmissions.add(oPendingSubmission);
                            if (logger.isLoggable(Level.FINER)) {
                                logger.log(Level.FINER, "Retrying submission {0}", oPendingSubmission.toString());
                            }
                        }
                    } else if (oResult instanceof DispatchOutcome.Rejected) {
                        SubmissionResult submissionResult;
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "Submission was REJECTED by every dispatcher {0}", oPendingSubmission.toString());
                        }
                        if ((submissionResult = (SubmissionResult)this.submissionResultProxyFactory.getProxy((Object)oPendingSubmission.getResultIdentifier())) != null) {
                            submissionResult.processingFailed(new NoDispatcherForSubmissionException());
                        } else if (logger.isLoggable(Level.SEVERE)) {
                            logger.log(Level.SEVERE, "Failed to set the failure result for result with ID:{0}", oPendingSubmission.getResultIdentifier());
                        }
                    }
                    break block18;
                }
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "The submission with ID {0} had a NULL payload - resultid {1}", new Object[]{oPendingSubmission.getSubmissionKey(), oPendingSubmission.getResultIdentifier()});
                }
            }
            catch (NoSuchElementException e) {
                if (!logger.isLoggable(Level.WARNING)) break block18;
                logger.log(Level.WARNING, "The submission with ID {0} didn't exist - resultid {1}", new Object[]{oPendingSubmission.getSubmissionKey(), oPendingSubmission.getResultIdentifier()});
            }
        }
    }

    protected DispatchOutcome tryDispatchSubmissionToDispatcher(DefaultPendingSubmission oPendingSubmission, Dispatcher dispatcher) {
        DispatchOutcome oResult = DispatchOutcome.REJECTED;
        if (oPendingSubmission.getSubmissionConfiguration() != null) {
            DispatcherFilter filter = oPendingSubmission.getSubmissionConfiguration().getDispatcherFilter();
            if (filter != null) {
                if (filter.filterDispatcher(dispatcher)) {
                    oResult = dispatcher.dispatch(oPendingSubmission);
                }
            } else {
                oResult = dispatcher.dispatch(oPendingSubmission);
            }
        } else {
            oResult = dispatcher.dispatch(oPendingSubmission);
        }
        return oResult;
    }
}

