package org.apache.pinot.server.starter.helix;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.server.api.access.AccessControlFactory;
import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/server/starter/helix/HelixServerStarter.class */
public class HelixServerStarter implements ServiceStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HelixServerStarter.class);
    private final String _helixClusterName;
    private final String _zkAddress;
    private final PinotConfiguration _serverConf;
    private final List<ListenerConfig> _listenerConfigs;
    private final String _host;
    private final int _port;
    private final String _instanceId;
    private final HelixConfigScope _instanceConfigScope;
    private HelixManager _helixManager;
    private HelixAdmin _helixAdmin;
    private ServerInstance _serverInstance;
    private AdminApiApplication _adminApiApplication;
    private ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
    private RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;

    /**
     * Creates a server starter bound to a Helix cluster. Nothing is connected here; the constructor only
     * resolves configuration-derived values (listener configs, host, port, instance id, instance config scope).
     *
     * @param str Helix cluster name
     * @param str2 ZooKeeper address used later to create the Helix manager/admin
     * @param pinotConfiguration server configuration; a clone is stored, so the caller's object is not modified
     * @throws Exception if any of the configuration/network helpers invoked here fails
     */
    public HelixServerStarter(String str, String str2, PinotConfiguration pinotConfiguration) throws Exception {
        this._helixClusterName = str;
        this._zkAddress = str2;
        this._serverConf = pinotConfiguration.clone();
        this._listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(this._serverConf);

        // The fallback host is resolved up front; it is only used when no explicit netty host is configured.
        final String fallbackHost;
        if (this._serverConf.getProperty("pinot.set.instance.id.to.hostname", false)) {
            fallbackHost = NetUtil.getHostnameOrAddress();
        } else {
            fallbackHost = NetUtil.getHostAddress();
        }
        this._host = this._serverConf.getProperty("pinot.server.netty.host", fallbackHost);
        this._port = this._serverConf.getProperty("pinot.server.netty.port", 8098);

        // Use the configured instance id when present; otherwise derive one from host and port and write it
        // back into the (cloned) configuration.
        final String configuredInstanceId = this._serverConf.getProperty("pinot.server.instance.id");
        if (configuredInstanceId != null) {
            this._instanceId = configuredInstanceId;
        } else {
            this._instanceId = "Server_" + this._host + "_" + this._port;
            this._serverConf.addProperty("pinot.server.instance.id", this._instanceId);
        }

        final HelixConfigScopeBuilder scopeBuilder =
                new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, new String[]{this._helixClusterName});
        this._instanceConfigScope = scopeBuilder.forParticipant(this._instanceId).build();
    }

    /**
     * Builds and registers the service status callbacks for this instance.
     *
     * <p>Scans every table resource in the cluster, collects the enabled ones whose ideal state assigns at
     * least one partition to this instance, and registers callbacks that compare ideal state against current
     * state and external view for those resources. When a positive realtime catch-up wait is configured and
     * this instance has at least one CONSUMING partition in a realtime table, a consumption catch-up callback
     * is registered as well.
     *
     * <p>Resources whose ideal state cannot be read (null) are skipped, consistent with
     * {@code shutdownResourceCheck}.
     */
    private void registerServiceStatusHandler() {
        double minResourcePercent = this._serverConf.getProperty("pinot.server.startup.minResourcePercent", 100.0d);
        int realtimeCatchupWaitMs = this._serverConf.getProperty("pinot.server.starter.realtimeConsumptionCatchupWaitMs", 0);
        boolean checkRealtime = realtimeCatchupWaitMs > 0;

        List<String> resourcesToMonitor = new ArrayList<>();
        boolean foundConsuming = false;
        for (String resourceName : this._helixAdmin.getResourcesInCluster(this._helixClusterName)) {
            if (!TableNameBuilder.isTableResource(resourceName)) {
                continue;
            }
            IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resourceName);
            // The ideal state may be missing if the resource disappears between listing and reading it.
            if (idealState == null || !idealState.isEnabled()) {
                continue;
            }

            // Monitor the resource as soon as one of its partitions is assigned to this instance.
            for (String partitionName : idealState.getPartitionSet()) {
                if (idealState.getInstanceSet(partitionName).contains(this._instanceId)) {
                    resourcesToMonitor.add(resourceName);
                    break;
                }
            }

            // A single CONSUMING partition is enough; stop looking once one has been found.
            if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) {
                for (String partitionName : idealState.getPartitionSet()) {
                    if ("CONSUMING".equals(idealState.getInstanceStateMap(partitionName).get(this._instanceId))) {
                        foundConsuming = true;
                        break;
                    }
                }
            }
        }

        ImmutableList.Builder<ServiceStatus.ServiceStatusCallback> callbacks = new ImmutableList.Builder<>();
        callbacks.add(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(this._helixManager, this._helixClusterName, this._instanceId, resourcesToMonitor, minResourcePercent));
        callbacks.add(new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(this._helixManager, this._helixClusterName, this._instanceId, resourcesToMonitor, minResourcePercent));
        if (checkRealtime && foundConsuming) {
            callbacks.add(new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(this._helixManager, this._helixClusterName, this._instanceId, realtimeCatchupWaitMs));
        }
        LOGGER.info("Registering service status handler");
        ServiceStatus.setServiceStatusCallback(this._instanceId, new ServiceStatus.MultipleCallbackServiceStatusCallback(callbacks.build()));
    }

    /**
     * Brings this instance's Helix instance config in line with the running server: adds default tags when the
     * instance has none, and corrects the host name and port when they differ from the given values. The
     * config is written back only if something changed.
     *
     * @param str host name the instance config should carry
     * @param i port the instance config should carry
     * @throws IllegalStateException if writing the updated instance config fails
     */
    private void updateInstanceConfigIfNeeded(String str, int i) {
        InstanceConfig instanceConfig = this._helixAdmin.getInstanceConfig(this._helixClusterName, this._instanceId);
        boolean needsUpdate = false;

        // Add default tags only when the instance has none.
        List<String> existingTags = instanceConfig.getTags();
        if (existingTags == null || existingTags.isEmpty()) {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(this._helixManager.getHelixPropertyStore())) {
                instanceConfig.addTag(TagNameUtils.getOfflineTagForTenant((String) null));
                instanceConfig.addTag(TagNameUtils.getRealtimeTagForTenant((String) null));
            } else {
                instanceConfig.addTag("server_untagged");
            }
            needsUpdate = true;
        }

        if (!str.equals(instanceConfig.getHostName())) {
            instanceConfig.setHostName(str);
            needsUpdate = true;
        }

        String portString = Integer.toString(i);
        if (!portString.equals(instanceConfig.getPort())) {
            instanceConfig.setPort(portString);
            needsUpdate = true;
        }

        if (!needsUpdate) {
            LOGGER.info("Instance config for instance: {} has instance tags: {}, host: {}, port: {}, no need to update", this._instanceId, existingTags, str, i);
            return;
        }

        // Re-read the tags: the list captured above may predate the tags added in this method, which would
        // make the log report stale (possibly empty) tags.
        LOGGER.info("Updating instance config for instance: {} with instance tags: {}, host: {}, port: {}", this._instanceId, instanceConfig.getTags(), str, i);
        HelixDataAccessor dataAccessor = this._helixManager.getHelixDataAccessor();
        boolean written = dataAccessor.setProperty(dataAccessor.keyBuilder().instanceConfig(this._instanceId), instanceConfig);
        Preconditions.checkState(written, "Failed to update instance config");
    }

    /**
     * Copies the configured flapping time window (default {@code "1"}) into the JVM system property
     * {@code helixmanager.flappingTimeWindow}. Called before the Helix manager is created.
     */
    private void setupHelixSystemProperties() {
        final String flappingTimeWindowMs = this._serverConf.getProperty("pinot.server.flapping.timeWindowMs", "1");
        System.setProperty("helixmanager.flappingTimeWindow", flappingTimeWindowMs);
    }

    /**
     * Polls the service status until it turns GOOD, the deadline passes, or the thread is interrupted.
     *
     * <p>Returns normally on GOOD. If the deadline passes or the thread is interrupted, a warning is logged
     * and the method returns without failing.
     *
     * @param j absolute deadline in epoch milliseconds
     * @throws IllegalStateException if the service status is BAD
     */
    private void startupServiceStatusCheck(long j) {
        LOGGER.info("Starting startup service status check");
        long startTimeMs = System.currentTimeMillis();
        long checkIntervalMs = this._serverConf.getProperty("pinot.server.startup.serviceStatusCheckIntervalMs", 10000L);

        while (System.currentTimeMillis() < j) {
            ServiceStatus.Status serviceStatus = ServiceStatus.getServiceStatus();
            long nowMs = System.currentTimeMillis();
            if (serviceStatus == ServiceStatus.Status.GOOD) {
                LOGGER.info("Service status is GOOD after {}ms", nowMs - startTimeMs);
                return;
            }
            if (serviceStatus == ServiceStatus.Status.BAD) {
                throw new IllegalStateException("Service status is BAD");
            }

            // Never sleep past the deadline.
            long sleepTimeMs = Math.min(checkIntervalMs, j - nowMs);
            if (sleepTimeMs > 0) {
                LOGGER.info("Sleep for {}ms as service status has not turned GOOD: {}", sleepTimeMs, ServiceStatus.getStatusDescription());
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException e) {
                    LOGGER.warn("Got interrupted while checking service status", e);
                    Thread.currentThread().interrupt();
                    // Stop polling: with the interrupt flag restored every further sleep would throw
                    // immediately, turning this loop into a busy spin until the deadline.
                    break;
                }
            }
        }

        LOGGER.warn("Service status has not turned GOOD within {}ms: {}", System.currentTimeMillis() - startTimeMs, ServiceStatus.getStatusDescription());
    }

    /**
     * Returns the role of this service.
     *
     * @return always {@link ServiceRole#SERVER}
     */
    public ServiceRole getServiceRole() {
        return ServiceRole.SERVER;
    }

    /**
     * Starts the Pinot server.
     *
     * <p>In order: creates the Helix manager, builds the server instance and registers the segment state
     * model factory, connects to Helix, reconciles the instance config, starts the admin API application,
     * publishes the admin/TLS/gRPC ports in the instance config, registers message handlers and metrics,
     * registers the service status handler and optionally waits for the service status to turn GOOD, clears
     * the {@code shutdownInProgress} flag, and finally starts the queries-disabled tracker and the realtime
     * Lucene index refresh state.
     *
     * <p>Only the instantiation of the access control factory is wrapped in a {@link RuntimeException} with
     * an access-control-specific message; failures in later startup steps propagate unchanged so that the
     * reported error matches the step that actually failed.
     *
     * @throws Exception if any startup step fails
     */
    public void start() throws Exception {
        LOGGER.info("Starting Pinot server");
        long startTimeMs = System.currentTimeMillis();

        LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", this._zkAddress, this._helixClusterName, this._instanceId);
        setupHelixSystemProperties();
        this._helixManager = HelixManagerFactory.getZKHelixManager(this._helixClusterName, this._instanceId, InstanceType.PARTICIPANT, this._zkAddress);

        LOGGER.info("Initializing server instance and registering state model factory");
        Utils.logVersions();
        ControllerLeaderLocator.create(this._helixManager);
        ServerSegmentCompletionProtocolHandler.init(this._serverConf.subset("pinot.server.segment.uploader"));
        ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(this._serverConf);
        this._serverInstance = new ServerInstance(serverInstanceConfig, this._helixManager);
        ServerMetrics serverMetrics = this._serverInstance.getServerMetrics();
        InstanceDataManager instanceDataManager = this._serverInstance.getInstanceDataManager();
        SegmentFetcherAndLoader segmentFetcherAndLoader = new SegmentFetcherAndLoader(this._serverConf, instanceDataManager, serverMetrics);
        this._helixManager.getStateMachineEngine().registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), new SegmentOnlineOfflineStateModelFactory(this._instanceId, instanceDataManager, segmentFetcherAndLoader));
        // The server instance is started from a pre-connect callback, i.e. as part of connecting to Helix.
        this._helixManager.addPreConnectCallback(this._serverInstance::start);

        LOGGER.info("Connecting Helix manager");
        this._helixManager.connect();
        this._helixAdmin = this._helixManager.getClusterManagmentTool();
        updateInstanceConfigIfNeeded(this._host, this._port);

        String accessControlFactoryClass = this._serverConf.getProperty("pinot.server.admin.access.control.factory.class", "org.apache.pinot.server.api.access.AllowAllAccessFactory");
        LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
        final AccessControlFactory accessControlFactory;
        try {
            accessControlFactory = (AccessControlFactory) PluginManager.get().createInstance(accessControlFactoryClass);
        } catch (Exception e) {
            throw new RuntimeException("Caught exception while creating new AccessControlFactory instance using class '" + accessControlFactoryClass + "'", e);
        }

        LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(this._listenerConfigs));
        this._adminApiApplication = new AdminApiApplication(this._serverInstance, accessControlFactory);
        this._adminApiApplication.start(this._listenerConfigs);

        // Publish the ports of enabled endpoints in the instance config and clear those of disabled ones.
        setOrRemoveInstanceConfig("adminPort", findListenerPort("http"));
        setOrRemoveInstanceConfig("adminHttpsPort", findListenerPort("https"));
        setOrRemoveInstanceConfig("nettyTlsPort", serverInstanceConfig.isNettyTlsServerEnabled() ? String.valueOf(serverInstanceConfig.getNettyTlsPort()) : null);
        setOrRemoveInstanceConfig("grpcPort", serverInstanceConfig.isEnableGrpcServer() ? String.valueOf(serverInstanceConfig.getGrpcPort()) : null);

        this._helixManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), new SegmentMessageHandlerFactory(segmentFetcherAndLoader, instanceDataManager, serverMetrics));
        serverMetrics.addCallbackGauge("helix.connected", () -> Long.valueOf(this._helixManager.isConnected() ? 1L : 0L));
        // Registered after the initial connect, so the meter is bumped only by later pre-connect callbacks.
        this._helixManager.addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

        registerServiceStatusHandler();
        if (this._serverConf.getProperty("pinot.server.startup.enableServiceStatusCheck", true)) {
            // The timeout is measured from the beginning of start(), not from this point.
            startupServiceStatusCheck(startTimeMs + this._serverConf.getProperty("pinot.server.startup.timeoutMs", 600000L));
        }

        this._helixAdmin.setConfig(this._instanceConfigScope, Collections.singletonMap("shutdownInProgress", Boolean.toString(false)));
        LOGGER.info("Pinot server ready");

        serverMetrics.addCallbackGauge("memory.directBufferCount", PinotDataBuffer::getDirectBufferCount);
        serverMetrics.addCallbackGauge("memory.directBufferUsage", PinotDataBuffer::getDirectBufferUsage);
        serverMetrics.addCallbackGauge("memory.mmapBufferCount", PinotDataBuffer::getMmapBufferCount);
        serverMetrics.addCallbackGauge("memory.mmapBufferUsage", PinotDataBuffer::getMmapBufferUsage);
        serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);

        this._serverQueriesDisabledTracker = new ServerQueriesDisabledTracker(this._helixClusterName, this._instanceId, this._helixManager, serverMetrics);
        this._serverQueriesDisabledTracker.start();
        this._realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance();
        this._realtimeLuceneIndexRefreshState.start();
    }

    /**
     * Returns the port (as a string) of the first admin listener using the given protocol, or {@code null}
     * when no listener uses it.
     */
    private String findListenerPort(String protocol) {
        Optional<ListenerConfig> listener = this._listenerConfigs.stream()
                .filter(listenerConfig -> protocol.equals(listenerConfig.getProtocol()))
                .findFirst();
        return listener.isPresent() ? String.valueOf(listener.get().getPort()) : null;
    }

    /**
     * Sets the given key to the given value in this instance's Helix config, or removes the key when the
     * value is {@code null}.
     */
    private void setOrRemoveInstanceConfig(String key, String value) {
        if (value != null) {
            this._helixAdmin.setConfig(this._instanceConfigScope, Collections.singletonMap(key, value));
        } else {
            this._helixAdmin.removeConfig(this._instanceConfigScope, Collections.singletonList(key));
        }
    }

    /**
     * Shuts down the Pinot server.
     *
     * <p>In order: closes PinotFS, stops the admin API application, sets {@code shutdownInProgress} in the
     * instance config, optionally waits for queries to drain, disconnects from Helix, shuts down the server
     * instance, optionally waits for this instance's resources to go OFFLINE, stops the background trackers,
     * and deregisters the service status callback.
     *
     * <p>Each component is skipped when its field was never assigned (for example because {@code start()}
     * failed part-way or was never called), so that the remaining shutdown steps, in particular the
     * service status callback deregistration, still run instead of being cut short by a
     * {@link NullPointerException}.
     */
    public void stop() {
        LOGGER.info("Shutting down Pinot server");
        long startTimeMs = System.currentTimeMillis();

        try {
            LOGGER.info("Closing PinotFS classes");
            PinotFSFactory.shutdown();
        } catch (IOException e) {
            LOGGER.warn("Caught exception closing PinotFS classes", e);
        }

        if (this._adminApiApplication != null) {
            this._adminApiApplication.stop();
        }
        if (this._helixAdmin != null) {
            this._helixAdmin.setConfig(this._instanceConfigScope, Collections.singletonMap("shutdownInProgress", Boolean.toString(true)));
        }

        // Both optional checks share one deadline measured from the beginning of stop().
        long endTimeMs = startTimeMs + this._serverConf.getProperty("pinot.server.shutdown.timeoutMs", 600000L);
        if (this._serverInstance != null && this._serverConf.getProperty("pinot.server.shutdown.enableQueryCheck", true)) {
            shutdownQueryCheck(endTimeMs);
        }

        if (this._helixManager != null) {
            this._helixManager.disconnect();
        }
        if (this._serverInstance != null) {
            this._serverInstance.shutDown();
        }

        if (this._serverConf.getProperty("pinot.server.shutdown.enableResourceCheck", false)) {
            shutdownResourceCheck(endTimeMs);
        }

        if (this._serverQueriesDisabledTracker != null) {
            this._serverQueriesDisabledTracker.stop();
        }
        if (this._realtimeLuceneIndexRefreshState != null) {
            this._realtimeLuceneIndexRefreshState.stop();
        }

        LOGGER.info("Deregistering service status handler");
        ServiceStatus.removeServiceStatusCallback(this._instanceId);
        LOGGER.info("Finish shutting down Pinot server for {}", this._instanceId);
    }

    /**
     * Waits for queries to drain before shutdown.
     *
     * <p>Phase 1: waits until no query has been received for at least the no-query threshold (which defaults
     * to the query executor timeout), until the deadline passes, or until the thread is interrupted.
     * Phase 2 (only if phase 1 detected no incoming queries): sleeps until the query executor timeout has
     * elapsed since the latest query, so that queries already in flight can finish.
     *
     * @param j absolute deadline in epoch milliseconds for phase 1
     */
    private void shutdownQueryCheck(long j) {
        LOGGER.info("Starting shutdown query check");
        long startTimeMs = System.currentTimeMillis();
        long maxQueryTimeMs = this._serverConf.getProperty("pinot.server.query.executor.timeout", 15000L);
        long noQueryThresholdMs = this._serverConf.getProperty("pinot.server.shutdown.noQueryThresholdMs", maxQueryTimeMs);

        boolean noIncomingQueries = false;
        long nowMs = System.currentTimeMillis();
        while (nowMs < j) {
            long noQueryTimeMs = nowMs - this._serverInstance.getLatestQueryTime();
            if (noQueryTimeMs >= noQueryThresholdMs) {
                LOGGER.info("No query received within {}ms (larger than the threshold: {}ms), mark it as no incoming queries", noQueryTimeMs, noQueryThresholdMs);
                noIncomingQueries = true;
                break;
            }

            // Sleep just long enough for the threshold to be reachable, but never past the deadline.
            long sleepTimeMs = Math.min(noQueryThresholdMs - noQueryTimeMs, j - nowMs);
            LOGGER.info("Sleep for {}ms as there are still incoming queries (no query time: {}ms is smaller than the threshold: {}ms)", sleepTimeMs, noQueryTimeMs, noQueryThresholdMs);
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                LOGGER.warn("Got interrupted while waiting for no incoming queries", e);
                Thread.currentThread().interrupt();
                // Stop waiting: with the interrupt flag restored every further sleep would throw
                // immediately, turning this loop into a busy spin until the deadline.
                break;
            }
            nowMs = System.currentTimeMillis();
        }

        if (!noIncomingQueries) {
            LOGGER.warn("Failed to drain queries within {}ms", System.currentTimeMillis() - startTimeMs);
            return;
        }

        // nowMs is the timestamp at which "no incoming queries" was established.
        long queryFinishTimeMs = this._serverInstance.getLatestQueryTime() + maxQueryTimeMs;
        if (queryFinishTimeMs > nowMs) {
            long sleepTimeMs = queryFinishTimeMs - nowMs;
            LOGGER.info("Sleep for {}ms to ensure all the existing queries are finished", sleepTimeMs);
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                LOGGER.warn("Got interrupted while waiting for all the existing queries to be finished", e);
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Finished draining queries after {}ms", System.currentTimeMillis() - startTimeMs);
    }

    /**
     * Waits until this instance no longer serves any partition (ONLINE or CONSUMING) of the table resources
     * assigned to it, until the deadline passes, or until the thread is interrupted.
     *
     * <p>Uses a dedicated {@link ZKHelixAdmin} created from the ZooKeeper address, which is always closed
     * before returning. Whatever the reason for leaving the wait loop, one final pass over the remaining
     * resources is made and the outcome is logged.
     *
     * @param j absolute deadline in epoch milliseconds
     */
    private void shutdownResourceCheck(long j) {
        LOGGER.info("Starting shutdown resource check");
        long startTimeMs = System.currentTimeMillis();
        if (startTimeMs >= j) {
            LOGGER.warn("Skipping shutdown resource check because shutdown timeout is already reached");
            return;
        }

        HelixAdmin helixAdmin = new ZKHelixAdmin(this._zkAddress);
        try {
            // Collect the enabled table resources with at least one partition assigned to this instance.
            HashSet<String> resourcesToMonitor = new HashSet<>();
            for (String resourceName : helixAdmin.getResourcesInCluster(this._helixClusterName)) {
                if (!TableNameBuilder.isTableResource(resourceName)) {
                    continue;
                }
                IdealState idealState = helixAdmin.getResourceIdealState(this._helixClusterName, resourceName);
                if (idealState == null || !idealState.isEnabled()) {
                    continue;
                }
                for (String partitionName : idealState.getPartitionSet()) {
                    if (idealState.getInstanceSet(partitionName).contains(this._instanceId)) {
                        resourcesToMonitor.add(resourceName);
                        break;
                    }
                }
            }

            long checkIntervalMs = this._serverConf.getProperty("pinot.server.shutdown.resourceCheckIntervalMs", 10000L);
            while (System.currentTimeMillis() < j) {
                // Drop resources that are already OFFLINE; stop at the first one that is not, and remember
                // it for the log message below.
                Iterator<String> iterator = resourcesToMonitor.iterator();
                String currentResource = null;
                while (iterator.hasNext()) {
                    currentResource = iterator.next();
                    if (!isResourceOffline(helixAdmin, currentResource)) {
                        break;
                    }
                    iterator.remove();
                }

                long nowMs = System.currentTimeMillis();
                if (resourcesToMonitor.isEmpty()) {
                    LOGGER.info("All resources are OFFLINE after {}ms", nowMs - startTimeMs);
                    return;
                }

                // Never sleep past the deadline.
                long sleepTimeMs = Math.min(checkIntervalMs, j - nowMs);
                if (sleepTimeMs > 0) {
                    LOGGER.info("Sleep for {}ms as some resources [{}, ...] are still ONLINE", sleepTimeMs, currentResource);
                    try {
                        Thread.sleep(sleepTimeMs);
                    } catch (InterruptedException e) {
                        LOGGER.warn("Got interrupted while waiting for all resources OFFLINE", e);
                        Thread.currentThread().interrupt();
                        // Stop waiting: with the interrupt flag restored every further sleep would throw
                        // immediately, turning this loop into a busy spin until the deadline.
                        break;
                    }
                }
            }

            // Final pass over everything that is left, so the log reflects the latest external views.
            resourcesToMonitor.removeIf(resourceName -> isResourceOffline(helixAdmin, resourceName));
            long nowMs = System.currentTimeMillis();
            if (resourcesToMonitor.isEmpty()) {
                LOGGER.info("All resources are OFFLINE after {}ms", nowMs - startTimeMs);
            } else {
                LOGGER.warn("There are still {} resources ONLINE within {}ms: {}", resourcesToMonitor.size(), nowMs - startTimeMs, resourcesToMonitor);
            }
        } finally {
            helixAdmin.close();
        }
    }

    /**
     * Tells whether this instance serves no partition of the given resource according to its external view.
     *
     * @param helixAdmin admin used to read the external view
     * @param str resource name
     * @return {@code false} if any partition reports this instance as ONLINE or CONSUMING; {@code true}
     *         otherwise, including when the resource has no external view
     */
    private boolean isResourceOffline(HelixAdmin helixAdmin, String str) {
        ExternalView externalView = helixAdmin.getResourceExternalView(this._helixClusterName, str);
        if (externalView != null) {
            for (String partitionName : externalView.getPartitionSet()) {
                String instanceState = externalView.getStateMap(partitionName).get(this._instanceId);
                boolean serving = "ONLINE".equals(instanceState) || "CONSUMING".equals(instanceState);
                if (serving) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Returns the instance id resolved in the constructor: the configured
     * {@code pinot.server.instance.id}, or {@code Server_<host>_<port>} when none was configured.
     */
    public String getInstanceId() {
        return this._instanceId;
    }

    /**
     * Returns the server configuration held by this starter. This is the clone made in the constructor
     * (possibly with the derived instance id added), returned directly rather than copied.
     */
    public PinotConfiguration getConfig() {
        return this._serverConf;
    }

    /**
     * Creates and starts a server with hard-coded settings: cluster {@code quickstart}, ZooKeeper at
     * {@code localhost:2191}, netty port 8003 and data/segment-tar directories under
     * {@code /tmp/PinotServer/test8003}.
     *
     * @return the started server starter
     * @throws Exception if construction or startup fails
     */
    public static HelixServerStarter startDefault() throws Exception {
        final HashMap<String, Object> properties = new HashMap<>();
        properties.put("pinot.server.netty.port", 8003);
        properties.put("pinot.server.instance.dataDir", "/tmp/PinotServer/test8003/index");
        properties.put("pinot.server.instance.segmentTarDir", "/tmp/PinotServer/test8003/segmentTar");

        final PinotConfiguration serverConf = new PinotConfiguration(properties);
        final HelixServerStarter starter = new HelixServerStarter("quickstart", "localhost:2191", serverConf);
        starter.start();
        return starter;
    }

    /**
     * Command-line entry point: starts a server with the hard-coded defaults of {@link #startDefault()}.
     * The arguments are ignored.
     */
    public static void main(String[] strArr) throws Exception {
        startDefault();
    }
}
