package org.apache.helix.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ControllerChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.common.ClusterEventBlockingQueue;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.controller.stages.TargetExteralViewCalcStage;
import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
import org.apache.helix.controller.stages.TopStateHandoffReportStage;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.controller.stages.task.TaskSchedulingStage;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/helix/controller/GenericHelixController.class */
public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener {
    // Join timeout for the event threads; not referenced in the visible part of the file
    // (presumably milliseconds — TODO confirm against the shutdown code).
    private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000;
    // Intended size of the async-tasks scheduled thread pool (the constructor passes the same value, 10).
    private static final int ASYNC_TASKS_THREADPOOL_SIZE = 10;
    // Pipelines used by handleEvent when the data provider is a ResourceControllerDataProvider.
    private final PipelineRegistry _registry;
    // Pipelines used by handleEvent when the data provider is a WorkflowControllerDataProvider.
    private final PipelineRegistry _taskRegistry;
    // NOTE(review): presumably the live instances / sessions observed at the last check — the
    // code that reads and writes these references is outside the visible chunk.
    final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
    final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
    // When true, handleEvent attaches _clusterStatusMonitor to events and reports phase durations.
    private boolean _isMonitoring;
    private final ClusterStatusMonitor _clusterStatusMonitor;
    // Queue, processor thread for the DEFAULT (resource) pipeline; both are null when that
    // pipeline type is not enabled.
    private final ClusterEventBlockingQueue _eventQueue;
    private final ClusterEventProcessor _eventThread;
    // Queue, processor thread for the TASK pipeline; both are null when that pipeline type is
    // not enabled.
    private final ClusterEventBlockingQueue _taskEventQueue;
    private final ClusterEventProcessor _taskEventThread;
    // One started DedupEventProcessor per AsyncWorkerType (see initializeAsyncFIFOWorkers).
    private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
    // Number of consecutive pipeline runs that failed with HelixMetaDataAccessException while the
    // queue was empty; reset to 0 by handleEvent after a run without such a failure.
    private long _continousRebalanceFailureCount;
    // While true, handleEvent ignores incoming events.
    private boolean _paused;
    // Not referenced in the visible part of the file.
    private boolean _inMaintenanceMode;
    // Timer driving PeriodicalRebalance events; null when no periodic rebalance is active.
    Timer _periodicalRebalanceTimer;
    // Period of the periodic timer; Long.MAX_VALUE is used when no timer is running.
    long _timerPeriod;
    // Timer used for OnDemandRebalance tasks.
    Timer _onDemandRebalanceTimer;
    // Most recently scheduled on-demand rebalance task; the previous one is cancelled on replacement.
    AtomicReference<RebalanceTask> _nextRebalanceTask;
    // Data providers for the DEFAULT and TASK pipelines; null when the respective type is disabled.
    private final ResourceControllerDataProvider _resourceControlDataProvider;
    private final WorkflowControllerDataProvider _workflowControlDataProvider;
    // Scheduled pool; handleEvent uses it to run delayed RetryRebalance tasks.
    private final ScheduledExecutorService _asyncTasksThreadPool;
    // Wall-clock time (ms) at which the last pipeline run ended; -1 until the first run completes.
    private long _lastPipelineEndTimestamp;
    private String _clusterName;
    private final Set<Pipeline.Type> _enabledPipelineTypes;
    // Set by handleEvent from the event's helixmanager attribute once that manager is confirmed leader.
    private HelixManager _helixManager;
    private static final Logger logger = LoggerFactory.getLogger(GenericHelixController.class.getName());
    // Cluster name -> controller; populated by the constructor when a non-null cluster name is given.
    private static Map<String, GenericHelixController> HelixControllerFactory = new ConcurrentHashMap();

    @Deprecated
    /* loaded from: input_file:org/apache/helix/controller/GenericHelixController$ClusterEventProcessor.class */
    private class ClusterEventProcessor extends Thread {
        private final BaseControllerDataProvider _cache;
        private final ClusterEventBlockingQueue _eventBlockingQueue;
        private final String _processorName;

        ClusterEventProcessor(BaseControllerDataProvider baseControllerDataProvider, ClusterEventBlockingQueue clusterEventBlockingQueue, String str) {
            this._cache = baseControllerDataProvider;
            this._eventBlockingQueue = clusterEventBlockingQueue;
            this._processorName = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            GenericHelixController.logger.info("START ClusterEventProcessor thread  for cluster " + GenericHelixController.this._clusterName + ", processor name: " + this._processorName);
            while (!isInterrupted()) {
                try {
                    ClusterEvent take = this._eventBlockingQueue.take();
                    setName(String.format("HelixController-pipeline-%s-(%s)", this._processorName, take.getEventId()));
                    GenericHelixController.this.handleEvent(take, this._cache);
                } catch (InterruptedException e) {
                    GenericHelixController.logger.warn("ClusterEventProcessor interrupted " + this._processorName, (Throwable) e);
                    interrupt();
                } catch (ThreadDeath e2) {
                    GenericHelixController.logger.error("ClusterEventProcessor caught a ThreadDeath  " + this._processorName, (Throwable) e2);
                    throw e2;
                } catch (ZkInterruptedException e3) {
                    GenericHelixController.logger.warn("ClusterEventProcessor caught a ZK connection interrupt " + this._processorName, (Throwable) e3);
                    interrupt();
                } catch (Throwable th) {
                    GenericHelixController.logger.error("ClusterEventProcessor failed while running the controller pipeline " + this._processorName, th);
                }
            }
            GenericHelixController.logger.info("END ClusterEventProcessor thread " + this._processorName);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/controller/GenericHelixController$RebalanceTask.class */
    public class RebalanceTask extends TimerTask {
        final HelixManager _manager;
        final ClusterEventType _clusterEventType;
        private long _nextRebalanceTime;

        public RebalanceTask(GenericHelixController genericHelixController, HelixManager helixManager, ClusterEventType clusterEventType) {
            this(helixManager, clusterEventType, -1L);
        }

        public RebalanceTask(HelixManager helixManager, ClusterEventType clusterEventType, long j) {
            this._manager = helixManager;
            this._clusterEventType = clusterEventType;
            this._nextRebalanceTime = j;
        }

        public long getNextRebalanceTime() {
            return this._nextRebalanceTime;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                if (this._clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || this._clusterEventType.equals(ClusterEventType.OnDemandRebalance)) {
                    GenericHelixController.this.requestDataProvidersFullRefresh();
                    HelixDataAccessor helixDataAccessor = this._manager.getHelixDataAccessor();
                    Map childValuesMap = helixDataAccessor.getChildValuesMap(helixDataAccessor.keyBuilder().liveInstances());
                    if (childValuesMap != null && !childValuesMap.isEmpty()) {
                        NotificationContext notificationContext = new NotificationContext(this._manager);
                        notificationContext.setType(NotificationContext.Type.CALLBACK);
                        synchronized (this._manager) {
                            GenericHelixController.this.checkLiveInstancesObservation(new ArrayList(childValuesMap.values()), notificationContext);
                        }
                    }
                }
                GenericHelixController.this.forceRebalance(this._manager, this._clusterEventType);
            } catch (Throwable th) {
                GenericHelixController.logger.error("Time task failed. Rebalance task type: " + this._clusterEventType + ", cluster: " + GenericHelixController.this._clusterName, th);
            }
        }
    }

    /**
     * Looks up the controller that registered itself under the given cluster name.
     *
     * @param str cluster name used as the registry key
     * @return the registered controller, or {@code null} if none is registered under that name
     */
    public static GenericHelixController getController(String str) {
        final GenericHelixController registered = HelixControllerFactory.get(str);
        return registered;
    }

    /**
     * Creates a controller with freshly built default and task pipeline registries, delegating to
     * the two-registry constructor.
     */
    public GenericHelixController() {
        this(
            createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
            createTaskRegistry(Pipeline.Type.TASK.name()));
    }

    /**
     * Creates a controller for the named cluster with both the TASK and DEFAULT pipeline types
     * enabled.
     *
     * @param str cluster name
     */
    public GenericHelixController(String str) {
        this(
            createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
            createTaskRegistry(Pipeline.Type.TASK.name()),
            str,
            Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
    }

    /**
     * Creates a controller for the named cluster that runs only the given pipeline types.
     *
     * @param str cluster name
     * @param set pipeline types to enable
     */
    public GenericHelixController(String str, Set<Pipeline.Type> set) {
        this(
            createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
            createTaskRegistry(Pipeline.Type.TASK.name()),
            str,
            set);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a rebalance event of the given type and enqueues it on the task queue and, as a
     * clone sharing the same event id, on the default queue.
     *
     * @param helixManager manager attached to the event and its notification context
     * @param clusterEventType type of the event to enqueue
     */
    public void forceRebalance(HelixManager helixManager, ClusterEventType clusterEventType) {
        final NotificationContext changeContext = new NotificationContext(helixManager);
        changeContext.setType(NotificationContext.Type.CALLBACK);

        // Short random id; the same id is given to both queued copies of the event.
        final String eventId = UUID.randomUUID().toString().substring(0, 8);

        final ClusterEvent rebalanceEvent = new ClusterEvent(_clusterName, clusterEventType, eventId);
        rebalanceEvent.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
        rebalanceEvent.addAttribute(AttributeName.changeContext.name(), changeContext);
        rebalanceEvent.addAttribute(AttributeName.eventData.name(), new ArrayList<Object>());
        rebalanceEvent.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);

        enqueueEvent(_taskEventQueue, rebalanceEvent);
        enqueueEvent(_eventQueue, rebalanceEvent.clone(eventId));

        final String triggered = String.format("Controller rebalance pipeline triggered with event type: %s for cluster %s", clusterEventType, _clusterName);
        logger.info(triggered);
    }

    /**
     * Starts (or restarts with a new period) the periodic rebalance timer. Does nothing when a
     * timer with the same period is already recorded.
     *
     * @param j rebalance period, also used as the initial delay
     * @param helixManager manager handed to the periodic rebalance task
     */
    void startPeriodRebalance(long j, HelixManager helixManager) {
        final boolean samePeriod = (_timerPeriod == j);
        if (samePeriod) {
            logger.info("Controller already has periodical rebalance timer at period " + _timerPeriod);
            return;
        }

        logger.info("Controller starting periodical rebalance timer at period " + j);
        final Timer previousTimer = _periodicalRebalanceTimer;
        if (previousTimer != null) {
            previousTimer.cancel();
        }

        // Daemon timer, so it never keeps the JVM alive on its own.
        _periodicalRebalanceTimer = new Timer(true);
        _timerPeriod = j;
        final RebalanceTask periodicTask = new RebalanceTask(this, helixManager, ClusterEventType.PeriodicalRebalance);
        _periodicalRebalanceTimer.scheduleAtFixedRate(periodicTask, _timerPeriod, _timerPeriod);
    }

    /**
     * Cancels the periodic rebalance timer, if one is running, and resets the recorded period to
     * {@code Long.MAX_VALUE}.
     */
    void stopPeriodRebalance() {
        logger.info("Controller stopping periodical rebalance timer at period " + _timerPeriod);
        final Timer activeTimer = _periodicalRebalanceTimer;
        if (activeTimer == null) {
            return;
        }
        activeTimer.cancel();
        _periodicalRebalanceTimer = null;
        _timerPeriod = Long.MAX_VALUE;
        logger.info("Controller stopped periodical rebalance timer at period " + _timerPeriod);
    }

    /**
     * Schedules an on-demand rebalance at an absolute point in time. Nothing is scheduled when
     * the time is not in the future, or when an already scheduled task will fire strictly between
     * now and the requested time.
     *
     * @param j absolute rebalance time in epoch milliseconds
     */
    @Deprecated
    public void scheduleRebalance(long j) {
        if (_helixManager == null) {
            logger.warn("Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!");
            return;
        }

        final long now = System.currentTimeMillis();
        final long delayMs = j - now;
        if (j <= now) {
            return;
        }

        final RebalanceTask pending = _nextRebalanceTask.get();
        final boolean earlierRunPending = pending != null
                && pending.getNextRebalanceTime() > now
                && pending.getNextRebalanceTime() < j;
        if (earlierRunPending) {
            return;
        }

        final RebalanceTask replacement = new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, j);
        _onDemandRebalanceTimer.schedule(replacement, delayMs);
        logger.info("Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay " + delayMs);

        // Publish the new task and cancel whichever task it superseded.
        final RebalanceTask superseded = _nextRebalanceTask.getAndSet(replacement);
        if (superseded != null) {
            superseded.cancel();
        }
    }

    /**
     * Schedules an on-demand rebalance after the given delay.
     *
     * <p>A delay of zero or less requests an immediate run and is always scheduled. A positive
     * delay is skipped when an already scheduled task will fire strictly between now and the
     * requested time. The newly scheduled task replaces the previously recorded one, which is
     * cancelled.
     *
     * @param j delay in milliseconds; negative values are treated as 0
     */
    public void scheduleOnDemandRebalance(long j) {
        // Read the field once so the null check and the later uses see the same manager.
        final HelixManager manager = this._helixManager;
        if (manager == null) {
            logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!", this._clusterName);
            return;
        }

        // Timer.schedule rejects negative delays with IllegalArgumentException; a negative
        // request means "run now", so clamp it instead of failing the caller.
        final long delay = Math.max(0L, j);
        final long now = System.currentTimeMillis();
        final long rebalanceTime = now + delay;

        if (delay > 0) {
            final RebalanceTask pending = this._nextRebalanceTask.get();
            if (pending != null && pending.getNextRebalanceTime() > now && pending.getNextRebalanceTime() < rebalanceTime) {
                // An earlier run is already scheduled; no need to schedule another.
                return;
            }
        }

        final RebalanceTask replacement = new RebalanceTask(manager, ClusterEventType.OnDemandRebalance, rebalanceTime);
        this._onDemandRebalanceTimer.schedule(replacement, delay);
        logger.info("Scheduled instant pipeline run for cluster {}.", manager.getClusterName());

        final RebalanceTask superseded = this._nextRebalanceTask.getAndSet(replacement);
        if (superseded != null) {
            superseded.cancel();
        }
    }

    /**
     * Builds the pipeline registry for the default (resource) controller pipeline: creates the
     * individual pipelines and registers, per cluster event type, which of them run and in what
     * order.
     *
     * @param str pipeline name given to every created {@link Pipeline}
     * @return the populated registry
     */
    private static PipelineRegistry createDefaultRegistry(String str) {
        logger.info("createDefaultRegistry");
        synchronized (GenericHelixController.class) {
            final PipelineRegistry registry = new PipelineRegistry();

            // Reads cluster data.
            final Pipeline dataRefresh = new Pipeline(str);
            dataRefresh.addStage(new ReadClusterDataStage());

            // Resource computation/validation, current state, top-state handoff reporting.
            final Pipeline dataPreprocess = new Pipeline(str);
            dataPreprocess.addStage(new ResourceComputationStage());
            dataPreprocess.addStage(new ResourceValidationStage());
            dataPreprocess.addStage(new CurrentStateComputationStage());
            dataPreprocess.addStage(new TopStateHandoffReportStage());

            // State calculation followed by message generation, selection, throttling and dispatch.
            final Pipeline rebalance = new Pipeline(str);
            rebalance.addStage(new BestPossibleStateCalcStage());
            rebalance.addStage(new IntermediateStateCalcStage());
            rebalance.addStage(new MaintenanceRecoveryStage());
            rebalance.addStage(new ResourceMessageGenerationPhase());
            rebalance.addStage(new MessageSelectionStage());
            rebalance.addStage(new MessageThrottleStage());
            rebalance.addStage(new ResourceMessageDispatchStage());
            rebalance.addStage(new PersistAssignmentStage());
            rebalance.addStage(new TargetExteralViewCalcStage());

            final Pipeline externalView = new Pipeline(str);
            externalView.addStage(new ExternalViewComputeStage());

            final Pipeline compatibilityCheck = new Pipeline(str);
            compatibilityCheck.addStage(new CompatibilityCheckStage());

            final Pipeline maintenanceRecovery = new Pipeline(str);
            maintenanceRecovery.addStage(new MaintenanceRecoveryStage());

            registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalance);
            registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalView, rebalance);
            registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalance);
            registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalance);
            registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, maintenanceRecovery, dataPreprocess, rebalance);
            registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, maintenanceRecovery, compatibilityCheck, dataPreprocess, externalView, rebalance);
            registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalance);
            registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalView, rebalance);
            registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, maintenanceRecovery, dataPreprocess, externalView, rebalance);
            registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, maintenanceRecovery, dataPreprocess, externalView, rebalance);

            return registry;
        }
    }

    /**
     * Builds the pipeline registry for the task pipeline: creates the individual pipelines and
     * registers, per cluster event type, which of them run and in what order.
     *
     * @param str pipeline name given to every created {@link Pipeline}
     * @return the populated registry
     */
    private static PipelineRegistry createTaskRegistry(String str) {
        logger.info("createTaskRegistry");
        synchronized (GenericHelixController.class) {
            final PipelineRegistry registry = new PipelineRegistry();

            // Reads cluster data.
            final Pipeline dataRefresh = new Pipeline(str);
            dataRefresh.addStage(new ReadClusterDataStage());

            // Resource computation/validation and current state.
            final Pipeline dataPreprocess = new Pipeline(str);
            dataPreprocess.addStage(new ResourceComputationStage());
            dataPreprocess.addStage(new ResourceValidationStage());
            dataPreprocess.addStage(new CurrentStateComputationStage());

            // Task scheduling, persistence, garbage collection and message generation/dispatch.
            final Pipeline taskRebalance = new Pipeline(str);
            taskRebalance.addStage(new TaskSchedulingStage());
            taskRebalance.addStage(new TaskPersistDataStage());
            taskRebalance.addStage(new TaskGarbageCollectionStage());
            taskRebalance.addStage(new TaskMessageGenerationPhase());
            taskRebalance.addStage(new TaskMessageDispatchStage());

            final Pipeline compatibilityCheck = new Pipeline(str);
            compatibilityCheck.addStage(new CompatibilityCheckStage());

            registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, compatibilityCheck, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, taskRebalance);
            registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, dataPreprocess, taskRebalance);

            return registry;
        }
    }

    /**
     * Creates a controller from caller-supplied registries, without a cluster name and with both
     * the TASK and DEFAULT pipeline types enabled.
     *
     * @param pipelineRegistry registry for the default pipeline
     * @param pipelineRegistry2 registry for the task pipeline
     */
    public GenericHelixController(PipelineRegistry pipelineRegistry, PipelineRegistry pipelineRegistry2) {
        this(
            pipelineRegistry,
            pipelineRegistry2,
            null,
            Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
    }

    /**
     * Primary constructor; every other constructor delegates here.
     *
     * <p>Initializes controller state, the async worker infrastructure and, for each enabled
     * pipeline type, a data provider, an event queue and a processor thread. When a cluster name
     * is given, the new controller is registered in {@code HelixControllerFactory} under it.
     *
     * @param pipelineRegistry registry for the default (resource) pipeline
     * @param pipelineRegistry2 registry for the task pipeline
     * @param str cluster name; may be null, in which case the controller is not registered
     * @param set pipeline types to enable
     */
    private GenericHelixController(PipelineRegistry pipelineRegistry, PipelineRegistry pipelineRegistry2, String str, Set<Pipeline.Type> set) {
        this._isMonitoring = false;
        this._continousRebalanceFailureCount = 0L;
        this._periodicalRebalanceTimer = null;
        // Long.MAX_VALUE marks "no periodic timer running".
        this._timerPeriod = Long.MAX_VALUE;
        this._onDemandRebalanceTimer = null;
        this._nextRebalanceTask = new AtomicReference<>();
        this._paused = false;
        this._enabledPipelineTypes = set;
        this._registry = pipelineRegistry;
        this._taskRegistry = pipelineRegistry2;
        this._lastSeenInstances = new AtomicReference<>();
        this._lastSeenSessions = new AtomicReference<>();
        this._clusterName = str;
        // -1 until the first pipeline run has finished.
        this._lastPipelineEndTimestamp = -1L;
        this._clusterStatusMonitor = new ClusterStatusMonitor(this._clusterName);
        // Pool size 10 matches ASYNC_TASKS_THREADPOOL_SIZE; all threads share one name that
        // carries the cluster name.
        this._asyncTasksThreadPool = Executors.newScheduledThreadPool(10, new ThreadFactory() { // from class: org.apache.helix.controller.GenericHelixController.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "HelixController-async_tasks-" + GenericHelixController.this._clusterName);
            }
        });
        // The workers are created and started right away; this must come after _clusterName is
        // set because initializeAsyncFIFOWorkers passes it to each worker.
        this._asyncFIFOWorkerPool = new HashMap();
        initializeAsyncFIFOWorkers();
        // Daemon timer for on-demand rebalance tasks.
        this._onDemandRebalanceTimer = new Timer(true);
        // DEFAULT pipeline: provider, queue and processor thread, or all null when disabled.
        if (this._enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
            logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
            this._resourceControlDataProvider = new ResourceControllerDataProvider(str);
            this._eventQueue = new ClusterEventBlockingQueue();
            this._eventThread = new ClusterEventProcessor(this._resourceControlDataProvider, this._eventQueue, "default-" + str);
            initPipeline(this._eventThread, this._resourceControlDataProvider);
            logger.info("Initialized {} pipeline", Pipeline.Type.DEFAULT.name());
        } else {
            this._eventQueue = null;
            this._resourceControlDataProvider = null;
            this._eventThread = null;
        }
        // TASK pipeline: provider, queue and processor thread, or all null when disabled.
        if (this._enabledPipelineTypes.contains(Pipeline.Type.TASK)) {
            logger.info("Initializing {} pipeline", Pipeline.Type.TASK.name());
            this._workflowControlDataProvider = new WorkflowControllerDataProvider(str);
            this._taskEventQueue = new ClusterEventBlockingQueue();
            this._taskEventThread = new ClusterEventProcessor(this._workflowControlDataProvider, this._taskEventQueue, "task-" + str);
            initPipeline(this._taskEventThread, this._workflowControlDataProvider);
            logger.info("Initialized {} pipeline", Pipeline.Type.TASK.name());
        } else {
            this._workflowControlDataProvider = null;
            this._taskEventQueue = null;
            this._taskEventThread = null;
        }
        // Register last so getController(name) can find this instance.
        // NOTE(review): this publishes `this` from inside the constructor.
        if (str != null) {
            HelixControllerFactory.put(str, this);
        }
    }

    /**
     * Creates, starts and registers one {@link DedupEventProcessor} per {@link AsyncWorkerType}.
     * Each worker simply runs the {@link Runnable} it is handed.
     */
    private void initializeAsyncFIFOWorkers() {
        final AsyncWorkerType[] workerTypes = AsyncWorkerType.values();
        for (int i = 0; i < workerTypes.length; i++) {
            final AsyncWorkerType workerType = workerTypes[i];
            final DedupEventProcessor<String, Runnable> worker = new DedupEventProcessor<String, Runnable>(_clusterName, workerType.name()) {
                @Override
                public void handleEvent(Runnable task) {
                    task.run();
                }
            };
            worker.start();
            _asyncFIFOWorkerPool.put(workerType, worker);
            logger.info("Started async worker {}", worker.getName());
        }
    }

    /** Shuts down every registered async worker and logs each shutdown. */
    private void shutdownAsyncFIFOWorkers() {
        final Iterator<DedupEventProcessor<String, Runnable>> workers = _asyncFIFOWorkerPool.values().iterator();
        while (workers.hasNext()) {
            final DedupEventProcessor<String, Runnable> worker = workers.next();
            worker.shutdown();
            logger.info("Shutdown async worker {}", worker.getName());
        }
    }

    /**
     * Tells whether the selected event queue holds no events; a missing (null) queue counts as
     * empty.
     *
     * @param z {@code true} to inspect the task event queue, {@code false} for the default queue
     * @return {@code true} if the selected queue is null or empty
     */
    private boolean isEventQueueEmpty(boolean z) {
        final ClusterEventBlockingQueue selectedQueue;
        if (z) {
            selectedQueue = _taskEventQueue;
        } else {
            selectedQueue = _eventQueue;
        }
        if (selectedQueue == null) {
            return true;
        }
        return selectedQueue.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the pipelines registered for the event's type against the given data provider.
     *
     * <p>The event is dropped (with a log line) when it carries no manager, when the manager is
     * not the cluster leader, when the controller is paused, when the notification context is of
     * type FINALIZE (which also stops the periodic rebalance timer), or when the data provider is
     * neither a resource nor a workflow provider.
     *
     * <p>Pipelines run in registration order. The first pipeline that throws ends the run for
     * this event: the failure is reported to the cluster status monitor and no further pipeline
     * is invoked. If the failure is a {@link HelixMetaDataAccessException}, the provider is asked
     * for a full refresh and, when the matching event queue is empty, a RetryRebalance is
     * triggered either immediately or after the delay returned by {@code getRetryDelay}.
     *
     * @param clusterEvent event to process; must carry the {@code helixmanager} attribute
     * @param baseControllerDataProvider data provider (cache) of the pipeline type being run
     */
    public void handleEvent(ClusterEvent clusterEvent, BaseControllerDataProvider baseControllerDataProvider) {
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        if (helixManager == null) {
            logger.error("No cluster manager in event:" + clusterEvent.getEventType());
            return;
        }
        if (!helixManager.isLeader()) {
            logger.error("Cluster manager: " + helixManager.getInstanceName() + " is not leader for " + helixManager.getClusterName() + ". Pipeline will not be invoked");
            return;
        }
        // Remember the leader's manager; the schedule*Rebalance methods depend on it.
        this._helixManager = helixManager;
        if (this._paused) {
            logger.info("Cluster " + helixManager.getClusterName() + " is paused. Ignoring the event:" + clusterEvent.getEventType());
            return;
        }

        // Casting a missing (null) attribute yields null, so no separate null check is needed.
        NotificationContext changeContext = (NotificationContext) clusterEvent.getAttribute(AttributeName.changeContext.name());
        if (changeContext != null) {
            if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
                stopPeriodRebalance();
                logger.info("Get FINALIZE notification, skip the pipeline. Event :" + clusterEvent.getEventType());
                return;
            }
            if (this._resourceControlDataProvider != null) {
                checkRebalancingTimer(helixManager, Collections.emptyList(), baseControllerDataProvider.getClusterConfig());
            }
            if (this._isMonitoring) {
                clusterEvent.addAttribute(AttributeName.clusterStatusMonitor.name(), this._clusterStatusMonitor);
            }
        }

        baseControllerDataProvider.setClusterEventId(clusterEvent.getEventId());
        clusterEvent.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), Long.valueOf(this._lastPipelineEndTimestamp));

        // Pick the registry that matches the provider type.
        List<Pipeline> pipelinesForEvent;
        boolean isTaskPipeline = false;
        if (baseControllerDataProvider instanceof ResourceControllerDataProvider) {
            pipelinesForEvent = this._registry.getPipelinesForEvent(clusterEvent.getEventType());
        } else if (baseControllerDataProvider instanceof WorkflowControllerDataProvider) {
            pipelinesForEvent = this._taskRegistry.getPipelinesForEvent(clusterEvent.getEventType());
            isTaskPipeline = true;
        } else {
            logger.warn(String.format("No %s pipeline to run for event: %s::%s", baseControllerDataProvider.getPipelineName(), clusterEvent.getEventType(), clusterEvent.getEventId()));
            return;
        }

        clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), baseControllerDataProvider);
        // Argument order follows the placeholders: pipeline name first, then cluster name.
        logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s  %s", baseControllerDataProvider.getPipelineName(), helixManager.getClusterName(), clusterEvent.getEventType(), clusterEvent.getEventId()));
        long startTime = System.currentTimeMillis();
        boolean rebalanceFailed = false;
        for (Pipeline pipeline : pipelinesForEvent) {
            clusterEvent.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType());
            try {
                pipeline.handle(clusterEvent);
                pipeline.finish();
            } catch (Exception e) {
                // The exception is passed as the trailing argument so SLF4J logs its stack trace.
                logger.error("Exception while executing {} pipeline for cluster {}. Will not continue to next pipeline", baseControllerDataProvider.getPipelineName(), this._clusterName, e);
                if (e instanceof HelixMetaDataAccessException) {
                    rebalanceFailed = true;
                    baseControllerDataProvider.requireFullRefresh();
                    logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + this._clusterName);
                    // Only retry when nothing else is queued; a queued event re-runs the pipeline anyway.
                    if (isEventQueueEmpty(isTaskPipeline)) {
                        this._continousRebalanceFailureCount++;
                        long retryDelay = getRetryDelay(this._continousRebalanceFailureCount);
                        if (retryDelay == 0) {
                            forceRebalance(helixManager, ClusterEventType.RetryRebalance);
                        } else {
                            this._asyncTasksThreadPool.schedule(new RebalanceTask(this, helixManager, ClusterEventType.RetryRebalance), retryDelay, TimeUnit.MILLISECONDS);
                        }
                        logger.info("Retry rebalance pipeline with delay " + retryDelay + "ms for cluster: " + this._clusterName);
                    }
                }
                this._clusterStatusMonitor.reportRebalanceFailure();
                // Later pipelines depend on the output of earlier ones; stop after the first failure.
                break;
            }
        }
        if (!rebalanceFailed) {
            this._continousRebalanceFailureCount = 0L;
        }
        this._lastPipelineEndTimestamp = System.currentTimeMillis();
        logger.info("END: Invoking {} controller pipeline for event {}::{} for cluster {}, took {} ms", baseControllerDataProvider.getPipelineName(), clusterEvent.getEventType(), clusterEvent.getEventId(), this._clusterName, Long.valueOf(this._lastPipelineEndTimestamp - startTime));

        // Timing statistics are reported for the default (resource) pipeline only.
        if (!isTaskPipeline) {
            NotificationContext timingContext = (NotificationContext) clusterEvent.getAttribute(AttributeName.changeContext.name());
            long enqueueTime = clusterEvent.getCreationTime();
            StringBuilder sb = new StringBuilder();
            if (timingContext != null) {
                long callbackTime = timingContext.getCreationTime();
                if (this._isMonitoring) {
                    this._clusterStatusMonitor.updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - callbackTime);
                }
                sb.append(String.format("Callback time for event: %s took: %s ms\n", clusterEvent.getEventType(), Long.valueOf(enqueueTime - callbackTime)));
            }
            if (this._isMonitoring) {
                this._clusterStatusMonitor.updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(), startTime - enqueueTime);
                this._clusterStatusMonitor.updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(), this._lastPipelineEndTimestamp - startTime);
            }
            sb.append(String.format("InQueue time for event: %s took: %s ms\n", clusterEvent.getEventType(), Long.valueOf(startTime - enqueueTime)));
            sb.append(String.format("TotalProcessed time for event: %s took: %s ms", clusterEvent.getEventType(), Long.valueOf(this._lastPipelineEndTimestamp - startTime)));
            logger.info(sb.toString());
        }
        resetClusterStatusMonitor();
    }

    /**
     * Computes how long to wait, in milliseconds, before retrying a failed rebalance.
     *
     * <p>The first five consecutive failures are retried immediately. After that the
     * delay is {@code 10 * 2^(j - 5)}, capped at 1000.
     *
     * @param j number of consecutive rebalance failures seen so far
     * @return the retry delay; {@code 0} means "retry right away"
     */
    private long getRetryDelay(long j) {
        if (j > 5) {
            // Exponential back-off; the double-to-long cast saturates, so the cap still applies.
            double backoff = 10.0d * Math.pow(2.0d, j - 5);
            return Math.min((long) backoff, 1000L);
        }
        return 0L;
    }

    /**
     * Current-state listener callback.
     *
     * <p>Notifies the data-provider caches of a {@code CURRENT_STATE} change and then
     * enqueues a {@code CurrentStateChange} event carrying {@code str} under the
     * {@code instanceName} attribute.
     *
     * @param str value stored as the event's {@code instanceName} attribute
     * @param list current states delivered with the callback (not read here)
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.CurrentStateChangeListener
    @PreFetch(enabled = false)
    public void onStateChange(String str, List<CurrentState> list, NotificationContext notificationContext) {
        logger.info("START: GenericClusterController.onStateChange()");
        notifyCaches(notificationContext, HelixConstants.ChangeType.CURRENT_STATE);
        Map<String, Object> eventAttributes =
                Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), str);
        pushToEventQueues(ClusterEventType.CurrentStateChange, notificationContext, eventAttributes);
        logger.info("END: GenericClusterController.onStateChange()");
    }

    /**
     * Message listener callback.
     *
     * <p>Notifies the data-provider caches of a {@code MESSAGE} change, enqueues a
     * {@code MessageChange} event carrying {@code str} under the {@code instanceName}
     * attribute, and, when monitoring is on and a message list was supplied, reports
     * the list size to the cluster status monitor.
     *
     * @param str value stored as the event's {@code instanceName} attribute
     * @param list messages delivered with the callback; may be {@code null}
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.MessageListener
    @PreFetch(enabled = false)
    public void onMessage(String str, List<Message> list, NotificationContext notificationContext) {
        logger.info("START: GenericClusterController.onMessage() for cluster " + _clusterName);
        notifyCaches(notificationContext, HelixConstants.ChangeType.MESSAGE);
        Map<String, Object> eventAttributes =
                Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), str);
        pushToEventQueues(ClusterEventType.MessageChange, notificationContext, eventAttributes);
        if (_isMonitoring) {
            if (list != null) {
                _clusterStatusMonitor.addMessageQueueSize(str, list.size());
            }
        }
        logger.info("END: GenericClusterController.onMessage() for cluster " + _clusterName);
    }

    /**
     * Live-instance listener callback.
     *
     * <p>Notifies the caches of a {@code LIVE_INSTANCE} change, reconciles the
     * per-instance listeners through {@link #checkLiveInstancesObservation}, and
     * enqueues a {@code LiveInstanceChange} event whose {@code eventData} attribute
     * is the effective live-instance list.
     *
     * <p>A {@code null} list is treated as empty. On {@code FINALIZE} the list is
     * replaced by an empty one before the listeners are reconciled. For notification
     * types other than {@code INIT}, {@code CALLBACK} and {@code FINALIZE} the
     * listeners are left untouched.
     *
     * @param list live instances delivered with the callback; may be {@code null}
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.LiveInstanceChangeListener
    public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
        logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
        notifyCaches(notificationContext, HelixConstants.ChangeType.LIVE_INSTANCE);

        List<LiveInstance> liveInstances = list;
        if (liveInstances == null) {
            liveInstances = Collections.emptyList();
        }

        NotificationContext.Type type = notificationContext.getType();
        if (type == NotificationContext.Type.FINALIZE) {
            logger.info("remove message/current-state listeners. lastSeenInstances: " + _lastSeenInstances + ", lastSeenSessions: " + _lastSeenSessions);
            liveInstances = Collections.emptyList();
            checkLiveInstancesObservation(liveInstances, notificationContext);
        } else if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
            checkLiveInstancesObservation(liveInstances, notificationContext);
        }

        pushToEventQueues(
                ClusterEventType.LiveInstanceChange,
                notificationContext,
                Collections.<String, Object>singletonMap(AttributeName.eventData.name(), liveInstances));
        logger.info("END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
    }

    /**
     * Starts or stops the periodic rebalance based on the configured periods.
     *
     * <p>The smallest strictly positive period among the cluster config's rebalance
     * time period and every ideal state's rebalance timer period is selected. If one
     * exists the periodic rebalance is started with it; otherwise it is stopped.
     * Nothing is done (beyond a warning) when the manager has no config accessor.
     *
     * @param helixManager manager used for the config-accessor check and passed on
     *        to {@code startPeriodRebalance}
     * @param list ideal states whose timer periods are considered
     * @param clusterConfig cluster configuration; may be {@code null}
     */
    private void checkRebalancingTimer(HelixManager helixManager, List<IdealState> list, ClusterConfig clusterConfig) {
        if (helixManager.getConfigAccessor() == null) {
            logger.warn(helixManager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
            return;
        }

        // Long.MAX_VALUE doubles as the "no period configured" sentinel.
        long shortestPeriod = Long.MAX_VALUE;
        if (clusterConfig != null) {
            long clusterPeriod = clusterConfig.getRebalanceTimePeriod();
            if (clusterPeriod > 0) {
                shortestPeriod = Math.min(shortestPeriod, clusterPeriod);
            }
        }
        for (IdealState idealState : list) {
            long resourcePeriod = idealState.getRebalanceTimerPeriod();
            if (resourcePeriod > 0) {
                shortestPeriod = Math.min(shortestPeriod, resourcePeriod);
            }
        }

        if (shortestPeriod == Long.MAX_VALUE) {
            stopPeriodRebalance();
        } else {
            startPeriodRebalance(shortestPeriod, helixManager);
        }
    }

    /**
     * Ideal-state listener callback.
     *
     * <p>Notifies the caches of an {@code IDEAL_STATE} change and enqueues an
     * {@code IdealStateChange} event. Unless the notification is a {@code FINALIZE}
     * or carries no manager, the cluster config is read through the manager's data
     * accessor and the periodic rebalance timer is re-evaluated.
     *
     * @param list ideal states delivered with the callback
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.IdealStateChangeListener
    @PreFetch(enabled = false)
    public void onIdealStateChange(List<IdealState> list, NotificationContext notificationContext) {
        logger.info("START: Generic GenericClusterController.onIdealStateChange() for cluster " + _clusterName);
        notifyCaches(notificationContext, HelixConstants.ChangeType.IDEAL_STATE);
        pushToEventQueues(ClusterEventType.IdealStateChange, notificationContext, Collections.emptyMap());

        boolean finalizing = notificationContext.getType() == NotificationContext.Type.FINALIZE;
        if (!finalizing && notificationContext.getManager() != null) {
            HelixDataAccessor accessor = notificationContext.getManager().getHelixDataAccessor();
            ClusterConfig clusterConfig = (ClusterConfig) accessor.getProperty(accessor.keyBuilder().clusterConfig());
            checkRebalancingTimer(notificationContext.getManager(), list, clusterConfig);
        }
        logger.info("END: GenericClusterController.onIdealStateChange() for cluster " + _clusterName);
    }

    /**
     * Instance-config listener callback: notifies the caches of an
     * {@code INSTANCE_CONFIG} change and enqueues an {@code InstanceConfigChange}
     * event with no extra attributes.
     *
     * @param list instance configs delivered with the callback (not read here)
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.InstanceConfigChangeListener
    @PreFetch(enabled = false)
    public void onInstanceConfigChange(List<InstanceConfig> list, NotificationContext notificationContext) {
        final String clusterSuffix = "GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName;
        logger.info("START: " + clusterSuffix);
        notifyCaches(notificationContext, HelixConstants.ChangeType.INSTANCE_CONFIG);
        pushToEventQueues(ClusterEventType.InstanceConfigChange, notificationContext, Collections.emptyMap());
        logger.info("END: " + clusterSuffix);
    }

    /**
     * Resource-config listener callback: notifies the caches of a
     * {@code RESOURCE_CONFIG} change and enqueues a {@code ResourceConfigChange}
     * event with no extra attributes.
     *
     * @param list resource configs delivered with the callback (not read here)
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.ResourceConfigChangeListener
    @PreFetch(enabled = false)
    public void onResourceConfigChange(List<ResourceConfig> list, NotificationContext notificationContext) {
        final String clusterSuffix = "GenericClusterController.onResourceConfigChange() for cluster " + _clusterName;
        logger.info("START: " + clusterSuffix);
        notifyCaches(notificationContext, HelixConstants.ChangeType.RESOURCE_CONFIG);
        pushToEventQueues(ClusterEventType.ResourceConfigChange, notificationContext, Collections.emptyMap());
        logger.info("END: " + clusterSuffix);
    }

    /**
     * Cluster-config listener callback: notifies the caches of a
     * {@code CLUSTER_CONFIG} change and enqueues a {@code ClusterConfigChange}
     * event with no extra attributes.
     *
     * @param clusterConfig cluster config delivered with the callback (not read here)
     * @param notificationContext context of the notification that triggered the callback
     */
    @Override // org.apache.helix.api.listeners.ClusterConfigChangeListener
    @PreFetch(enabled = false)
    public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext notificationContext) {
        final String clusterSuffix = "GenericClusterController.onClusterConfigChange() for cluster " + _clusterName;
        logger.info("START: " + clusterSuffix);
        notifyCaches(notificationContext, HelixConstants.ChangeType.CLUSTER_CONFIG);
        pushToEventQueues(ClusterEventType.ClusterConfigChange, notificationContext, Collections.emptyMap());
        logger.info("END: " + clusterSuffix);
    }

    /**
     * Tells the data providers that data of the given type changed.
     *
     * <p>For a {@code CALLBACK} notification only the changed path is reported.
     * A missing context or any other notification type requests a full refresh.
     *
     * @param notificationContext context of the notification; may be {@code null}
     * @param changeType kind of data that changed
     */
    private void notifyCaches(NotificationContext notificationContext, HelixConstants.ChangeType changeType) {
        boolean isCallback = notificationContext != null
                && notificationContext.getType() == NotificationContext.Type.CALLBACK;
        if (isCallback) {
            updateDataChangeInProvider(changeType, notificationContext.getPathChanged());
            return;
        }
        requestDataProvidersFullRefresh();
    }

    /**
     * Forwards a data-change notification to each data provider that is present,
     * resource provider first, then workflow provider.
     *
     * @param changeType kind of data that changed
     * @param str value passed to the providers as the second argument of
     *        {@code notifyDataChange} (the caller supplies the changed path)
     */
    private void updateDataChangeInProvider(HelixConstants.ChangeType changeType, String str) {
        if (null != _resourceControlDataProvider) {
            _resourceControlDataProvider.notifyDataChange(changeType, str);
        }

        if (null != _workflowControlDataProvider) {
            _workflowControlDataProvider.notifyDataChange(changeType, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks each data provider that is present to do a full refresh,
     * resource provider first, then workflow provider.
     */
    public void requestDataProvidersFullRefresh() {
        if (null != _resourceControlDataProvider) {
            _resourceControlDataProvider.requireFullRefresh();
        }

        if (null != _workflowControlDataProvider) {
            _workflowControlDataProvider.requireFullRefresh();
        }
    }

    /**
     * Builds a cluster event for the given type and enqueues it on both pipelines.
     *
     * <p>The event id is an 8-character random prefix followed by the pipeline type
     * name. The event placed on the task queue is a clone of the default-pipeline
     * event that shares the prefix but carries the {@code TASK} suffix.
     *
     * @param clusterEventType type of the event to create
     * @param notificationContext context stored on the event; its manager is stored too
     * @param map additional attributes copied onto the event
     */
    private void pushToEventQueues(ClusterEventType clusterEventType, NotificationContext notificationContext, Map<String, Object> map) {
        final String idPrefix = UUID.randomUUID().toString().substring(0, 8);
        final String defaultEventId = String.format("%s_%s", idPrefix, Pipeline.Type.DEFAULT.name());

        final ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType, defaultEventId);
        event.addAttribute(AttributeName.helixmanager.name(), notificationContext.getManager());
        event.addAttribute(AttributeName.changeContext.name(), notificationContext);
        event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
        map.forEach((attributeName, attributeValue) -> event.addAttribute(attributeName, attributeValue));

        enqueueEvent(_eventQueue, event);

        final String taskEventId = String.format("%s_%s", idPrefix, Pipeline.Type.TASK.name());
        enqueueEvent(_taskEventQueue, event.clone(taskEventId));
    }

    /**
     * Puts the event on the queue; does nothing when either argument is {@code null}.
     *
     * @param clusterEventBlockingQueue destination queue; may be {@code null}
     * @param clusterEvent event to enqueue; may be {@code null}
     */
    private void enqueueEvent(ClusterEventBlockingQueue clusterEventBlockingQueue, ClusterEvent clusterEvent) {
        if (clusterEvent != null && clusterEventBlockingQueue != null) {
            clusterEventBlockingQueue.put(clusterEvent);
        }
    }

    /**
     * Controller-change listener callback.
     *
     * <p>Always requests a full refresh of the data providers. Leadership is taken
     * from the context's manager, except that a {@code null} context or a
     * {@code FINALIZE} notification is treated as "not leader". When leader, the
     * pause and maintenance signals are read, the paused / maintenance flags are
     * updated, and the cluster status monitor is enabled and synchronised with
     * those flags. Otherwise the cluster status monitor is disabled.
     *
     * @param notificationContext context of the notification; may be {@code null}
     */
    @Override // org.apache.helix.api.listeners.ControllerChangeListener
    public void onControllerChange(NotificationContext notificationContext) {
        logger.info("START: GenericClusterController.onControllerChange() for cluster " + _clusterName);
        requestDataProvidersFullRefresh();

        boolean leader;
        if (notificationContext != null && notificationContext.getType() != NotificationContext.Type.FINALIZE) {
            leader = notificationContext.getManager().isLeader();
        } else {
            logger.info("GenericClusterController.onControllerChange() Cluster change type {} for cluster {}. Disable leadership.", notificationContext == null ? null : notificationContext.getType(), _clusterName);
            leader = false;
        }

        if (!leader) {
            enableClusterStatusMonitor(false);
        } else {
            HelixDataAccessor accessor = notificationContext.getManager().getHelixDataAccessor();
            PropertyKey.Builder keys = accessor.keyBuilder();
            PauseSignal pause = (PauseSignal) accessor.getProperty(keys.pause());
            MaintenanceSignal maintenance = (MaintenanceSignal) accessor.getProperty(keys.maintenance());

            _paused = updateControllerState(notificationContext, pause, _paused);
            _inMaintenanceMode = updateControllerState(notificationContext, maintenance, _inMaintenanceMode);

            enableClusterStatusMonitor(true);
            _clusterStatusMonitor.setEnabled(!_paused);
            _clusterStatusMonitor.setPaused(_paused);
            _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
        }
        logger.info("END: GenericClusterController.onControllerChange() for cluster " + _clusterName);
    }

    /**
     * Reconciles this controller's per-instance listeners with the given live instances.
     *
     * <p>The supplied instances are indexed by instance name and by ephemeral owner
     * (used as the session key). Compared with the previously recorded snapshots:
     * <ol>
     *   <li>current-state listeners are removed for sessions no longer present;</li>
     *   <li>message listeners are removed for instances no longer present;</li>
     *   <li>current-state listeners are added for newly seen sessions;</li>
     *   <li>message listeners are added for newly seen instances.</li>
     * </ol>
     * Failures while adding a listener are logged and do not abort the remaining work.
     * The new indexes then replace the recorded snapshots. The comparison and the
     * snapshot update run while holding the monitor of {@code _lastSeenInstances}.
     *
     * @param list live instances currently observed
     * @param notificationContext context supplying the manager used to add and remove listeners
     */
    protected void checkLiveInstancesObservation(List<LiveInstance> list, NotificationContext notificationContext) {
        Map<String, LiveInstance> currentInstances = new HashMap<>();
        Map<String, LiveInstance> currentSessions = new HashMap<>();
        for (LiveInstance live : list) {
            currentInstances.put(live.getInstanceName(), live);
            currentSessions.put(live.getEphemeralOwner(), live);
        }

        synchronized (_lastSeenInstances) {
            Map<String, LiveInstance> previousInstances = _lastSeenInstances.get();
            Map<String, LiveInstance> previousSessions = _lastSeenSessions.get();
            HelixManager manager = notificationContext.getManager();
            PropertyKey.Builder keys = new PropertyKey.Builder(manager.getClusterName());

            // 1. Drop current-state listeners of sessions that disappeared.
            if (previousSessions != null) {
                for (Map.Entry<String, LiveInstance> previous : previousSessions.entrySet()) {
                    String session = previous.getKey();
                    if (currentSessions.containsKey(session)) {
                        continue;
                    }
                    manager.removeListener(keys.currentStates(previous.getValue().getInstanceName(), session), this);
                }
            }

            // 2. Drop message listeners of instances that disappeared.
            if (previousInstances != null) {
                for (String instance : previousInstances.keySet()) {
                    if (currentInstances.containsKey(instance)) {
                        continue;
                    }
                    manager.removeListener(keys.messages(instance), this);
                }
            }

            // 3. Add current-state listeners for sessions not seen before.
            for (Map.Entry<String, LiveInstance> current : currentSessions.entrySet()) {
                String session = current.getKey();
                if (previousSessions != null && previousSessions.containsKey(session)) {
                    continue;
                }
                String instanceName = current.getValue().getInstanceName();
                try {
                    manager.addCurrentStateChangeListener(this, instanceName, session);
                    logger.info(manager.getInstanceName() + " added current-state listener for instance: " + instanceName + ", session: " + session + ", listener: " + this);
                } catch (Exception e) {
                    logger.error("Fail to add current state listener for instance: " + instanceName + " with session: " + session, e);
                }
            }

            // 4. Add message listeners for instances not seen before.
            for (String instance : currentInstances.keySet()) {
                if (previousInstances != null && previousInstances.containsKey(instance)) {
                    continue;
                }
                try {
                    manager.addMessageListener(this, instance);
                    logger.info(manager.getInstanceName() + " added message listener for " + instance + ", listener: " + this);
                } catch (Exception e) {
                    logger.error("Fail to add message listener for instance: " + instance, e);
                }
            }

            _lastSeenInstances.set(currentInstances);
            _lastSeenSessions.set(currentSessions);
        }
    }

    /**
     * Shuts the controller down.
     *
     * <p>Stops the periodic rebalance, shuts down the default and task pipelines,
     * stops the async-task thread pool (waiting up to one second for it to
     * terminate), shuts down the async FIFO workers and finally disables the
     * cluster status monitor.
     *
     * <p>If the wait for the async-task pool times out, a warning is logged. If the
     * calling thread is interrupted during that wait, the remaining cleanup still
     * runs and the thread's interrupt status is restored before returning, so the
     * caller can observe the interruption.
     *
     * @throws InterruptedException if interrupted while shutting down a pipeline thread
     */
    public void shutdown() throws InterruptedException {
        stopPeriodRebalance();
        logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
        shutdownPipeline(this._eventThread, this._eventQueue);
        logger.info("Shutting down {} pipeline", Pipeline.Type.TASK.name());
        shutdownPipeline(this._taskEventThread, this._taskEventQueue);

        this._asyncTasksThreadPool.shutdownNow();
        boolean interrupted = false;
        try {
            // awaitTermination signals a timeout through its return value, not an exception.
            if (!this._asyncTasksThreadPool.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
                logger.warn("Timeout when terminating async tasks. Some async tasks are still executing.");
            }
        } catch (InterruptedException e) {
            // Remember the interrupt but finish the cleanup first.
            interrupted = true;
            logger.warn("Interrupted while waiting for async tasks to terminate. Some async tasks may still be executing.");
        }

        try {
            shutdownAsyncFIFOWorkers();
            enableClusterStatusMonitor(false);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Switches monitoring on or off while holding the cluster status monitor's lock.
     *
     * <p>When the requested state differs from the current one: enabling clears the
     * resource data provider's monitoring records (if that provider exists) and
     * activates the monitor; disabling only logs. In every case
     * {@link #resetClusterStatusMonitor()} is invoked afterwards.
     *
     * @param z {@code true} to enable monitoring, {@code false} to disable it
     */
    private void enableClusterStatusMonitor(boolean z) {
        synchronized (_clusterStatusMonitor) {
            boolean stateChanges = _isMonitoring != z;
            if (stateChanges) {
                if (!z) {
                    logger.info("Disable clusterStatusMonitor for cluster " + _clusterName);
                } else {
                    logger.info("Enable clusterStatusMonitor for cluster " + _clusterName);
                    if (_resourceControlDataProvider != null) {
                        _resourceControlDataProvider.clearMonitoringRecords();
                    }
                    _clusterStatusMonitor.active();
                }
                _isMonitoring = z;
            }
            resetClusterStatusMonitor();
        }
    }

    /**
     * Resets the cluster status monitor, but only while monitoring is switched off.
     * The check and the reset run under the monitor object's lock.
     */
    private void resetClusterStatusMonitor() {
        synchronized (_clusterStatusMonitor) {
            if (_isMonitoring) {
                return;
            }
            _clusterStatusMonitor.reset();
        }
    }

    /**
     * Stops one pipeline: clears its event queue, then keeps interrupting its thread
     * and joining it for up to one second at a time until the thread is no longer alive.
     *
     * @param thread pipeline thread to stop; may be {@code null}
     * @param clusterEventBlockingQueue pipeline queue to clear; may be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private void shutdownPipeline(Thread thread, ClusterEventBlockingQueue clusterEventBlockingQueue) throws InterruptedException {
        if (clusterEventBlockingQueue != null) {
            clusterEventBlockingQueue.clear();
        }
        if (thread == null) {
            return;
        }
        while (thread.isAlive()) {
            thread.interrupt();
            thread.join(1000L);
        }
    }

    /**
     * Derives the new value of a controller flag (paused or in-maintenance) from
     * the presence of its signal.
     *
     * <p>A present signal always yields {@code true}; the transition is logged when
     * the flag was previously {@code false}. An absent signal always yields
     * {@code false}; when the flag was previously {@code true}, a {@code Resume}
     * event is additionally enqueued on both the default and the task pipeline.
     *
     * @param notificationContext context attached to the resume event, together with its manager
     * @param pauseSignal signal read from the data accessor, or {@code null} when none exists
     * @param z current value of the flag
     * @return the new value of the flag
     */
    private boolean updateControllerState(NotificationContext notificationContext, PauseSignal pauseSignal, boolean z) {
        if (pauseSignal != null) {
            if (!z) {
                String stateLabel = (pauseSignal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused";
                logger.info(String.format("controller is now %s", stateLabel));
            }
            return true;
        }

        if (!z) {
            // No signal and the flag was already off: nothing to do.
            return false;
        }

        logger.info("controller is now resumed from paused state");
        String idPrefix = UUID.randomUUID().toString().substring(0, 8);
        String defaultEventId = String.format("%s_%s", idPrefix, Pipeline.Type.DEFAULT.name());
        ClusterEvent resumeEvent = new ClusterEvent(_clusterName, ClusterEventType.Resume, defaultEventId);
        resumeEvent.addAttribute(AttributeName.changeContext.name(), notificationContext);
        resumeEvent.addAttribute(AttributeName.helixmanager.name(), notificationContext.getManager());
        resumeEvent.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
        enqueueEvent(_eventQueue, resumeEvent);

        String taskEventId = String.format("%s_%s", idPrefix, Pipeline.Type.TASK.name());
        enqueueEvent(_taskEventQueue, resumeEvent.clone(taskEventId));
        return false;
    }

    /**
     * Starts a pipeline thread as a daemon after handing the async-task thread pool
     * to its data provider. Logs a warning and does nothing when either argument is
     * {@code null}.
     *
     * @param thread pipeline thread to start; may be {@code null}
     * @param baseControllerDataProvider data provider of the pipeline; may be {@code null}
     */
    private void initPipeline(Thread thread, BaseControllerDataProvider baseControllerDataProvider) {
        if (thread != null && baseControllerDataProvider != null) {
            baseControllerDataProvider.setAsyncTasksThreadPool(_asyncTasksThreadPool);
            thread.setDaemon(true);
            thread.start();
            return;
        }
        logger.warn("pipeline cannot be initialized");
    }
}
