package com.oracle.coherence.patterns.eventdistribution.distributors;

import com.oracle.coherence.common.cluster.ClusterMetaInfo;
import com.oracle.coherence.common.cluster.LocalClusterMetaInfo;
import com.oracle.coherence.common.finitestatemachines.AnnotationDrivenModel;
import com.oracle.coherence.common.finitestatemachines.Event;
import com.oracle.coherence.common.finitestatemachines.ExecutionContext;
import com.oracle.coherence.common.finitestatemachines.Instruction;
import com.oracle.coherence.common.finitestatemachines.NonBlockingFiniteStateMachine;
import com.oracle.coherence.common.finitestatemachines.annotations.OnEnterState;
import com.oracle.coherence.common.finitestatemachines.annotations.Transition;
import com.oracle.coherence.common.finitestatemachines.annotations.Transitions;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.tuples.Pair;
import com.oracle.coherence.patterns.eventdistribution.EventChannel;
import com.oracle.coherence.patterns.eventdistribution.EventChannelControlled;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean;
import com.oracle.coherence.patterns.eventdistribution.EventChannelNotReadyException;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.EventIteratorTransformer;
import com.oracle.coherence.patterns.eventdistribution.EventTransformer;
import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntry;
import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryEvent;
import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryPropagatedEvent;
import com.oracle.coherence.patterns.eventdistribution.transformers.MutatingEventIteratorTransformer;
import com.tangosol.coherence.config.ParameterList;
import com.tangosol.coherence.config.builder.ParameterizedBuilder;
import com.tangosol.config.expression.Parameter;
import com.tangosol.config.expression.ParameterResolver;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.Serializer;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.NamedCache;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.Converter;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.filter.PartitionedFilter;
import com.tangosol.util.filter.PresentFilter;
import com.tangosol.util.processor.AbstractProcessor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

@Transitions({@Transition(toState = "DISTRIBUTING", fromStates = {"STARTING", "WAITING", "DELAYING"}), @Transition(toState = "WAITING", fromStates = {"DISTRIBUTING"}), @Transition(toState = "DELAYING", fromStates = {"DISTRIBUTING"}), @Transition(toState = "SUSPENDED", fromStates = {"DISTRIBUTING", "STARTING", "ERROR", "DISABLED", "DRAINING", "WAITING", "DELAYING", "PROPAGATING"}), @Transition(toState = "STARTING", fromStates = {"ERROR", "SUSPENDED", "DISABLED"}), @Transition(toState = "DRAINING", fromStates = {"SUSPENDED"}), @Transition(toState = "DISABLED", fromStates = {"SUSPENDED", "WAITING", "DELAYING", "ERROR", "PROPAGATING"}), @Transition(toState = "ERROR", fromStates = {"STARTING", "DISTRIBUTING"}), @Transition(toState = "STOPPED", fromStates = {"STARTING", "SUSPENDED", "WAITING", "DISABLED", "ERROR", "DELAYING", "PROPAGATING"}), @Transition(toState = "PROPAGATING", fromStates = {"SUSPENDED", "DISABLED"})})
/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController.class */
public abstract class AbstractEventChannelController<T> implements EventChannelController, EventChannelControllerMBean {
    /** Logger named after this class; shared by every controller instance. */
    private static Logger logger = Logger.getLogger(AbstractEventChannelController.class.getName());
    /** Class loader handed to the constructor. */
    protected ClassLoader loader;
    /** Identifier of the owning event distributor; used to build {@link #getEventDistributorName()}. */
    protected EventDistributor.Identifier distributorIdentifier;
    /** Identifier of this controller; used for the controller name and in log messages. */
    protected EventChannelController.Identifier controllerIdentifier;
    /** Tunable configuration (delays, batch size, starting mode); volatile, and mutated through the setters below. */
    protected volatile EventChannelController.Dependencies controllerDependencies;
    /** Channel realized in the constructor from the dependencies' event channel builder. */
    protected EventChannel channel;
    /** Optional transformer realized in the constructor; {@code null} when no transformer builder is configured. */
    protected EventIteratorTransformer transformer;
    /** Single-thread scheduled executor created in the constructor and handed to the state machine. */
    protected ScheduledExecutorService executorService;
    /** Value of the {@code cache-name} parameter, or {@code null} if the resolver does not define it. */
    protected String cacheName;
    /** Serializer realized in the constructor from the supplied serializer builder. */
    protected Serializer serializer;
    /** Duration of the most recent batch distribution in ms; starts at 0. */
    protected long lastDistributionDurationMS;
    /** Shortest batch distribution in ms; {@code Long.MAX_VALUE} until a value is recorded (reported as 0 by the getter). */
    protected long minimumDistributionDurationMS;
    /** Longest batch distribution in ms; {@code Long.MIN_VALUE} until a value is recorded (reported as 0 by the getter). */
    protected long maximumDistributionDurationMS;
    /** Accumulated distribution time in ms; starts at 0. */
    protected long totalDistributionDurationMS;
    /** Failures since the last successful start; reset and incremented in {@code onStarting}. */
    protected int consecutiveDistributionFailures;
    /** Number of batches distributed; starts at 0. NOTE(review): updated outside the code visible here. */
    protected int totalBatchesDistributed;
    /** Number of candidate events seen; starts at 0. NOTE(review): updated outside the code visible here. */
    protected int totalCandidateEvents;
    /** Number of events actually distributed; starts at 0. NOTE(review): updated outside the code visible here. */
    protected int totalEventsDistributed;
    /** State machine that drives this controller; all life-cycle requests are submitted to it as events. */
    protected NonBlockingFiniteStateMachine<State> machine;

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$ControllerEvent.class */
    /**
     * Events that can be submitted to the controller's state machine.
     *
     * <p>Each constant carries the single {@link State} it asks the machine to move to;
     * the request is the same regardless of the machine's current state.
     */
    public enum ControllerEvent implements Event<State> {
        /** Requests {@link State#STARTING}. */
        START(State.STARTING),
        /** Requests {@link State#STOPPED}. */
        STOP(State.STOPPED),
        /** Requests {@link State#DISABLED}. */
        DISABLE(State.DISABLED),
        /** Requests {@link State#SUSPENDED}. */
        SUSPEND(State.SUSPENDED),
        /** Requests {@link State#DRAINING}. */
        DRAIN(State.DRAINING),
        /** Requests {@link State#DISTRIBUTING}. */
        DISTRIBUTE(State.DISTRIBUTING),
        /** Requests {@link State#PROPAGATING}. */
        PROPAGATE(State.PROPAGATING);

        /** Target state of this event; final because enum constants are shared singletons. */
        private final State desiredState;

        ControllerEvent(State desiredState) {
            this.desiredState = desiredState;
        }

