package org.apache.streampipes.backend;

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.manager.health.PipelineHealthCheck;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.setup.AutoInstallation;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.rest.security.SpPermissionEvaluator;
import org.apache.streampipes.service.base.BaseNetworkingConfig;
import org.apache.streampipes.service.base.StreamPipesServiceBase;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * Spring configuration class and entry point of the StreamPipes core service.
 *
 * <p>Besides bootstrapping the service via {@link StreamPipesServiceBase}, this class
 * <ul>
 *   <li>runs the initial auto-installation when the backend is not yet configured,</li>
 *   <li>tries to start pipelines that are stored as running or flagged for restart
 *       (with a bounded number of retries),</li>
 *   <li>schedules a periodic {@link PipelineHealthCheck}, and</li>
 *   <li>on shutdown flags running pipelines for restart, stops them and deregisters the service.</li>
 * </ul>
 */
@Configuration
@EnableAutoConfiguration
@Import({StreamPipesResourceConfig.class, WelcomePageController.class, StreamPipesPasswordEncoder.class, WebSecurityConfig.class, SpPermissionEvaluator.class})
@ComponentScan({"org.apache.streampipes.rest.*"})
/* loaded from: input_file:BOOT-INF/classes/org/apache/streampipes/backend/StreamPipesBackendApplication.class */
public class StreamPipesBackendApplication extends StreamPipesServiceBase {
    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesBackendApplication.class.getCanonicalName());

    /** Number of failed start attempts after which a pipeline is still rescheduled. */
    private static final int MAX_PIPELINE_START_RETRIES = 3;
    /** Delay before a failed pipeline start is attempted again. */
    private static final int WAIT_TIME_AFTER_FAILURE_IN_SECONDS = 10;
    /** Initial delay and period of the pipeline health check, in {@link #HEALTH_CHECK_UNIT}. */
    private static final int HEALTH_CHECK_INTERVAL = 60;
    private static final TimeUnit HEALTH_CHECK_UNIT = TimeUnit.SECONDS;

    /** Single-threaded scheduler that runs the pipeline restore task and all start retries. */
    private ScheduledExecutorService executorService;
    /** Single-threaded scheduler that runs the periodic {@link PipelineHealthCheck}. */
    private ScheduledExecutorService healthCheckExecutorService;
    /**
     * Failed start attempts per pipeline id. Within this class it is only accessed from tasks
     * submitted to {@link #executorService}, which is single-threaded.
     */
    private final Map<String, Integer> failedPipelines = new HashMap<>();

    /**
     * Starts the core service in service group {@code CORE} using the default networking
     * resolution for port 8030.
     *
     * @param strArr command line arguments (not evaluated here)
     */
    public static void main(String[] strArr) {
        StreamPipesBackendApplication application = new StreamPipesBackendApplication();
        try {
            application.startStreamPipesService(StreamPipesBackendApplication.class, DefaultSpServiceGroups.CORE, application.serviceId(), BaseNetworkingConfig.defaultResolution(8030));
        } catch (UnknownHostException e) {
            // Pass the exception along so the root cause is not lost.
            LOG.error("Could not auto-resolve host address - please manually provide the hostname using the SP_HOST environment variable", e);
        }
    }

    /**
     * @return the id of this service: {@code "core-"} followed by {@code AUTO_GENERATED_SERVICE_ID}
     */
    private String serviceId() {
        return "core-" + AUTO_GENERATED_SERVICE_ID;
    }

    /**
     * Creates the two schedulers, performs the initial setup if the backend is not configured yet,
     * schedules the one-off pipeline restore task (after 5 seconds) and the periodic health check.
     */
    @PostConstruct
    public void init() {
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.healthCheckExecutorService = Executors.newSingleThreadScheduledExecutor();
        if (!isConfigured()) {
            doInitialSetup();
        }
        this.executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5L, TimeUnit.SECONDS);
        LOG.info("Pipeline health check will run every {} seconds", HEALTH_CHECK_INTERVAL);
        this.healthCheckExecutorService.scheduleAtFixedRate(new PipelineHealthCheck(), HEALTH_CHECK_INTERVAL, HEALTH_CHECK_INTERVAL, HEALTH_CHECK_UNIT);
    }

    /**
     * @return whether {@link BackendConfig} reports the backend as configured
     */
    private boolean isConfigured() {
        return BackendConfig.INSTANCE.isConfigured();
    }

    /**
     * Runs the auto-installation: sets the setup status to {@code true}, waits 10 seconds, starts
     * {@link AutoInstallation} and then resets the setup status to {@code false}. Blocks the
     * calling thread while waiting.
     */
    private void doInitialSetup() {
        LOG.info("\n\n**********\n\nWelcome to Apache StreamPipes!\n\n**********\n\n");
        LOG.info("We will perform the initial setup, grab some coffee and cross your fingers ;-)...");
        BackendConfig.INSTANCE.updateSetupStatus(true);
        LOG.info("Auto-setup will start in 10 seconds to make sure extensions services are running...");
        try {
            TimeUnit.SECONDS.sleep(10L);
            LOG.info("Starting installation procedure");
            new AutoInstallation().startAutoInstallation();
            BackendConfig.INSTANCE.updateSetupStatus(false);
        } catch (InterruptedException e) {
            LOG.error("Ooops, something went wrong during the installation", e);
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules another start attempt for the given pipeline after
     * {@link #WAIT_TIME_AFTER_FAILURE_IN_SECONDS} seconds.
     *
     * @param pipeline               the pipeline to start
     * @param restartOnSystemReboot  value passed through to {@link #startPipeline(Pipeline, boolean)}
     */
    private void schedulePipelineStart(Pipeline pipeline, boolean restartOnSystemReboot) {
        this.executorService.schedule(() -> startPipeline(pipeline, restartOnSystemReboot), WAIT_TIME_AFTER_FAILURE_IN_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Shutdown hook: stops the background schedulers, flags all running pipelines for restart,
     * stops all pipelines and deregisters this service.
     */
    @PreDestroy
    public void onExit() {
        LOG.info("Shutting down StreamPipes...");
        // Stop background work first: otherwise the scheduler threads outlive the application and a
        // pending start retry could run while (or after) the pipelines are being stopped below.
        shutdownExecutor(this.healthCheckExecutorService);
        shutdownExecutor(this.executorService);

        LOG.info("Flagging currently running pipelines for restart...");
        List<Pipeline> runningPipelines = getAllPipelines().stream()
                .filter(Pipeline::isRunning)
                .collect(Collectors.toList());
        LOG.info("Found {} running pipelines which will be stopped...", runningPipelines.size());
        runningPipelines.forEach(pipeline -> {
            pipeline.setRestartOnSystemReboot(true);
            getPipelineStorage().updatePipeline(pipeline);
        });

        LOG.info("Gracefully stopping all running pipelines...");
        Operations.stopAllPipelines(true).forEach(status -> {
            if (status.isSuccess()) {
                LOG.info("Pipeline {} successfully stopped", status.getPipelineName());
            } else {
                LOG.error("Pipeline {} could not be stopped", status.getPipelineName());
            }
        });
        deregisterService(serviceId());
        LOG.info("Thanks for using Apache StreamPipes - see you next time!");
    }

    /**
     * Cancels all pending tasks of the given scheduler and interrupts a running one.
     *
     * @param executor the scheduler to stop; {@code null} (e.g. {@link #init()} never ran) is ignored
     */
    private static void shutdownExecutor(ScheduledExecutorService executor) {
        if (executor != null) {
            // shutdownNow() rather than shutdown(): already-scheduled delayed tasks must not run anymore.
            executor.shutdownNow();
        }
    }

    /**
     * Tries to start all stored pipelines that are marked as running ("orphaned") and afterwards
     * all pipelines that are not running but flagged for restart on system reboot.
     */
    private void startAllPreviouslyStoppedPipelines() {
        LOG.info("Checking for orphaned pipelines...");
        List<Pipeline> orphanedPipelines = getAllPipelines().stream()
                .filter(Pipeline::isRunning)
                .collect(Collectors.toList());
        LOG.info("Found {} orphaned pipelines", orphanedPipelines.size());
        orphanedPipelines.forEach(pipeline -> {
            LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
            startPipeline(pipeline, false);
        });

        LOG.info("Checking for gracefully shut down pipelines to be restarted...");
        // Queried again on purpose: the storage may have changed while orphaned pipelines were started.
        List<Pipeline> pipelinesToRestart = getAllPipelines().stream()
                .filter(pipeline -> !pipeline.isRunning())
                .filter(Pipeline::isRestartOnSystemReboot)
                .collect(Collectors.toList());
        LOG.info("Found {} pipelines that we are attempting to restart...", pipelinesToRestart.size());
        pipelinesToRestart.forEach(pipeline -> startPipeline(pipeline, false));
        LOG.info("No more pipelines to restore...");
    }

    /**
     * Starts the given pipeline. On success the stored pipeline's restart-on-reboot flag is set to
     * {@code restartOnSystemReboot}. On failure the attempt is counted and, as long as the count
     * does not exceed {@link #MAX_PIPELINE_START_RETRIES}, another attempt is scheduled.
     *
     * @param pipeline               the pipeline to start
     * @param restartOnSystemReboot  flag value to store on the pipeline after a successful start
     */
    private void startPipeline(Pipeline pipeline, boolean restartOnSystemReboot) {
        PipelineOperationStatus status = Operations.startPipeline(pipeline);
        if (status.isSuccess()) {
            LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
            IPipelineStorage pipelineStorage = getPipelineStorage();
            // Update the freshly loaded stored pipeline rather than the (possibly stale) argument.
            Pipeline storedPipeline = pipelineStorage.getPipeline(pipeline.getPipelineId());
            storedPipeline.setRestartOnSystemReboot(restartOnSystemReboot);
            pipelineStorage.updatePipeline(storedPipeline);
            return;
        }

        int failedAttempts = storeFailedRestartAttempt(pipeline);
        if (failedAttempts > MAX_PIPELINE_START_RETRIES) {
            LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?", status.getPipelineName());
        } else {
            LOG.error("Pipeline {} could not be restarted - I'll try again in {} seconds ({}/{} failed attempts)", pipeline.getName(), WAIT_TIME_AFTER_FAILURE_IN_SECONDS, failedAttempts, MAX_PIPELINE_START_RETRIES);
            schedulePipelineStart(pipeline, restartOnSystemReboot);
        }
    }

    /**
     * Increments the failed-start counter of the given pipeline (starting at 1).
     *
     * @param pipeline the pipeline whose start failed
     * @return the number of failed attempts recorded so far, including this one
     */
    private int storeFailedRestartAttempt(Pipeline pipeline) {
        return this.failedPipelines.merge(pipeline.getPipelineId(), 1, Integer::sum);
    }

    /**
     * @return all pipelines known to the pipeline storage
     */
    private List<Pipeline> getAllPipelines() {
        return getPipelineStorage().getAllPipelines();
    }

    /**
     * @return the pipeline storage API of the NoSQL store
     */
    private IPipelineStorage getPipelineStorage() {
        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
    }

    /**
     * @return the service tags of the core service: {@code CORE}, {@code CONNECT_MASTER} and
     *         {@code STREAMPIPES_CLIENT}
     */
    @Override // org.apache.streampipes.service.base.StreamPipesServiceBase
    protected List<SpServiceTag> getServiceTags() {
        return Arrays.asList(DefaultSpServiceTags.CORE, DefaultSpServiceTags.CONNECT_MASTER, DefaultSpServiceTags.STREAMPIPES_CLIENT);
    }

    /**
     * @return the health check path of this service instance, ending with
     *         {@code AUTO_GENERATED_SERVICE_ID}
     */
    @Override // org.apache.streampipes.service.base.StreamPipesServiceBase
    protected String getHealthCheckPath() {
        return "/streampipes-backend/api/svchealth/" + AUTO_GENERATED_SERVICE_ID;
    }
}
