package com.oracle.coherence.patterns.processing.internal;

import com.oracle.coherence.common.util.ObjectProxyFactory;
import com.oracle.coherence.patterns.processing.DispatcherFilter;
import com.oracle.coherence.patterns.processing.SubmissionState;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchController;
import com.oracle.coherence.patterns.processing.dispatchers.DispatchOutcome;
import com.oracle.coherence.patterns.processing.dispatchers.Dispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.PendingSubmission;
import com.oracle.coherence.patterns.processing.exceptions.NoDispatcherForSubmissionException;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.MultiplexingMapListener;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.DelayQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/processing/internal/DefaultDispatchController.class */
/**
 * Default {@link DispatchController} implementation.
 *
 * <p>Pending submissions are held in a {@link DelayQueue}. A single background thread (started by
 * {@link #onDependenciesSatisfied(Environment)}) takes submissions from that queue and offers each
 * one to the registered {@link Dispatcher}s, in ascending order of their integer cache key.
 *
 * <p>The set of dispatchers mirrors the content of the dispatcher cache: it is loaded when the
 * dispatcher thread starts and is afterwards kept up to date by a map listener registered on that
 * cache. All access to the dispatcher map is guarded by synchronizing on the map itself.
 */
public class DefaultDispatchController implements DispatchController {
    private static final Logger logger = Logger.getLogger(DefaultDispatchController.class.getName());

    /** Registered dispatchers keyed by their cache key; guarded by synchronizing on the map itself. */
    private final TreeMap<Integer, Dispatcher> mapDispatcherList;

    /** Submissions waiting to be dispatched; an element becomes available once its delay has expired. */
    private final DelayQueue<PendingSubmission> queuePendingSubmissions;

    /** Thread executing {@link #run()}; created in {@link #onDependenciesSatisfied(Environment)}. */
    private Thread dispatcherThread;

    private final ConfigurableCacheFactory ccFactory;

    /** Listener registered on {@link #dispatcherCache} by the dispatcher thread. */
    MapListener dispatcherCacheListener;

    /** Cache holding the dispatchers; resolved in {@link #onDependenciesSatisfied(Environment)}. */
    NamedCache dispatcherCache;

    private final ObjectProxyFactory<SubmissionResult> submissionResultProxyFactory;

    /**
     * Creates the controller and registers it with {@link DefaultSubmission}.
     *
     * @param configurableCacheFactory factory used to obtain the dispatcher cache
     * @param objectProxyFactory       factory producing {@link SubmissionResult} proxies from result identifiers
     */
    public DefaultDispatchController(ConfigurableCacheFactory configurableCacheFactory, ObjectProxyFactory<SubmissionResult> objectProxyFactory) {
        this.ccFactory = configurableCacheFactory;
        this.submissionResultProxyFactory = objectProxyFactory;
        this.mapDispatcherList = new TreeMap<>();
        this.queuePendingSubmissions = new DelayQueue<>();
        // Publish 'this' only after every field has been assigned, so that code reaching the
        // controller through DefaultSubmission never observes a half-initialized instance.
        DefaultSubmission.setDispatchController(this);
    }