        /**
         * Returns the fixed target state of this event.
         *
         * @param state            ignored
         * @param executionContext ignored
         * @return the state this event requests
         */
        @Override
        public State getDesiredState(State state, ExecutionContext executionContext) {
            return desiredState;
        }
    }

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$DefaultDependencies.class */
    /**
     * Default {@link EventChannelController.Dependencies} implementation.
     *
     * <p>Instances can be written and read both through the {@link ExternalizableLite}
     * stream methods and through the {@link PortableObject} POF methods. Both formats carry
     * the same ten fields in the same order; POF uses property indices 1 through 10.
     */
    public static class DefaultDependencies implements EventChannelController.Dependencies, PortableObject, ExternalizableLite {
        /** Default starting mode. */
        public static final EventChannelController.Mode STARTING_MODE_DEFAULT = EventChannelController.Mode.ENABLED;

        /** Default delay between batches, in milliseconds. */
        public static final long BATCH_DISTRIBUTION_DELAY_DEFAULT = 1000L;

        /** Default batch size. */
        public static final int BATCH_SIZE_DEFAULT = 100;

        /** Default event polling delay, in milliseconds. */
        public static final long EVENT_POLLING_DELAY_DEFAULT = 1000L;

        /** Default restart delay, in milliseconds. */
        public static final long RESTART_DELAY_DEFAULT = 10000L;

        /** Default failure limit; the controller treats a negative limit as "never suspend". */
        public static final int TOTAL_CONSECUTIVE_FAILURES_BEFORE_SUSPENDING = -1;

        private String channelName;
        private String externalName;
        private ParameterizedBuilder<EventChannel> eventChannelBuilder;
        private ParameterizedBuilder<EventIteratorTransformer> transformerBuilder;
        private EventChannelController.Mode startingMode;
        private long batchDistributionDelayMS;
        private int batchSize;
        private long restartDelay;
        private int totalConsecutiveFailuresBeforeSuspending;
        private long eventPollingDelay;

        /** No-argument constructor; leaves every field at its Java default so that readExternal can populate it. */
        public DefaultDependencies() {
        }

        /**
         * Creates fully populated dependencies.
         *
         * @param channelName                              name of the channel
         * @param externalName                             external name of the channel
         * @param eventChannelBuilder                      builder for the {@link EventChannel}
         * @param transformerBuilder                       builder for the optional transformer (may be {@code null})
         * @param startingMode                             mode the controller starts in
         * @param batchDistributionDelayMS                 delay between batches, in milliseconds
         * @param batchSize                                batch size
         * @param restartDelay                             delay before a restart attempt, in milliseconds
         * @param totalConsecutiveFailuresBeforeSuspending failure limit; negative means no limit
         * @param eventPollingDelay                        delay between polls for events, in milliseconds
         */
        public DefaultDependencies(String channelName,
                                   String externalName,
                                   ParameterizedBuilder<EventChannel> eventChannelBuilder,
                                   ParameterizedBuilder<EventIteratorTransformer> transformerBuilder,
                                   EventChannelController.Mode startingMode,
                                   long batchDistributionDelayMS,
                                   int batchSize,
                                   long restartDelay,
                                   int totalConsecutiveFailuresBeforeSuspending,
                                   long eventPollingDelay) {
            this.channelName = channelName;
            this.externalName = externalName;
            this.eventChannelBuilder = eventChannelBuilder;
            this.transformerBuilder = transformerBuilder;
            this.startingMode = startingMode;
            this.batchDistributionDelayMS = batchDistributionDelayMS;
            this.batchSize = batchSize;
            this.restartDelay = restartDelay;
            this.totalConsecutiveFailuresBeforeSuspending = totalConsecutiveFailuresBeforeSuspending;
            this.eventPollingDelay = eventPollingDelay;
        }

        /** @return the channel name */
        @Override
        public String getChannelName() {
            return channelName;
        }

        /** @return the external name of the channel */
        @Override
        public String getExternalName() {
            return externalName;
        }

        /** @return the builder used to realize the {@link EventChannel} */
        @Override
        public ParameterizedBuilder<EventChannel> getEventChannelBuilder() {
            return eventChannelBuilder;
        }

        /** @return the builder for the events transformer, possibly {@code null} */
        @Override
        public ParameterizedBuilder<EventIteratorTransformer> getEventsTransformerBuilder() {
            return transformerBuilder;
        }

        /** @return the mode the controller starts in */
        @Override
        public EventChannelController.Mode getStartingMode() {
            return startingMode;
        }

        /** @param startingMode the new starting mode */
        @Override
        public void setStartingMode(EventChannelController.Mode startingMode) {
            this.startingMode = startingMode;
        }

        /** @return the delay between batches, in milliseconds */
        @Override
        public long getBatchDistributionDelay() {
            return batchDistributionDelayMS;
        }

        /** @param delayMS the new delay between batches, in milliseconds */
        @Override
        public void setBatchDistributionDelay(long delayMS) {
            batchDistributionDelayMS = delayMS;
        }

        /** @return the batch size */
        @Override
        public int getBatchSize() {
            return batchSize;
        }

        /** @param batchSize the new batch size */
        @Override
        public void setBatchSize(int batchSize) {
            this.batchSize = batchSize;
        }

        /** @return the restart delay, in milliseconds */
        @Override
        public long getRestartDelay() {
            return restartDelay;
        }

        /** @param restartDelay the new restart delay, in milliseconds */
        @Override
        public void setRestartDelay(long restartDelay) {
            this.restartDelay = restartDelay;
        }

        /** @return the event polling delay, in milliseconds */
        @Override
        public long getEventPollingDelay() {
            return eventPollingDelay;
        }

        /** @return the consecutive failure limit; negative means no limit */
        @Override
        public int getTotalConsecutiveFailuresBeforeSuspending() {
            return totalConsecutiveFailuresBeforeSuspending;
        }

        /**
         * Restores the state from an {@link ExternalizableLite} stream, in the order written by
         * {@link #writeExternal(DataOutput)}.
         *
         * @param in the stream to read from
         * @throws IOException if reading fails
         */
        public void readExternal(DataInput in) throws IOException {
            channelName = ExternalizableHelper.readSafeUTF(in);
            externalName = ExternalizableHelper.readSafeUTF(in);
            eventChannelBuilder = (ParameterizedBuilder) ExternalizableHelper.readObject(in);
            transformerBuilder = (ParameterizedBuilder) ExternalizableHelper.readObject(in);
            // The mode travels as its enum constant name.
            String modeName = ExternalizableHelper.readSafeUTF(in);
            startingMode = EventChannelController.Mode.valueOf(modeName);
            batchDistributionDelayMS = ExternalizableHelper.readLong(in);
            batchSize = ExternalizableHelper.readInt(in);
            restartDelay = ExternalizableHelper.readLong(in);
            totalConsecutiveFailuresBeforeSuspending = ExternalizableHelper.readInt(in);
            eventPollingDelay = ExternalizableHelper.readLong(in);
        }

