package org.apache.pinot.broker.broker.helix;

import com.google.common.collect.ImmutableList;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerServerBuilder;
import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/broker/helix/HelixBrokerStarter.class */
public class HelixBrokerStarter {
    private static final String PROPERTY_STORE = "PROPERTYSTORE";
    private final HelixManager _spectatorHelixManager;
    private final HelixManager _helixManager;
    private final HelixAdmin _helixAdmin;
    private final Configuration _pinotHelixProperties;
    private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
    private final BrokerServerBuilder _brokerServerBuilder;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final LiveInstancesChangeListenerImpl _liveInstancesListener;
    private final MetricsRegistry _metricsRegistry;
    private final TableQueryQuotaManager _tableQueryQuotaManager;
    private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
    private AccessControlFactory _accessControlFactory;
    private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
    private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";

    public HelixBrokerStarter(String str, String str2, Configuration configuration) throws Exception {
        this(null, str, str2, configuration);
    }

    public HelixBrokerStarter(String str, String str2, String str3, Configuration configuration) throws Exception {
        LOGGER.info("Starting Pinot broker");
        this._liveInstancesListener = new LiveInstancesChangeListenerImpl(str2);
        this._pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(configuration);
        String string = this._pinotHelixProperties.getString("instanceId", "Broker_" + (str == null ? NetUtil.getHostAddress() : str) + "_" + this._pinotHelixProperties.getInt("pinot.broker.client.queryPort", 8099));
        this._pinotHelixProperties.addProperty("pinot.broker.id", string);
        setupHelixSystemProperties();
        String replaceAll = str3.replaceAll("\\s+", "");
        LOGGER.info("Connecting Helix components");
        this._spectatorHelixManager = HelixManagerFactory.getZKHelixManager(str2, string, InstanceType.SPECTATOR, replaceAll);
        this._spectatorHelixManager.connect();
        this._helixAdmin = this._spectatorHelixManager.getClusterManagmentTool();
        this._propertyStore = this._spectatorHelixManager.getHelixPropertyStore();
        this._helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(this._propertyStore, this._spectatorHelixManager, configuration.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
        this._tableQueryQuotaManager = new TableQueryQuotaManager(this._spectatorHelixManager);
        this._brokerServerBuilder = startBroker(this._pinotHelixProperties);
        this._metricsRegistry = this._brokerServerBuilder.getMetricsRegistry();
        ClusterChangeMediator clusterChangeMediator = new ClusterChangeMediator(this._helixExternalViewBasedRouting, this._tableQueryQuotaManager, this._brokerServerBuilder.getBrokerMetrics());
        this._spectatorHelixManager.addExternalViewChangeListener(clusterChangeMediator);
        this._spectatorHelixManager.addInstanceConfigChangeListener(clusterChangeMediator);
        this._spectatorHelixManager.addLiveInstanceChangeListener(this._liveInstancesListener);
        this._helixManager = HelixManagerFactory.getZKHelixManager(str2, string, InstanceType.PARTICIPANT, replaceAll);
        this._helixManager.getStateMachineEngine().registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), new BrokerResourceOnlineOfflineStateModelFactory(this._spectatorHelixManager, this._propertyStore, this._helixExternalViewBasedRouting, this._tableQueryQuotaManager));
        this._helixManager.connect();
        this._tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(this._helixExternalViewBasedRouting, this._pinotHelixProperties.getLong("pinot.broker.refresh.timeBoundaryInfo.sleepInterval", BrokerServerBuilder.DEFAULT_DELAY_SHUTDOWN_TIME_MS));
        this._helixManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), this._tbiMessageHandler);
        addInstanceTagIfNeeded(str2, string);
        ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(this._helixManager, str2, string), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(this._helixManager, str2, string))));
        this._brokerServerBuilder.getBrokerMetrics().addCallbackGauge("helix.connected", new Callable<Long>() { // from class: org.apache.pinot.broker.broker.helix.HelixBrokerStarter.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Long call() throws Exception {
                return Long.valueOf(HelixBrokerStarter.this._helixManager.isConnected() ? 1L : 0L);
            }
        });
        this._helixManager.addPreConnectCallback(new PreConnectCallback() { // from class: org.apache.pinot.broker.broker.helix.HelixBrokerStarter.2
            public void onPreConnect() {
                HelixBrokerStarter.this._brokerServerBuilder.getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L);
            }
        });
    }

    private void setupHelixSystemProperties() {
        System.setProperty("helixmanager.flappingTimeWindow", this._pinotHelixProperties.getString(DefaultHelixBrokerConfig.HELIX_FLAPPING_TIME_WINDOW_NAME, DefaultHelixBrokerConfig.DEFAULT_HELIX_FLAPPING_TIMEIWINDWOW_MS));
    }

    private void addInstanceTagIfNeeded(String str, String str2) {
        List tags = this._helixAdmin.getInstanceConfig(str, str2).getTags();
        if (tags == null || tags.isEmpty()) {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(this._propertyStore)) {
                this._helixAdmin.addInstanceTag(str, str2, TagNameUtils.getBrokerTagForTenant("DefaultTenant"));
            } else {
                this._helixAdmin.addInstanceTag(str, str2, "broker_untagged");
            }
        }
    }

    private BrokerServerBuilder startBroker(Configuration configuration) {
        if (configuration == null) {
            configuration = DefaultHelixBrokerConfig.getDefaultBrokerConf();
        }
        final BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(configuration, this._helixExternalViewBasedRouting, this._helixExternalViewBasedRouting.getTimeBoundaryService(), this._liveInstancesListener, this._tableQueryQuotaManager);
        this._accessControlFactory = brokerServerBuilder.getAccessControlFactory();
        this._helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
        this._tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
        brokerServerBuilder.start();
        LOGGER.info("Pinot broker ready and listening on port {} for API requests", configuration.getProperty("pinot.broker.client.queryPort"));
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.pinot.broker.broker.helix.HelixBrokerStarter.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    brokerServerBuilder.stop();
                } catch (Exception e) {
                    HelixBrokerStarter.LOGGER.error("Caught exception while running shutdown hook", e);
                }
            }
        });
        return brokerServerBuilder;
    }

    public AccessControlFactory getAccessControlFactory() {
        return this._accessControlFactory;
    }

    public static String getZkAddressForBroker(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        String[] split = str.split("/", 2);
        String str3 = split[0];
        String join = StringUtil.join("/", new String[]{str2, PROPERTY_STORE});
        if (split.length > 1) {
            join = split[1] + "/" + join;
        }
        for (String str4 : str3.split(",")) {
            arrayList.add(StringUtil.join("/", new String[]{StringUtils.chomp(str4, "/"), join}));
        }
        return StringUtils.join(arrayList, ",");
    }

    public HelixManager getSpectatorHelixManager() {
        return this._spectatorHelixManager;
    }

    public HelixExternalViewBasedRouting getHelixExternalViewBasedRouting() {
        return this._helixExternalViewBasedRouting;
    }

    public BrokerServerBuilder getBrokerServerBuilder() {
        return this._brokerServerBuilder;
    }

    public static HelixBrokerStarter startDefault() throws Exception {
        PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration();
        propertiesConfiguration.addProperty("pinot.broker.client.queryPort", 5001);
        propertiesConfiguration.addProperty("pinot.broker.timeoutMs", 500000L);
        return new HelixBrokerStarter(null, "quickstart", "localhost:2122", propertiesConfiguration);
    }

    public void shutdown() {
        LOGGER.info("Shutting down");
        if (this._helixManager != null) {
            LOGGER.info("Disconnecting Helix manager");
            this._helixManager.disconnect();
        }
        if (this._spectatorHelixManager != null) {
            LOGGER.info("Disconnecting spectator Helix manager");
            this._spectatorHelixManager.disconnect();
        }
        if (this._tbiMessageHandler != null) {
            LOGGER.info("Shutting down timeboundary info refresh message handler");
            this._tbiMessageHandler.shutdown();
        }
    }

    public MetricsRegistry getMetricsRegistry() {
        return this._metricsRegistry;
    }

    public static void main(String[] strArr) throws Exception {
        startDefault();
    }
}