    /**
     * Resolves the dispatcher cache and starts the dispatcher thread.
     *
     * @param environment the environment (not used by this implementation)
     */
    public void onDependenciesSatisfied(Environment environment) {
        this.dispatcherCache = this.ccFactory.ensureCache(DefaultDispatcherManager.CACHENAME, (ClassLoader) null);
        this.dispatcherThread = new Thread(this, "DefaultDispatchController.dispatcherThread");
        this.dispatcherThread.start();
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Starting Default Dispatch Controller");
        }
    }

    /**
     * Queues a submission for dispatching.
     *
     * @param pendingSubmission the submission to queue
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public void accept(PendingSubmission pendingSubmission) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Accepting submission {0} resultid {1}", new Object[]{pendingSubmission.getSubmissionKey(), pendingSubmission.getResultIdentifier()});
        }
        boolean queued = this.queuePendingSubmissions.offer(pendingSubmission);
        if (!queued && logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "Default Dispatch Controller did not accept {0}", pendingSubmission.toString());
        }
    }

    /**
     * Queues a submission that has been transferred to this controller.
     *
     * @param defaultPendingSubmission the transferred submission to queue
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public void acceptTransferredSubmission(DefaultPendingSubmission defaultPendingSubmission) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Accepting TRANSFERRED submission {0} resultid {1}", new Object[]{defaultPendingSubmission.getSubmissionKey(), defaultPendingSubmission.getResultIdentifier()});
        }
        boolean queued = this.queuePendingSubmissions.offer(defaultPendingSubmission);
        if (!queued && logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "Default Dispatch Controller did not accept {0}", defaultPendingSubmission.toString());
        }
    }

    /**
     * Removes a submission from the pending queue, if it is still queued.
     *
     * @param pendingSubmission the submission to remove
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public void discard(PendingSubmission pendingSubmission) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Discarding pending Submission {0}", pendingSubmission);
        }
        this.queuePendingSubmissions.remove(pendingSubmission);
    }

    /**
     * Does nothing in this implementation.
     *
     * @param dispatcher the updated dispatcher (ignored)
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public void onDispatcherUpdate(Dispatcher dispatcher) {
    }

    /**
     * @return the cache factory this controller was constructed with
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public ConfigurableCacheFactory getConfigurableCacheFactory() {
        return this.ccFactory;
    }

    /**
     * Body of the dispatcher thread.
     *
     * <p>Loads and starts every dispatcher currently in the dispatcher cache, registers the cache
     * listener, then loops forever taking submissions from the pending queue and dispatching them.
     * The loop ends only when the thread is interrupted; any other failure is logged and the loop
     * continues with the next submission.
     */
    @Override // java.lang.Runnable
    public void run() {
        synchronized (this.mapDispatcherList) {
            // NamedCache is used as a raw type here, so its entries are only known to be Objects.
            for (Object rawEntry : this.dispatcherCache.entrySet()) {
                Map.Entry<?, ?> entry = (Map.Entry<?, ?>) rawEntry;
                Dispatcher dispatcher = (Dispatcher) entry.getValue();
                this.mapDispatcherList.put((Integer) entry.getKey(), dispatcher);
                dispatcher.onStartup(this);
            }
        }
        // NOTE(review): a dispatcher inserted between the snapshot above and the listener
        // registration below appears to go unnoticed - confirm whether that window matters.
        initializeDispatcherCacheListener(this.dispatcherCache);
        while (true) {
            try {
                dispatchSubmission((DefaultPendingSubmission) this.queuePendingSubmissions.take());
            } catch (InterruptedException e) {
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Dispatcher thread interrupted.");
                }
                // Restore the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                // Keep the dispatcher thread alive: one failing submission must not stop dispatching.
                if (logger.isLoggable(Level.SEVERE)) {
                    // log(Level, String, Throwable) performs no {n} substitution; the throwable
                    // (and its stack trace) is attached to the record instead.
                    logger.log(Level.SEVERE, "Dispatcher thread caught exception", th);
                }
            }
        }
    }

    /**
     * Registers a listener on the dispatcher cache that keeps the dispatcher map in sync with it:
     * inserted dispatchers are added and started, deleted dispatchers are shut down and removed,
     * and updates are only reported with a warning.
     *
     * @param namedCache the dispatcher cache to listen on
     */
    private void initializeDispatcherCacheListener(NamedCache namedCache) {
        MultiplexingMapListener listener = new MultiplexingMapListener() {
            @Override
            protected void onMapEvent(MapEvent mapEvent) {
                switch (mapEvent.getId()) {
                    case MapEvent.ENTRY_INSERTED: {
                        Dispatcher added = (Dispatcher) mapEvent.getNewValue();
                        if (logger.isLoggable(Level.INFO)) {
                            logger.log(Level.INFO, "Starting dispatcher: {0}", added.getName());
                        }
                        synchronized (DefaultDispatchController.this.mapDispatcherList) {
                            DefaultDispatchController.this.mapDispatcherList.put((Integer) mapEvent.getKey(), added);
                        }
                        added.onStartup(DefaultDispatchController.this);
                        break;
                    }
                    case MapEvent.ENTRY_UPDATED: {
                        Dispatcher updated = (Dispatcher) mapEvent.getNewValue();
                        if (logger.isLoggable(Level.WARNING)) {
                            // Worded without an apostrophe: it would need escaping in a message pattern.
                            logger.log(Level.WARNING, "Dispatchers are immutable and cannot be updated: {0}", updated.getName());
                        }
                        break;
                    }
                    case MapEvent.ENTRY_DELETED: {
                        synchronized (DefaultDispatchController.this.mapDispatcherList) {
                            Dispatcher removed = (Dispatcher) mapEvent.getOldValue();
                            if (removed != null) {
                                if (logger.isLoggable(Level.INFO)) {
                                    logger.log(Level.INFO, "Shutting down dispatcher {0}", removed.getName());
                                }
                                removed.onShutdown(DefaultDispatchController.this);
                                DefaultDispatchController.this.mapDispatcherList.remove(mapEvent.getKey());
                            }
                        }
                        break;
                    }
                    default:
                        // Other event ids are ignored.
                        break;
                }
            }
        };
        this.dispatcherCacheListener = listener;
        namedCache.addMapListener(listener);
    }

    /**
     * Looks up the current state of a submission through its result proxy.
     *
     * @param obj the result identifier
     * @return the state reported by the {@link SubmissionResult} proxy for {@code obj}
     */
    @Override // com.oracle.coherence.patterns.processing.dispatchers.DispatchController
    public SubmissionState getSubmissionState(Object obj) {
        return this.submissionResultProxyFactory.getProxy(obj).getSubmissionState();
    }

    /**
     * Offers a submission to the registered dispatchers, in key order, until one accepts it.
     *
     * <p>What happens afterwards depends on the outcome returned by the last dispatcher consulted:
     * a {@code RetryLater} outcome (other than {@code DispatchOutcome.CONTINUE}) puts the
     * submission back on the pending queue; a {@code Rejected} outcome - also the result when no
     * dispatcher is registered - marks the submission's result as failed with a
     * {@link NoDispatcherForSubmissionException}. A submission without payload is only logged.
     *
     * @param defaultPendingSubmission the submission to dispatch
     */
    protected void dispatchSubmission(DefaultPendingSubmission defaultPendingSubmission) {
        try {
            Object payload = defaultPendingSubmission.getPayload();
            if (payload == null) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "The submission with ID {0} had a NULL payload - resultid {1}", new Object[]{defaultPendingSubmission.getSubmissionKey(), defaultPendingSubmission.getResultIdentifier()});
                }
                return;
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Dispatching submission resultid {1}, submission key {0} payload {2}", new Object[]{defaultPendingSubmission.getSubmissionKey(), defaultPendingSubmission.getResultIdentifier(), payload});
            }

            DispatchOutcome outcome = DispatchOutcome.REJECTED;
            synchronized (this.mapDispatcherList) {
                for (Dispatcher dispatcher : this.mapDispatcherList.values()) {
                    outcome = tryDispatchSubmissionToDispatcher(defaultPendingSubmission, dispatcher);
                    if (outcome instanceof DispatchOutcome.Accepted) {
                        break;
                    }
                    if (outcome != DispatchOutcome.CONTINUE && outcome instanceof DispatchOutcome.RetryLater) {
                        // Remember the requested delay; it applies if the submission is re-queued below.
                        defaultPendingSubmission.setDelay(((DispatchOutcome.RetryLater) outcome).getDelay());
                    }
                }
            }

            if (outcome instanceof DispatchOutcome.RetryLater) {
                if (outcome != DispatchOutcome.CONTINUE) {
                    this.queuePendingSubmissions.add(defaultPendingSubmission);
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Retrying submission {0}", defaultPendingSubmission.toString());
                    }
                }
            } else if (outcome instanceof DispatchOutcome.Rejected) {
                failRejectedSubmission(defaultPendingSubmission);
            }
        } catch (NoSuchElementException e) {
            if (logger.isLoggable(Level.WARNING)) {
                // The apostrophe is doubled because a lone ' starts a quoted section in a
                // MessageFormat pattern, which would suppress the {1} substitution.
                logger.log(Level.WARNING, "The submission with ID {0} didn''t exist - resultid {1}", new Object[]{defaultPendingSubmission.getSubmissionKey(), defaultPendingSubmission.getResultIdentifier()});
            }
        }
    }

    /** Records a {@link NoDispatcherForSubmissionException} on the result of a rejected submission. */
    private void failRejectedSubmission(DefaultPendingSubmission rejected) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Submission was REJECTED by every dispatcher {0}", rejected.toString());
        }
        SubmissionResult result = this.submissionResultProxyFactory.getProxy(rejected.getResultIdentifier());
        if (result != null) {
            result.processingFailed(new NoDispatcherForSubmissionException());
        } else if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, "Failed to set the failure result for result with ID:{0}", rejected.getResultIdentifier());
        }
    }

    /**
     * Offers a submission to a single dispatcher, honouring the submission's dispatcher filter.
     *
     * <p>The dispatcher is consulted when the submission has no configuration, when the
     * configuration has no dispatcher filter, or when the filter's {@code filterDispatcher} call
     * returns {@code true} for the dispatcher.
     *
     * @param defaultPendingSubmission the submission to dispatch
     * @param dispatcher               the candidate dispatcher
     * @return the dispatcher's outcome, or {@code DispatchOutcome.REJECTED} when the filter
     *         excluded the dispatcher
     */
    protected DispatchOutcome tryDispatchSubmissionToDispatcher(DefaultPendingSubmission defaultPendingSubmission, Dispatcher dispatcher) {
        if (defaultPendingSubmission.getSubmissionConfiguration() == null) {
            return dispatcher.dispatch(defaultPendingSubmission);
        }
        DispatcherFilter dispatcherFilter = defaultPendingSubmission.getSubmissionConfiguration().getDispatcherFilter();
        if (dispatcherFilter == null || dispatcherFilter.filterDispatcher(dispatcher)) {
            return dispatcher.dispatch(defaultPendingSubmission);
        }
        return DispatchOutcome.REJECTED;
    }
}
