package org.apache.streampipes.service.core;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.health.PipelineHealthCheck;
import org.apache.streampipes.manager.health.ServiceHealthCheck;
import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsServiceLogExecutor;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.setup.AutoInstallation;
import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
import org.apache.streampipes.messaging.SpProtocolManager;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory;
import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.rest.security.SpPermissionEvaluator;
import org.apache.streampipes.service.base.BaseNetworkingConfig;
import org.apache.streampipes.service.base.StreamPipesServiceBase;
import org.apache.streampipes.service.core.migrations.MigrationsHandler;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.couchdb.impl.UserStorage;
import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@EnableAutoConfiguration
@Import({StreamPipesResourceConfig.class, WelcomePageController.class, StreamPipesPasswordEncoder.class, WebSecurityConfig.class, SpPermissionEvaluator.class, StreamPipesPrometheusConfig.class})
@ComponentScan({"org.apache.streampipes.rest.*"})
/* loaded from: input_file:org/apache/streampipes/service/core/StreamPipesCoreApplication.class */
public class StreamPipesCoreApplication extends StreamPipesServiceBase {
    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesCoreApplication.class.getCanonicalName());
    private final ISpCoreConfigurationStorage coreConfigStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage();
    private final CoreServiceStatusManager coreStatusManager = new CoreServiceStatusManager(this.coreConfigStorage);

    public static void main(String[] strArr) {
        new StreamPipesCoreApplication().initialize(() -> {
            return List.of(new SpNatsProtocolFactory(), new SpKafkaProtocolFactory(), new SpMqttProtocolFactory(), new SpJmsProtocolFactory(), new SpPulsarProtocolFactory());
        });
    }

    public void initialize(SupportedProtocols supportedProtocols) {
        try {
            registerProtocols(supportedProtocols);
            startStreamPipesService(StreamPipesCoreApplication.class, BaseNetworkingConfig.defaultResolution(8030));
        } catch (UnknownHostException e) {
            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the `SP_HOST` environment variable");
        }
    }

    protected void registerProtocols(SupportedProtocols supportedProtocols) {
        List<SpProtocolDefinitionFactory<?>> supportedProtocols2 = supportedProtocols.getSupportedProtocols();
        SpProtocolManager spProtocolManager = SpProtocolManager.INSTANCE;
        Objects.requireNonNull(spProtocolManager);
        supportedProtocols2.forEach(spProtocolManager::register);
    }

    @PostConstruct
    public void init() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService newSingleThreadScheduledExecutor2 = Executors.newSingleThreadScheduledExecutor();
        new StreamPipesEnvChecker().updateEnvironmentVariables();
        new CouchDbViewGenerator().createGenericDatabaseIfNotExists();
        Environment environment = Environments.getEnvironment();
        if (isConfigured()) {
            if (this.coreConfigStorage.exists()) {
                this.coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.MIGRATING);
            }
            new MigrationsHandler().performMigrations();
        } else {
            CoreInitialInstallationProgress.INSTANCE.triggerInitiallyInstallingMode();
            doInitialSetup(((Integer) environment.getInitialWaitTimeBeforeInstallationInMillis().getValueOrDefault()).intValue());
        }
        this.coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.READY);
        newSingleThreadScheduledExecutor.schedule(new PostStartupTask(getPipelineStorage()), ((Integer) environment.getInitialHealthCheckDelayInMillis().getValueOrDefault()).intValue(), TimeUnit.MILLISECONDS);
        scheduleHealthChecks(((Integer) environment.getHealthCheckIntervalInMillis().getValueOrDefault()).intValue(), List.of(new ServiceHealthCheck(), new PipelineHealthCheck(), new AdapterHealthCheck()));
        LOG.info("Extensions logs will be fetched every {} milliseconds", (Integer) environment.getLogFetchIntervalInMillis().getValueOrDefault());
        newSingleThreadScheduledExecutor2.scheduleAtFixedRate(new ExtensionsServiceLogExecutor(), r0.intValue(), r0.intValue(), TimeUnit.MILLISECONDS);
    }

    private void scheduleHealthChecks(int i, List<Runnable> list) {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        list.forEach(runnable -> {
            LOG.info("Health check {} configured to run every {} {}", new Object[]{runnable.getClass().getCanonicalName(), Integer.valueOf(i), TimeUnit.MILLISECONDS});
            newSingleThreadScheduledExecutor.scheduleAtFixedRate(runnable, i, i, TimeUnit.MILLISECONDS);
        });
    }

    private boolean isConfigured() {
        return new UserStorage().existsDatabase();
    }

    private void doInitialSetup(int i) {
        LOG.info("\n\n**********\n\nWelcome to Apache StreamPipes!\n\n**********\n\n");
        LOG.info("We will perform the initial setup, grab some coffee and cross your fingers ;-)...");
        LOG.info("Auto-setup will start in {} milliseconds to make sure all services are running...", Integer.valueOf(i));
        try {
            TimeUnit.MILLISECONDS.sleep(i);
            LOG.info("Starting installation procedure");
            new AutoInstallation().startAutoInstallation();
        } catch (InterruptedException e) {
            LOG.error("Ooops, something went wrong during the installation", e);
        }
    }

    @PreDestroy
    public void onExit() {
        LOG.info("Shutting down StreamPipes...");
        LOG.info("Flagging currently running pipelines for restart...");
        List<Pipeline> list = getAllPipelines().stream().filter((v0) -> {
            return v0.isRunning();
        }).toList();
        LOG.info("Found {} running pipelines which will be stopped...", Integer.valueOf(list.size()));
        list.forEach(pipeline -> {
            pipeline.setRestartOnSystemReboot(true);
            StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
        });
        LOG.info("Gracefully stopping all running pipelines...");
        Operations.stopAllPipelines(true).forEach(pipelineOperationStatus -> {
            if (pipelineOperationStatus.isSuccess()) {
                LOG.info("Pipeline {} successfully stopped", pipelineOperationStatus.getPipelineName());
            } else {
                LOG.error("Pipeline {} could not be stopped", pipelineOperationStatus.getPipelineName());
            }
        });
        LOG.info("Thanks for using Apache StreamPipes - see you next time!");
    }

    private List<Pipeline> getAllPipelines() {
        return getPipelineStorage().getAllPipelines();
    }

    private IPipelineStorage getPipelineStorage() {
        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
    }

    protected String getHealthCheckPath() {
        return "/streampipes-backend/api/svchealth/" + AUTO_GENERATED_SERVICE_ID;
    }
}
