package org.apache.streampipes.service.core;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/service/core/PostStartupTask.class */
/**
 * Task executed after the core service has started.
 *
 * <p>In order, it triggers adapter migrations for all registered adapter service tags,
 * (re)starts stored pipelines that are either still flagged as running or flagged to be
 * restarted on system reboot, and finally asks the worker administration to check and
 * restore adapters.
 *
 * <p>Pipelines that fail to start are retried on a dedicated single-threaded scheduler.
 * Because retries run on that scheduler thread while {@link #run()} may still be iterating
 * over pipelines on the calling thread, the failure counters are kept in a concurrent map.
 */
public class PostStartupTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(PostStartupTask.class);

    /** Number of retries granted after the initial failed start attempt. */
    private static final int MAX_PIPELINE_START_RETRIES = 3;

    /** Delay between a failed start attempt and the next retry. */
    private static final int WAIT_TIME_AFTER_FAILURE_IN_SECONDS = 10;

    /** Storage used to enumerate the pipelines to be restored. */
    private final IPipelineStorage pipelineStorage;

    /** Failed start attempts per pipeline id; written from both the caller and the retry thread. */
    private final Map<String, Integer> failedPipelines = new ConcurrentHashMap<>();

    /** Runs the delayed retry attempts. */
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    private final WorkerAdministrationManagement workerAdministrationManagement = new WorkerAdministrationManagement();

    /**
     * Creates the task.
     *
     * @param iPipelineStorage storage from which all pipelines are read on {@link #run()}
     */
    public PostStartupTask(IPipelineStorage iPipelineStorage) {
        this.pipelineStorage = iPipelineStorage;
    }

    /** Runs the adapter asset update, the pipeline restoration and the adapter start, in that order. */
    @Override
    public void run() {
        performAdapterAssetUpdate();
        startAllPreviouslyStoppedPipelines();
        startAdapters();
    }

    /**
     * Collects the tags of all registered extension services whose prefix is
     * {@link SpServiceTagPrefix#ADAPTER} and passes them to the adapter migration routine.
     */
    private void performAdapterAssetUpdate() {
        this.workerAdministrationManagement.performAdapterMigrations(
                CouchDbStorageManager.INSTANCE.getExtensionsServiceStorage().getAll().stream()
                        .flatMap(registration -> registration.getTags().stream())
                        .filter(tag -> tag.getPrefix() == SpServiceTagPrefix.ADAPTER)
                        .toList());
    }

    /** Delegates to the worker administration's check-and-restore routine. */
    private void startAdapters() {
        // NOTE(review): the meaning of the 0 argument is defined by checkAndRestore - confirm there.
        this.workerAdministrationManagement.checkAndRestore(0);
    }

    /**
     * Starts every stored pipeline that is either still flagged as running ("orphaned") or
     * is stopped but flagged for restart on system reboot. In both cases the
     * restart-on-reboot flag is cleared once the pipeline has started successfully.
     */
    private void startAllPreviouslyStoppedPipelines() {
        List<Pipeline> allPipelines = this.pipelineStorage.getAllPipelines();

        LOG.info("Checking for orphaned pipelines...");
        List<Pipeline> orphanedPipelines = allPipelines.stream()
                .filter(Pipeline::isRunning)
                .toList();
        LOG.info("Found {} orphaned pipelines", orphanedPipelines.size());
        orphanedPipelines.forEach(pipeline -> {
            LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
            startPipeline(pipeline, false);
        });

        LOG.info("Checking for gracefully shut down pipelines to be restarted...");
        List<Pipeline> pipelinesToRestart = allPipelines.stream()
                .filter(pipeline -> !pipeline.isRunning())
                .filter(Pipeline::isRestartOnSystemReboot)
                .toList();
        LOG.info("Found {} pipelines that we are attempting to restart...", pipelinesToRestart.size());
        pipelinesToRestart.forEach(pipeline -> startPipeline(pipeline, false));

        LOG.info("No more pipelines to restore...");
    }

    /**
     * Attempts to start the given pipeline. On success the stored pipeline's
     * restart-on-reboot flag is updated; on failure the attempt is counted and a retry is
     * scheduled until more than {@link #MAX_PIPELINE_START_RETRIES} failures were recorded.
     *
     * @param pipeline        pipeline to start
     * @param restartOnReboot value written to the stored pipeline's restart-on-reboot flag
     *                        after a successful start
     */
    private void startPipeline(Pipeline pipeline, boolean restartOnReboot) {
        PipelineOperationStatus status = Operations.startPipeline(pipeline);
        if (status.isSuccess()) {
            LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
            // Re-read the pipeline from storage before updating the flag.
            Pipeline storedPipeline = getPipelineStorage().getPipeline(pipeline.getPipelineId());
            storedPipeline.setRestartOnSystemReboot(restartOnReboot);
            getPipelineStorage().updatePipeline(storedPipeline);
            return;
        }

        int failedAttempts = storeFailedRestartAttempt(pipeline);
        if (failedAttempts > MAX_PIPELINE_START_RETRIES) {
            LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?",
                    status.getPipelineName());
        } else {
            LOG.error("Pipeline {} could not be restarted - I'll try again in {} seconds ({}/{} failed attempts)",
                    pipeline.getName(), WAIT_TIME_AFTER_FAILURE_IN_SECONDS, failedAttempts,
                    MAX_PIPELINE_START_RETRIES);
            schedulePipelineStart(pipeline, restartOnReboot);
        }
    }

    /**
     * Schedules another start attempt after {@link #WAIT_TIME_AFTER_FAILURE_IN_SECONDS} seconds.
     *
     * @param pipeline        pipeline to retry
     * @param restartOnReboot forwarded unchanged to {@link #startPipeline(Pipeline, boolean)}
     */
    private void schedulePipelineStart(Pipeline pipeline, boolean restartOnReboot) {
        this.executorService.schedule(() -> {
            try {
                startPipeline(pipeline, restartOnReboot);
            } catch (RuntimeException e) {
                // The returned future is discarded, so an uncaught exception would vanish silently.
                LOG.error("Unexpected error while retrying to start pipeline {}", pipeline.getName(), e);
            }
        }, WAIT_TIME_AFTER_FAILURE_IN_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Atomically increments the failure counter of the given pipeline.
     *
     * @param pipeline pipeline whose start attempt failed
     * @return the number of failed attempts recorded so far, including this one
     */
    private int storeFailedRestartAttempt(Pipeline pipeline) {
        return this.failedPipelines.merge(pipeline.getPipelineId(), 1, Integer::sum);
    }

    /** Returns the pipeline storage obtained from the global storage dispatcher. */
    private IPipelineStorage getPipelineStorage() {
        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
    }
}
