package org.apache.pinot.broker.broker.helix;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@BatchMode(enabled = false)
@PreFetch(enabled = false)
/* loaded from: input_file:org/apache/pinot/broker/broker/helix/ClusterChangeMediator.class */
public class ClusterChangeMediator implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
    private static final Logger LOGGER;
    private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600000;
    private final Map<HelixConstants.ChangeType, List<ClusterChangeHandler>> _changeHandlersMap;
    private final Thread _clusterChangeHandlingThread;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<HelixConstants.ChangeType, Long> _lastChangeTimeMap = new HashMap();
    private final Map<HelixConstants.ChangeType, Long> _lastProcessTimeMap = new HashMap();
    private boolean _stopped = false;

    public ClusterChangeMediator(Map<HelixConstants.ChangeType, List<ClusterChangeHandler>> map, final BrokerMetrics brokerMetrics) {
        this._changeHandlersMap = map;
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<HelixConstants.ChangeType> it2 = map.keySet().iterator();
        while (it2.hasNext()) {
            this._lastProcessTimeMap.put(it2.next(), Long.valueOf(currentTimeMillis));
        }
        this._clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") { // from class: org.apache.pinot.broker.broker.helix.ClusterChangeMediator.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Long l;
                while (true) {
                    try {
                        for (Map.Entry entry : ClusterChangeMediator.this._changeHandlersMap.entrySet()) {
                            if (ClusterChangeMediator.this._stopped) {
                                return;
                            }
                            HelixConstants.ChangeType changeType = (HelixConstants.ChangeType) entry.getKey();
                            List list = (List) entry.getValue();
                            long currentTimeMillis2 = System.currentTimeMillis();
                            synchronized (ClusterChangeMediator.this._lastChangeTimeMap) {
                                l = (Long) ClusterChangeMediator.this._lastChangeTimeMap.remove(changeType);
                            }
                            if (l != null) {
                                brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTimeMillis2 - l.longValue(), TimeUnit.MILLISECONDS);
                                ClusterChangeMediator.this.processClusterChange(changeType, list);
                            } else if (currentTimeMillis2 - ((Long) ClusterChangeMediator.this._lastProcessTimeMap.get(changeType)).longValue() > 3600000) {
                                ClusterChangeMediator.LOGGER.info("Proactive check {} change", changeType);
                                brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L);
                                ClusterChangeMediator.this.processClusterChange(changeType, list);
                            }
                        }
                        synchronized (ClusterChangeMediator.this._lastChangeTimeMap) {
                            if (ClusterChangeMediator.this._stopped) {
                                return;
                            }
                            if (ClusterChangeMediator.this._lastChangeTimeMap.isEmpty()) {
                                ClusterChangeMediator.this._lastChangeTimeMap.wait(360000L);
                            }
                        }
                    } catch (Exception e) {
                        ClusterChangeMediator.LOGGER.error("Caught exception within cluster change handling thread", (Throwable) e);
                    }
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processClusterChange(HelixConstants.ChangeType changeType, List<ClusterChangeHandler> list) {
        long currentTimeMillis = System.currentTimeMillis();
        LOGGER.info("Start processing {} change", changeType);
        for (ClusterChangeHandler clusterChangeHandler : list) {
            try {
                long currentTimeMillis2 = System.currentTimeMillis();
                clusterChangeHandler.processClusterChange(changeType);
                LOGGER.info("Finish handling {} change for handler: {} in {}ms", changeType, clusterChangeHandler.getClass().getName(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
            } catch (Exception e) {
                LOGGER.error("Caught exception while handling {} change for handler: {}", changeType, clusterChangeHandler.getClass().getName(), e);
            }
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        LOGGER.info("Finish processing {} change in {}ms", changeType, Long.valueOf(currentTimeMillis3 - currentTimeMillis));
        this._lastProcessTimeMap.put(changeType, Long.valueOf(currentTimeMillis3));
    }

    public synchronized void start() {
        LOGGER.info("Starting the cluster change handling thread");
        this._clusterChangeHandlingThread.start();
    }

    public synchronized void stop() {
        LOGGER.info("Stopping the cluster change handling thread");
        this._stopped = true;
        synchronized (this._lastChangeTimeMap) {
            this._lastChangeTimeMap.notify();
        }
        try {
            this._clusterChangeHandlingThread.join();
        } catch (InterruptedException e) {
            LOGGER.error("Caught InterruptedException while waiting for cluster change handling thread to die");
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.apache.helix.api.listeners.ExternalViewChangeListener
    public void onExternalViewChange(List<ExternalView> list, NotificationContext notificationContext) {
        if (!$assertionsDisabled && !list.isEmpty()) {
            throw new AssertionError();
        }
        enqueueChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
    }

    @Override // org.apache.helix.api.listeners.InstanceConfigChangeListener
    public void onInstanceConfigChange(List<InstanceConfig> list, NotificationContext notificationContext) {
        if (!$assertionsDisabled && !list.isEmpty()) {
            throw new AssertionError();
        }
        enqueueChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
    }

    @Override // org.apache.helix.api.listeners.LiveInstanceChangeListener
    public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
        if (!$assertionsDisabled && !list.isEmpty()) {
            throw new AssertionError();
        }
        enqueueChange(HelixConstants.ChangeType.LIVE_INSTANCE);
    }

    private synchronized void enqueueChange(HelixConstants.ChangeType changeType) {
        if (this._stopped) {
            return;
        }
        if (!this._clusterChangeHandlingThread.isAlive()) {
            LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType);
            processClusterChange(changeType, this._changeHandlersMap.get(changeType));
            return;
        }
        LOGGER.info("Enqueue {} change", changeType);
        synchronized (this._lastChangeTimeMap) {
            if (!this._lastChangeTimeMap.containsKey(changeType)) {
                this._lastChangeTimeMap.put(changeType, Long.valueOf(System.currentTimeMillis()));
                this._lastChangeTimeMap.notify();
            }
        }
    }

    static {
        $assertionsDisabled = !ClusterChangeMediator.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger((Class<?>) ClusterChangeMediator.class);
    }
}