        /**
         * Writes the state to an {@link ExternalizableLite} stream.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        public void writeExternal(DataOutput out) throws IOException {
            ExternalizableHelper.writeSafeUTF(out, channelName);
            ExternalizableHelper.writeSafeUTF(out, externalName);
            ExternalizableHelper.writeObject(out, eventChannelBuilder);
            ExternalizableHelper.writeObject(out, transformerBuilder);
            // The mode travels as its enum constant name.
            ExternalizableHelper.writeSafeUTF(out, startingMode.name());
            ExternalizableHelper.writeLong(out, batchDistributionDelayMS);
            ExternalizableHelper.writeInt(out, batchSize);
            ExternalizableHelper.writeLong(out, restartDelay);
            ExternalizableHelper.writeInt(out, totalConsecutiveFailuresBeforeSuspending);
            ExternalizableHelper.writeLong(out, eventPollingDelay);
        }

        /**
         * Restores the state from POF properties 1 through 10.
         *
         * @param reader the POF reader
         * @throws IOException if reading fails
         */
        public void readExternal(PofReader reader) throws IOException {
            channelName = reader.readString(1);
            externalName = reader.readString(2);
            eventChannelBuilder = (ParameterizedBuilder) reader.readObject(3);
            transformerBuilder = (ParameterizedBuilder) reader.readObject(4);
            String modeName = reader.readString(5);
            startingMode = EventChannelController.Mode.valueOf(modeName);
            batchDistributionDelayMS = reader.readLong(6);
            batchSize = reader.readInt(7);
            restartDelay = reader.readLong(8);
            totalConsecutiveFailuresBeforeSuspending = reader.readInt(9);
            eventPollingDelay = reader.readLong(10);
        }

        /**
         * Writes the state as POF properties 1 through 10.
         *
         * @param writer the POF writer
         * @throws IOException if writing fails
         */
        public void writeExternal(PofWriter writer) throws IOException {
            writer.writeString(1, channelName);
            writer.writeString(2, externalName);
            writer.writeObject(3, eventChannelBuilder);
            writer.writeObject(4, transformerBuilder);
            writer.writeString(5, startingMode.name());
            writer.writeLong(6, batchDistributionDelayMS);
            writer.writeInt(7, batchSize);
            writer.writeLong(8, restartDelay);
            writer.writeInt(9, totalConsecutiveFailuresBeforeSuspending);
            writer.writeLong(10, eventPollingDelay);
        }

        /** @return a diagnostic rendering of every field */
        @Override
        public String toString() {
            return String.format(
                    "AbstractEventChannelController.Dependencies{channelName=%s, externalName=%s, eventChannelBuilder=%s, transformerBuilder=%s, startingMode=%s, batchDistributionDelayMS=%d, batchSize=%d, restartDelay=%d, totalConsecutiveFailuresBeforeSuspended=%d, eventPollingDelay=%d}",
                    channelName,
                    externalName,
                    eventChannelBuilder,
                    transformerBuilder,
                    startingMode,
                    batchDistributionDelayMS,
                    batchSize,
                    restartDelay,
                    totalConsecutiveFailuresBeforeSuspending,
                    eventPollingDelay);
        }
    }

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$RaiseControllerEventProcessor.class */
    /**
     * Entry processor that raises a {@link ControllerEvent} on the controller associated
     * with the targeted cache entry.
     *
     * <p>The entry value must be an {@link EventChannelControlled}; its identifiers are used to
     * look the controller up in the {@link EventChannelControllerManager} registered with the
     * cache factory's resource registry. Entries that are absent, hold another kind of value, or
     * resolve to a controller that is not an {@link AbstractEventChannelController} are ignored.
     */
    public static class RaiseControllerEventProcessor extends AbstractProcessor implements PortableObject, ExternalizableLite {
        /** Event to raise against the controller's state machine. */
        private ControllerEvent event;

        /** No-argument constructor; the event is populated by readExternal. */
        public RaiseControllerEventProcessor() {
        }

        /**
         * @param controllerEvent the event to raise
         */
        public RaiseControllerEventProcessor(ControllerEvent controllerEvent) {
            this.event = controllerEvent;
        }

        /**
         * Raises the configured event on the controller for the given entry, if one can be found.
         *
         * <p>For {@link ControllerEvent#START} the machine is started first; the event itself is
         * only submitted when {@code start()} returns {@code false}.
         *
         * @param entry the entry being processed; cast to {@link BinaryEntry} to reach the backing map context
         * @return always {@code null}
         */
        public Object process(InvocableMap.Entry entry) {
            AbstractEventChannelController<?> controller = locateController(entry);
            if (controller != null) {
                NonBlockingFiniteStateMachine<State> fsm = controller.machine;
                // Short-circuit keeps start() from being called for anything but START.
                if (event != ControllerEvent.START || !fsm.start()) {
                    fsm.process(event);
                }
            }
            return null;
        }

        /** Resolves the controller an entry refers to, or {@code null} when the entry does not identify one. */
        private static AbstractEventChannelController<?> locateController(InvocableMap.Entry entry) {
            Object value = entry.isPresent() ? entry.getValue() : null;
            if (!(value instanceof EventChannelControlled)) {
                return null;
            }
            EventChannelControlled controlled = (EventChannelControlled) value;
            BackingMapManagerContext context = ((BinaryEntry) entry).getContext();
            EventChannelControllerManager manager = (EventChannelControllerManager) context.getManager().getCacheFactory().getResourceRegistry().getResource(EventChannelControllerManager.class);
            EventChannelController candidate = manager.getEventChannelController(controlled.getEventDistributorIdentifier(), controlled.getEventChannelControllerIdentifier());
            return candidate instanceof AbstractEventChannelController ? (AbstractEventChannelController<?>) candidate : null;
        }

        /**
         * @param in the stream to read the event from
         * @throws IOException if reading fails
         */
        public void readExternal(DataInput in) throws IOException {
            event = (ControllerEvent) ExternalizableHelper.readObject(in);
        }

        /**
         * @param out the stream to write the event to
         * @throws IOException if writing fails
         */
        public void writeExternal(DataOutput out) throws IOException {
            ExternalizableHelper.writeObject(out, event);
        }

        /**
         * Reads the event from POF property 100.
         *
         * @param reader the POF reader
         * @throws IOException if reading fails
         */
        public void readExternal(PofReader reader) throws IOException {
            event = (ControllerEvent) reader.readObject(100);
        }

        /**
         * Writes the event as POF property 100.
         *
         * @param writer the POF writer
         * @throws IOException if writing fails
         */
        public void writeExternal(PofWriter writer) throws IOException {
            writer.writeObject(100, event);
        }
    }

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$State.class */
    /**
     * States of the controller's finite state machine.
     *
     * <p>The permitted transitions are declared by the {@code @Transitions} annotation on the
     * enclosing class. Constant order is preserved; do not reorder.
     */
    public enum State {
        /** Entering this state calls {@code internalDisable()} and records DISABLED as the starting mode. */
        DISABLED,

        /** Entering this state records SUSPENDED as the starting mode. */
        SUSPENDED,

        /** Entering this state calls {@code internalStart()} and, on success, moves to {@link #DISTRIBUTING}. */
        STARTING,

        /** Target of the DISTRIBUTE event. NOTE(review): its entry handler is not in the code reviewed here. */
        DISTRIBUTING,

        /** Schedules a DISTRIBUTE event after the event polling delay. */
        WAITING,

        /** Schedules a DISTRIBUTE event after the batch distribution delay. */
        DELAYING,

        /** Entering this state calls {@code internalStop()} and stops the state machine. */
        STOPPED,

        /** Schedules a START event after the restart delay. */
        ERROR,

        /** Entering this state calls {@code internalDrain()} and then moves to {@link #SUSPENDED}. */
        DRAINING,

        /** Target of the PROPAGATE event. NOTE(review): its entry handler is not in the code reviewed here. */
        PROPAGATING
    }

    /**
     * Builds a controller: realizes the channel, the optional transformer and the serializer,
     * creates the executor, and constructs (but does not start here) the state machine in the
     * state implied by the configured starting mode.
     *
     * @param distributorIdentifier identifier of the owning event distributor
     * @param controllerIdentifier  identifier of this controller
     * @param dependencies          controller configuration; its starting mode must not be {@code null}
     * @param loader                class loader to remember for later use
     * @param resolver              resolver used to realize the builders and to look up {@code cache-name}
     * @param serializerBuilder     builder for the {@link Serializer}
     */
    public AbstractEventChannelController(EventDistributor.Identifier distributorIdentifier, EventChannelController.Identifier controllerIdentifier, EventChannelController.Dependencies dependencies, ClassLoader loader, ParameterResolver resolver, ParameterizedBuilder<Serializer> serializerBuilder) {
        this.distributorIdentifier = distributorIdentifier;
        this.controllerIdentifier = controllerIdentifier;
        this.controllerDependencies = dependencies;
        this.loader = loader;

        this.channel = (EventChannel) dependencies.getEventChannelBuilder().realize(resolver, (ClassLoader) null, (ParameterList) null);

        // The transformer is optional; without a builder there is nothing to realize.
        if (dependencies.getEventsTransformerBuilder() == null) {
            this.transformer = null;
        } else {
            this.transformer = (EventIteratorTransformer) dependencies.getEventsTransformerBuilder().realize(resolver, (ClassLoader) null, (ParameterList) null);
        }

        this.executorService = ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories.newThreadFactory(true, "EventChannelController", new ThreadGroup("EventChannelController")));
        this.serializer = (Serializer) serializerBuilder.realize(resolver, (ClassLoader) null, (ParameterList) null);

        // Map the configured mode onto the machine's initial state; anything
        // other than DISABLED or SUSPENDED starts the controller.
        State initialState;
        switch (dependencies.getStartingMode()) {
            case DISABLED:
                initialState = State.DISABLED;
                break;
            case SUSPENDED:
                initialState = State.SUSPENDED;
                break;
            default:
                initialState = State.STARTING;
                break;
        }
        this.machine = new NonBlockingFiniteStateMachine<>(getEventChannelControllerName(), new AnnotationDrivenModel(State.class, this), initialState, this.executorService, true, false);

        // Statistics start empty; MIN/MAX sentinels mean "no duration recorded yet".
        this.consecutiveDistributionFailures = 0;
        this.totalBatchesDistributed = 0;
        this.totalCandidateEvents = 0;
        this.totalEventsDistributed = 0;
        this.lastDistributionDurationMS = 0L;
        this.totalDistributionDurationMS = 0L;
        this.minimumDistributionDurationMS = Long.MAX_VALUE;
        this.maximumDistributionDurationMS = Long.MIN_VALUE;

        Parameter cacheNameParameter = resolver.resolve("cache-name");
        if (cacheNameParameter == null) {
            this.cacheName = null;
        } else {
            this.cacheName = (String) cacheNameParameter.evaluate(resolver).as(String.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the serializer realized when this controller was constructed.
     *
     * @return the controller's {@link Serializer}
     */
    public Serializer getSerializer() {
        return serializer;
    }

    /**
     * Returns the distributor's display name in the form {@code "symbolicName (externalName)"}.
     *
     * @return the formatted distributor name
     */
    @Override
    public String getEventDistributorName() {
        return String.format(
                "%s (%s)",
                distributorIdentifier.getSymbolicName(),
                distributorIdentifier.getExternalName());
    }

    /**
     * Returns the controller's display name in the form {@code "symbolicName (externalName)"}.
     * The constructor also uses this as the state machine's name.
     *
     * @return the formatted controller name
     */
    @Override
    public String getEventChannelControllerName() {
        return String.format(
                "%s (%s)",
                controllerIdentifier.getSymbolicName(),
                controllerIdentifier.getExternalName());
    }

    /**
     * Returns the state machine's current state rendered as text.
     *
     * @return the current {@link State} as a string
     */
    @Override
    public String getEventChannelControllerState() {
        State current = machine.getState();
        return current.toString();
    }

    /**
     * @return the recorded duration of the most recent batch distribution, in milliseconds
     */
    @Override
    public long getLastBatchDistributionDuration() {
        return lastDistributionDurationMS;
    }

    /**
     * Returns the longest recorded batch distribution duration.
     *
     * @return the maximum in milliseconds, or 0 while the field still holds its initial
     *         {@code Long.MIN_VALUE} sentinel
     */
    @Override
    public long getMaximumBatchDistributionDuration() {
        return maximumDistributionDurationMS == Long.MIN_VALUE ? 0L : maximumDistributionDurationMS;
    }

    /**
     * Returns the shortest recorded batch distribution duration.
     *
     * @return the minimum in milliseconds, or 0 while the field still holds its initial
     *         {@code Long.MAX_VALUE} sentinel
     */
    @Override
    public long getMinimumBatchDistributionDuration() {
        return minimumDistributionDurationMS == Long.MAX_VALUE ? 0L : minimumDistributionDurationMS;
    }

    /**
     * @return the accumulated distribution time, in milliseconds
     */
    @Override
    public long getTotalDistributionDuration() {
        return totalDistributionDurationMS;
    }

    /**
     * @return the number of failures counted since the last successful start
     */
    @Override
    public int getConsecutiveEventChannelFailures() {
        return consecutiveDistributionFailures;
    }

    /**
     * @return the number of batches distributed so far
     */
    @Override
    public int getEventBatchesDistributedCount() {
        return totalBatchesDistributed;
    }

    /**
     * @return the number of candidate events counted so far
     */
    @Override
    public int getCandidateEventCount() {
        return totalCandidateEvents;
    }

    /**
     * @return the number of events distributed so far
     */
    @Override
    public int getEventsDistributedCount() {
        return totalEventsDistributed;
    }

    /**
     * @return the configured delay between batches, in milliseconds, read from the dependencies
     */
    @Override
    public long getBatchDistributionDelay() {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        return dependencies.getBatchDistributionDelay();
    }

    /**
     * Updates the delay between batches on the dependencies.
     *
     * @param delayMS the new delay, in milliseconds
     */
    @Override
    public void setBatchDistributionDelay(long delayMS) {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        dependencies.setBatchDistributionDelay(delayMS);
    }

    /**
     * @return the configured batch size, read from the dependencies
     */
    @Override
    public int getBatchSize() {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        return dependencies.getBatchSize();
    }

    /**
     * Updates the batch size on the dependencies.
     *
     * @param size the new batch size
     */
    @Override
    public void setBatchSize(int size) {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        dependencies.setBatchSize(size);
    }

    /**
     * @return the configured restart delay, in milliseconds, read from the dependencies
     */
    @Override
    public long getRestartDelay() {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        return dependencies.getRestartDelay();
    }

    /**
     * Updates the restart delay on the dependencies.
     *
     * @param delayMS the new restart delay, in milliseconds
     */
    @Override
    public void setRestartDelay(long delayMS) {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        dependencies.setRestartDelay(delayMS);
    }

    /**
     * @return the starting mode currently recorded on the dependencies
     */
    @Override
    public EventChannelController.Mode getStartingMode() {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        return dependencies.getStartingMode();
    }

    /**
     * Records a new starting mode on the dependencies. The state entry handlers call this
     * as the controller is started, disabled or suspended.
     *
     * @param startingMode the mode to record
     */
    @Override
    public void setStartingMode(EventChannelController.Mode startingMode) {
        EventChannelController.Dependencies dependencies = controllerDependencies;
        dependencies.setStartingMode(startingMode);
    }

    /**
     * Describes the starting mode for management purposes.
     *
     * @return {@code "AUTO-START"} when the starting mode is ENABLED, otherwise the mode's
     *         own string form
     */
    @Override
    public String getInitialState() {
        EventChannelController.Mode mode = getStartingMode();
        if (mode == EventChannelController.Mode.ENABLED) {
            return "AUTO-START";
        }
        return mode.toString();
    }

    /**
     * Starts the state machine; the result of {@code start()} is ignored.
     */
    @Override
    public void prepare() {
        machine.start();
    }

    /**
     * Starts the controller. The state machine is started first; a
     * {@link ControllerEvent#START} event is submitted only when {@code machine.start()}
     * returns {@code false}.
     */
    @Override
    public void start() {
        boolean startedNow = machine.start();
        if (!startedNow) {
            machine.process(ControllerEvent.START);
        }
    }

    /**
     * Submits a {@link ControllerEvent#STOP} event to the state machine.
     */
    @Override
    public void stop() {
        machine.process(ControllerEvent.STOP);
    }

    /**
     * Submits a {@link ControllerEvent#DISTRIBUTE} event wrapped in a
     * {@code NonBlockingFiniteStateMachine.CoalescedEvent}.
     * NOTE(review): presumably the wrapper lets repeated requests collapse into one; confirm
     * against the state machine's documentation.
     */
    @Override
    public void preempt() {
        machine.process(
                new NonBlockingFiniteStateMachine.CoalescedEvent(ControllerEvent.DISTRIBUTE));
    }

    /**
     * Submits a {@link ControllerEvent#DISABLE} event to the state machine.
     */
    @Override
    public void disable() {
        machine.process(ControllerEvent.DISABLE);
    }

    /**
     * Submits a {@link ControllerEvent#SUSPEND} event to the state machine.
     */
    @Override
    public void suspend() {
        machine.process(ControllerEvent.SUSPEND);
    }

    /**
     * Submits a {@link ControllerEvent#DRAIN} event to the state machine.
     */
    @Override
    public void drain() {
        machine.process(ControllerEvent.DRAIN);
    }

    /**
     * Submits a {@link ControllerEvent#PROPAGATE} event to the state machine.
     */
    @Override
    public void propagate() {
        machine.process(ControllerEvent.PROPAGATE);
    }

    /**
     * Supplies a list of events paired with a subclass-defined value of type {@code T}.
     * NOTE(review): the caller is outside the code reviewed here; presumably this is the next
     * batch to send and {@code T} is a token later handed to
     * {@code acknowledgeDistributedEvents} - confirm.
     *
     * @return the events and the accompanying subclass-defined value
     */
    protected abstract Pair<List<com.oracle.coherence.common.events.Event>, T> getEventsToDistribute();

    /**
     * Acknowledges a list of events together with a subclass-defined value of type {@code T}.
     * NOTE(review): the caller is outside the code reviewed here; presumably invoked after a
     * batch from {@code getEventsToDistribute} has been sent successfully - confirm.
     *
     * @param list the events being acknowledged
     * @param t    the subclass-defined value accompanying the events
     */
    protected abstract void acknowledgeDistributedEvents(List<com.oracle.coherence.common.events.Event> list, T t);

    /**
     * Subclass hook called by {@code onDraining} when the machine enters {@link State#DRAINING}.
     */
    protected abstract void internalDrain();

    /**
     * Subclass hook called when the machine enters {@link State#STARTING} or
     * {@link State#SUSPENDED} with {@link State#DISABLED} as the handler's first state argument.
     */
    protected abstract void internalEnable();

    /**
     * Subclass hook called by {@code onDisabled} when the machine enters {@link State#DISABLED}.
     */
    protected abstract void internalDisable();

    /**
     * Subclass hook called by {@code onStarting} when the machine enters {@link State#STARTING}.
     *
     * @throws EventChannelNotReadyException if the channel cannot be started yet; the caller
     *         then moves to {@link State#ERROR}, which schedules another start after the
     *         restart delay
     */
    protected abstract void internalStart() throws EventChannelNotReadyException;

    /**
     * Subclass hook called by {@code onStopped} when the machine enters {@link State#STOPPED}.
     */
    protected abstract void internalStop();

    /**
     * Entry action for {@link State#STARTING}: (re-)enables the controller if it was disabled,
     * starts it, and decides which state to move to next.
     *
     * <ul>
     *   <li>Success: the failure counter is reset, the starting mode becomes ENABLED and the
     *       machine moves to {@link State#DISTRIBUTING}.</li>
     *   <li>{@link EventChannelNotReadyException}: the machine moves to {@link State#ERROR},
     *       which schedules another start after the restart delay. The failure counter is
     *       not touched.</li>
     *   <li>{@link RuntimeException}: the failure counter is incremented. While the configured
     *       limit is negative or not yet reached the machine moves to {@link State#ERROR};
     *       otherwise it moves to {@link State#SUSPENDED}.</li>
     * </ul>
     *
     * @param state            the state the machine came from
     * @param state2           second state argument supplied by the state machine (unused)
     * @param event            the triggering event (unused)
     * @param executionContext the state machine's execution context (unused)
     * @return the transition instruction described above
     */
    @OnEnterState("STARTING")
    public Instruction onStarting(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Starting EventChannelController {0}", this.controllerIdentifier);
        }
        try {
            // A disabled controller has to be enabled again before it can be started.
            if (state == State.DISABLED) {
                internalEnable();
            }
            internalStart();
            this.consecutiveDistributionFailures = 0;
            setStartingMode(EventChannelController.Mode.ENABLED);
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Scheduled EventChannelController {0} to commence event distribution.", this.controllerIdentifier);
            }
            return new Instruction.TransitionTo(State.DISTRIBUTING);
        } catch (EventChannelNotReadyException e) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "The {0} is not ready to start.  Deferring (re)start for {1} ms", new Object[]{this.controllerIdentifier, Long.valueOf(this.controllerDependencies.getRestartDelay())});
            }
            return new Instruction.TransitionTo(State.ERROR);
        } catch (RuntimeException e2) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed while attempting to start {0}", this.controllerIdentifier);
            }
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) e2);
            }
            this.consecutiveDistributionFailures++;
            // Read the limit once so both comparisons see the same value.
            int failureLimit = this.controllerDependencies.getTotalConsecutiveFailuresBeforeSuspending();
            if (failureLimit < 0 || this.consecutiveDistributionFailures < failureLimit) {
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Scheduled another attempt to start EventChannelController {0}.", this.controllerIdentifier);
                }
                return new Instruction.TransitionTo(State.ERROR);
            }
            if (logger.isLoggable(Level.WARNING)) {
                // java.util.logging formats with MessageFormat, so the count must be a {1}
                // placeholder with its own parameter; a printf-style %d is never substituted.
                logger.log(Level.WARNING, "Suspending {0} as there have been too many ({1}) consecutive failures ", new Object[]{this.controllerIdentifier, Integer.valueOf(this.consecutiveDistributionFailures)});
            }
            return new Instruction.TransitionTo(State.SUSPENDED);
        }
    }

    /**
     * Entry action for {@link State#STOPPED}: asks the subclass to stop and then tells the
     * state machine to stop. A {@link RuntimeException} from {@code internalStop()} is logged
     * (warning, plus the stack trace at INFO) and otherwise ignored.
     *
     * @param state            first state argument supplied by the state machine (unused)
     * @param state2           second state argument supplied by the state machine (unused)
     * @param event            the triggering event (unused)
     * @param executionContext the state machine's execution context (unused)
     * @return {@link Instruction#STOP}
     */
    @OnEnterState("STOPPED")
    public Instruction onStopped(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        // Logger.log performs the level check itself, so no explicit guard is needed
        // around these cheap calls.
        logger.log(Level.FINER, "Stopping EventChannelController {0}", controllerIdentifier);
        try {
            internalStop();
        } catch (RuntimeException stopFailure) {
            logger.log(Level.WARNING, "Failed while attempting to stop {0}", controllerIdentifier);
            logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) stopFailure);
        }
        return Instruction.STOP;
    }

    /**
     * Entry action for {@link State#DISABLED}: asks the subclass to disable itself and, on
     * success, records DISABLED as the starting mode.
     *
     * <p>A {@link RuntimeException} from {@code internalDisable()} does not change the
     * outcome: the machine stays in this state and the starting mode is left as it was. The
     * failure is logged as a warning and its stack trace at INFO, matching the other entry
     * actions.
     *
     * @param state            first state argument supplied by the state machine (unused)
     * @param state2           second state argument supplied by the state machine (unused)
     * @param event            the triggering event (unused)
     * @param executionContext the state machine's execution context (unused)
     * @return {@link Instruction#NOTHING}
     */
    @OnEnterState("DISABLED")
    public Instruction onDisabled(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Disabling EventChannelController {0}", this.controllerIdentifier);
        }
        try {
            internalDisable();
            setStartingMode(EventChannelController.Mode.DISABLED);
        } catch (RuntimeException e) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed while attempting to disable EventChannelController {0}", this.controllerIdentifier);
            }
            // Keep the cause: without it the warning above cannot be diagnosed.
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) e);
            }
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Disabled EventChannelController {0}", this.controllerIdentifier);
        }
        return Instruction.NOTHING;
    }

    /**
     * Entry action for {@link State#SUSPENDED}: records SUSPENDED as the starting mode and,
     * when arriving from {@link State#DISABLED}, asks the subclass to enable itself again.
     *
     * <p>If enabling fails with a {@link RuntimeException} the machine is sent back to
     * {@link State#DISABLED}. The failure is logged as a warning and its stack trace at INFO,
     * matching the other entry actions.
     *
     * @param state            the state the machine came from
     * @param state2           second state argument supplied by the state machine (unused)
     * @param event            the triggering event (unused)
     * @param executionContext the state machine's execution context (unused)
     * @return {@link Instruction#NOTHING}, or a transition to {@link State#DISABLED} when
     *         re-enabling fails
     */
    @OnEnterState("SUSPENDED")
    public Instruction onSuspended(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        setStartingMode(EventChannelController.Mode.SUSPENDED);
        if (state == State.DISABLED) {
            try {
                internalEnable();
            } catch (RuntimeException e) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "Failed to enable EventChannelController {0}, moving back to disabled state", this.controllerIdentifier);
                }
                // Keep the cause: without it the warning above cannot be diagnosed.
                if (logger.isLoggable(Level.INFO)) {
                    logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) e);
                }
                return new Instruction.TransitionTo(State.DISABLED);
            }
        }
        return Instruction.NOTHING;
    }

    /**
     * Handles entry into the {@code ERROR} state by asking the state machine to process a
     * {@code START} event after the restart delay taken from the controller dependencies.
     *
     * @param state            not used by this handler
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext not used by this handler
     * @return an instruction to process {@code ControllerEvent.START} later
     */
    @OnEnterState("ERROR")
    public Instruction onError(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        // The delay is handed over in milliseconds.
        final long restartDelayMillis = this.controllerDependencies.getRestartDelay();
        return new NonBlockingFiniteStateMachine.ProcessEventLater(
                new NonBlockingFiniteStateMachine.SubsequentEvent(ControllerEvent.START),
                restartDelayMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Handles entry into the {@code WAITING} state by asking the state machine to process a
     * {@code DISTRIBUTE} event after the event polling delay taken from the controller
     * dependencies.
     *
     * @param state            not used by this handler
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext not used by this handler
     * @return an instruction to process {@code ControllerEvent.DISTRIBUTE} later
     */
    @OnEnterState("WAITING")
    public Instruction onWaiting(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        // The delay is handed over in milliseconds.
        final long pollingDelayMillis = this.controllerDependencies.getEventPollingDelay();
        return new NonBlockingFiniteStateMachine.ProcessEventLater(
                new NonBlockingFiniteStateMachine.SubsequentEvent(ControllerEvent.DISTRIBUTE),
                pollingDelayMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Handles entry into the {@code DELAYING} state by asking the state machine to process a
     * {@code DISTRIBUTE} event after the batch distribution delay taken from the controller
     * dependencies.
     *
     * @param state            not used by this handler
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext not used by this handler
     * @return an instruction to process {@code ControllerEvent.DISTRIBUTE} later
     */
    @OnEnterState("DELAYING")
    public Instruction onDelaying(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        // The delay is handed over in milliseconds.
        final long batchDelayMillis = this.controllerDependencies.getBatchDistributionDelay();
        return new NonBlockingFiniteStateMachine.ProcessEventLater(
                new NonBlockingFiniteStateMachine.SubsequentEvent(ControllerEvent.DISTRIBUTE),
                batchDelayMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Handles entry into the {@code DRAINING} state by calling {@code internalDrain()} and
     * then always requesting a transition to {@code SUSPENDED}.
     * <p>
     * A {@link RuntimeException} thrown while draining is logged at {@code WARNING} level
     * and does not prevent the transition.
     *
     * @param state            not used by this handler
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext not used by this handler
     * @return an instruction to transition to {@code State.SUSPENDED}
     */
    @OnEnterState("DRAINING")
    public Instruction onDraining(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Draining EventChannelController {0}", this.controllerIdentifier);
        }

        try {
            internalDrain();
        } catch (RuntimeException drainFailure) {
            if (logger.isLoggable(Level.WARNING)) {
                final Object[] details = {this.controllerDependencies.getChannelName(), drainFailure.getCause()};
                logger.log(Level.WARNING, "Failed to drain the waiting events for EventChannelController {0} due to {1}. Suspending EventChannelController", details);
                logger.log(Level.WARNING, "EventChannel Exception was as follows", (Throwable) drainFailure);
            }
        }

        // Success or failure, the controller ends up suspended.
        return new Instruction.TransitionTo(State.SUSPENDED);
    }

    /**
     * Handles entry into the {@code PROPAGATING} state: reads every entry of the cache named
     * by {@code cacheName}, one partition at a time, wraps each entry in a
     * {@link DistributableEntryPropagatedEvent} and sends the events through the channel in
     * batches. Afterwards the handler always requests a transition back to {@code state}.
     * <p>
     * Behaviour visible in this method:
     * <ul>
     *   <li>If {@code state} is {@code DISABLED}, the controller is enabled before
     *       propagation and disabled again afterwards.</li>
     *   <li>Iteration stops early once {@code executionContext.isAcceptingEvents()} returns
     *       {@code false}; events already collected are still sent as a final batch.</li>
     *   <li>A non-positive configured batch size falls back to 100.</li>
     *   <li>{@link EventChannelNotReadyException}, {@link InterruptedException} and
     *       {@link RuntimeException} raised during propagation are logged and not rethrown.
     *       After an interruption the thread's interrupt flag is restored just before the
     *       handler returns.</li>
     * </ul>
     *
     * @param state            the state to revert to once propagation finishes; also decides
     *                         whether the controller is temporarily enabled
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext polled to find out whether propagation should continue
     * @return an instruction to transition to {@code state}
     */
    @OnEnterState("PROPAGATING")
    public Instruction onPropagating(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Commencing propagation of entries using the EventChannelController {0}", this.controllerIdentifier);
        }
        // Remembers an interruption so the flag can be restored after the cleanup below.
        boolean interrupted = false;
        try {
            if (state == State.DISABLED) {
                internalEnable();
            }
            internalStart();
            NamedCache cache = CacheFactory.getCache(this.cacheName);
            DistributedCacheService cacheService = cache.getCacheService();
            BackingMapManagerContext context = cacheService.getBackingMapManager().getContext();
            Converter keyToInternalConverter = context.getKeyToInternalConverter();
            Converter valueToInternalConverter = context.getValueToInternalConverter();
            ClusterMetaInfo localClusterMetaInfo = LocalClusterMetaInfo.getInstance();
            // Decoration attached to every propagated value: the local cluster's meta info.
            HashMap hashMap = new HashMap();
            hashMap.put(DistributableEntry.CLUSTER_META_INFO_DECORATION_KEY, localClusterMetaInfo);
            // A batch size of 0 would make the size check below fire on the empty list
            // (sending an empty batch) and never again, so treat it like a negative value.
            int configuredBatchSize = this.controllerDependencies.getBatchSize();
            int batchSize = configuredBatchSize <= 0 ? 100 : configuredBatchSize;
            ArrayList arrayList = new ArrayList(batchSize);
            int partitionCount = cacheService.getPartitionCount();
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Approximately {0} entries to propagate contained with in {1} partitions (using a batch size of {2})", new Object[]{Integer.valueOf(cache.size()), Integer.valueOf(partitionCount), Integer.valueOf(batchSize)});
            }
            for (int i = 0; i < partitionCount && executionContext.isAcceptingEvents(); i++) {
                // Query exactly one partition per pass.
                PartitionSet partitionSet = new PartitionSet(partitionCount);
                partitionSet.add(i);
                Set entrySet = cache.entrySet(new PartitionedFilter(PresentFilter.INSTANCE, partitionSet));
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Propagating {0} entries contained in partition {1}", new Object[]{Integer.valueOf(entrySet.size()), Integer.valueOf(i)});
                }
                Iterator it = entrySet.iterator();
                while (it.hasNext() && executionContext.isAcceptingEvents()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    // NOTE(review): 7 is the decoration id passed to addInternalValueDecoration;
                    // presumably the "custom" decoration slot - confirm against the Coherence API.
                    DistributableEntryPropagatedEvent distributableEntryPropagatedEvent = new DistributableEntryPropagatedEvent(this.cacheName, new DistributableEntry((Binary) keyToInternalConverter.convert(entry.getKey()), (Binary) context.addInternalValueDecoration((Binary) valueToInternalConverter.convert(entry.getValue()), 7, hashMap), null, context));
                    if (arrayList.size() == batchSize) {
                        // The batch is full: flush it before adding the current event.
                        sendPropagatedBatch(arrayList);
                        if (executionContext.isAcceptingEvents() && this.controllerDependencies.getBatchDistributionDelay() > 0) {
                            Thread.sleep(this.controllerDependencies.getBatchDistributionDelay());
                        }
                        arrayList.clear();
                    }
                    arrayList.add(distributableEntryPropagatedEvent);
                }
            }
            if (arrayList.size() > 0) {
                // Flush whatever is left over after the last full batch.
                sendPropagatedBatch(arrayList);
            }
        } catch (EventChannelNotReadyException notReady) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed to start propagating entries with the EventChannelController {0} due to {1}. Reverting to the {2} state.", new Object[]{this.controllerDependencies.getChannelName(), notReady.getCause(), state});
                logger.log(Level.WARNING, "EventChannel Exception was as follows", (Throwable) notReady);
            }
        } catch (InterruptedException interruption) {
            interrupted = true;
            logger.log(Level.WARNING, "Interrupted while propagating entries with the EventChannelController {0}. Reverting to the {1} state.", new Object[]{this.controllerDependencies.getChannelName(), state});
        } catch (RuntimeException failure) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed to propagate entries with the EventChannelController {0} due to {1}. Reverting to the {2} state.", new Object[]{this.controllerDependencies.getChannelName(), failure.getCause(), state});
                logger.log(Level.WARNING, "EventChannel Exception was as follows", (Throwable) failure);
            }
        }
        try {
            if (state == State.DISABLED) {
                internalDisable();
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Completed propagation of entries using the EventChannelController {0}", this.controllerIdentifier);
            }
            return new Instruction.TransitionTo(state);
        } finally {
            if (interrupted) {
                // Restore the flag only after the cleanup above so that internalDisable()
                // does not run on a thread already marked as interrupted, while whoever
                // owns the thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sends one batch of propagated events through the channel, applying the optional
     * transformer first, and updates the batch and event counters.
     *
     * @param batch the events to send; the list is only read, never modified
     */
    private void sendPropagatedBatch(List batch) {
        this.channel.send(this.transformer == null ? batch.iterator() : this.transformer.transform(batch.iterator()));
        this.totalBatchesDistributed++;
        this.totalCandidateEvents += batch.size();
        this.totalEventsDistributed += batch.size();
    }

    /**
     * Handles entry into the {@code DISTRIBUTING} state: fetches the next events via
     * {@code getEventsToDistribute()}, sends them through the channel, updates the
     * distribution statistics and acknowledges the events.
     * <p>
     * The returned transition depends on the outcome:
     * <ul>
     *   <li>{@code WAITING} when there is nothing to distribute;</li>
     *   <li>{@code DELAYING} after a successful distribution (the consecutive-failure
     *       counter is reset);</li>
     *   <li>{@code ERROR} after a {@link RuntimeException}, while the configured failure
     *       limit is negative or has not been reached;</li>
     *   <li>{@code SUSPENDED} once the consecutive-failure count reaches a non-negative
     *       limit.</li>
     * </ul>
     *
     * @param state            not used by this handler
     * @param state2           not used by this handler
     * @param event            not used by this handler
     * @param executionContext not used by this handler
     * @return the transition instruction described above
     */
    @OnEnterState("DISTRIBUTING")
    public Instruction onDistributing(State state, State state2, Event<State> event, ExecutionContext executionContext) {
        Instruction.TransitionTo nextStep;
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Commenced distribution with EventChannel {0}", this.controllerIdentifier);
        }
        try {
            Pair<List<com.oracle.coherence.common.events.Event>, T> pending = getEventsToDistribute();
            List<com.oracle.coherence.common.events.Event> batch = pending.getX();
            // Second half of the pair; only handed back when acknowledging the batch.
            T ackArgument = pending.getY();
            if (!batch.isEmpty()) {
                int candidateCount = batch.size();
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Commencing distributing {1} event(s) with EventChannel {0}", new Object[]{this.controllerIdentifier, Integer.valueOf(batch.size())});
                }
                long startedAtMillis = System.currentTimeMillis();
                // Gives every DistributableEntryEvent the backing-map context of its own
                // cache; any other event passes through untouched.
                EventTransformer contextBinder = new EventTransformer() {
                    @Override
                    public com.oracle.coherence.common.events.Event transform(com.oracle.coherence.common.events.Event candidate) {
                        if (candidate instanceof DistributableEntryEvent) {
                            DistributableEntryEvent entryEvent = (DistributableEntryEvent) candidate;
                            entryEvent.getEntry().setContext(CacheFactory.getCache(entryEvent.getCacheName()).getCacheService().getBackingMapManager().getContext());
                            return entryEvent;
                        }
                        return candidate;
                    }
                };
                Iterator<com.oracle.coherence.common.events.Event> prepared = new MutatingEventIteratorTransformer(contextBinder).transform(batch.iterator());
                int sentCount = this.channel.send(this.transformer == null ? prepared : (Iterator) this.transformer.transform(prepared));
                this.totalBatchesDistributed++;
                this.totalCandidateEvents += candidateCount;
                this.totalEventsDistributed += sentCount;
                long elapsedMillis = System.currentTimeMillis() - startedAtMillis;
                this.lastDistributionDurationMS = elapsedMillis;
                this.maximumDistributionDurationMS = Math.max(this.maximumDistributionDurationMS, elapsedMillis);
                this.minimumDistributionDurationMS = Math.min(this.minimumDistributionDurationMS, elapsedMillis);
                this.totalDistributionDurationMS += elapsedMillis;
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Completed distributing {1} Event(s) with EventChannel {0}", new Object[]{this.controllerIdentifier, Integer.valueOf(batch.size())});
                }
                acknowledgeDistributedEvents(batch, ackArgument);
                this.consecutiveDistributionFailures = 0;
                nextStep = new Instruction.TransitionTo(State.DELAYING);
            } else {
                // Nothing queued: wait for the next poll.
                nextStep = new Instruction.TransitionTo(State.WAITING);
            }
        } catch (RuntimeException failure) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "EventChannelController {0} failed to distribute events", this.controllerIdentifier);
                logger.log(Level.INFO, "Exception was as follows:", (Throwable) failure);
            }
            this.consecutiveDistributionFailures++;
            // A negative limit means "never suspend"; otherwise suspend once the limit is hit.
            if (this.controllerDependencies.getTotalConsecutiveFailuresBeforeSuspending() >= 0 && this.consecutiveDistributionFailures >= this.controllerDependencies.getTotalConsecutiveFailuresBeforeSuspending()) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "Suspending distribution for {0} as there have been too many ({1}) consecutive failures ", new Object[]{this.controllerIdentifier, Integer.valueOf(this.consecutiveDistributionFailures)});
                }
                nextStep = new Instruction.TransitionTo(State.SUSPENDED);
            } else {
                nextStep = new Instruction.TransitionTo(State.ERROR);
            }
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Completed distribution with {0}.  Next Instruction {1}", new Object[]{this.controllerIdentifier, nextStep});
        }
        return nextStep;
    }
}
