package org.apache.streampipes.manager.migration;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
import org.apache.streampipes.storage.api.IDataProcessorStorage;
import org.apache.streampipes.storage.api.IDataSinkStorage;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/manager/migration/PipelineElementMigrationManager.class */
/**
 * Applies pipeline element migrations (data processors and data sinks) to all stored pipelines.
 *
 * <p>For every pipeline that contains at least one element with an applicable migration, the
 * elements are migrated via the extensions service, the pipeline is written back to storage and,
 * if any migration failed, the pipeline is flagged and stopped.
 */
public class PipelineElementMigrationManager extends AbstractMigrationManager implements IMigrationHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PipelineElementMigrationManager.class);

    /** Path segment inserted between the service URL and the element-type suffix. */
    private static final String MIGRATION_ENDPOINT_PATH = "api/v1/migrations";

    private final IPipelineStorage pipelineStorage;
    private final IDataProcessorStorage dataProcessorStorage;
    private final IDataSinkStorage dataSinkStorage;

    /**
     * Creates a manager backed by the given storages.
     *
     * @param iPipelineStorage      storage used to read and update pipelines
     * @param iDataProcessorStorage storage used to look up data processor descriptions by app id
     * @param iDataSinkStorage      storage used to look up data sink descriptions by app id
     */
    public PipelineElementMigrationManager(IPipelineStorage iPipelineStorage, IDataProcessorStorage iDataProcessorStorage, IDataSinkStorage iDataSinkStorage) {
        this.pipelineStorage = iPipelineStorage;
        this.dataProcessorStorage = iDataProcessorStorage;
        this.dataSinkStorage = iDataSinkStorage;
    }

    /**
     * Updates the pipeline element descriptions and then migrates every stored pipeline that
     * contains an element matched by one of the given migration configs.
     *
     * @param spServiceRegistration registration of the extensions service; its service URL is used
     *                              to build the migration endpoints
     * @param list                  migration configs announced by the extensions service; when
     *                              empty, nothing is done
     */
    @Override // org.apache.streampipes.manager.migration.IMigrationHandler
    public void handleMigrations(SpServiceRegistration spServiceRegistration, List<ModelMigratorConfig> list) {
        if (list.isEmpty()) {
            LOG.info("No pipeline element migrations to perform");
            return;
        }
        String serviceUrl = spServiceRegistration.getServiceUrl();

        LOG.info("Updating pipeline element descriptions by replacement...");
        updateDescriptions(list, serviceUrl);
        LOG.info("Pipeline element descriptions are up to date.");
        LOG.info("Received {} pipeline element migrations from extension service {}.", list.size(), serviceUrl);

        List<Pipeline> allPipelines = this.pipelineStorage.getAllPipelines();
        if (!allPipelines.isEmpty()) {
            LOG.info("Found {} available pipelines. Checking pipelines for applicable migrations...", allPipelines.size());
        }

        // The endpoints do not depend on the pipeline or element, so build them once.
        String processorUrl = String.format("%s/%s/processor", serviceUrl, MIGRATION_ENDPOINT_PATH);
        String sinkUrl = String.format("%s/%s/sink", serviceUrl, MIGRATION_ENDPOINT_PATH);

        for (Pipeline pipeline : allPipelines) {
            if (shouldMigratePipeline(pipeline, list)) {
                migratePipeline(pipeline, list, processorUrl, sinkUrl);
            }
        }
    }

    /**
     * Migrates all processors and sinks of a single pipeline, persists the pipeline and triggers
     * failure handling when at least one migration did not succeed.
     */
    private void migratePipeline(Pipeline pipeline, List<ModelMigratorConfig> migrationConfigs, String processorUrl, String sinkUrl) {
        // Shared across processors and sinks: collects every failed migration of this pipeline.
        List<MigrationResult<?>> failedMigrations = new ArrayList<>();

        pipeline.setSepas(migrateElements(pipeline.getSepas(), migrationConfigs, processorUrl, "data processor", failedMigrations));
        pipeline.setActions(migrateElements(pipeline.getActions(), migrationConfigs, sinkUrl, "data sink", failedMigrations));
        this.pipelineStorage.updatePipeline(pipeline);

        if (failedMigrations.isEmpty()) {
            LOG.info("Migration for pipeline successfully completed.");
        } else {
            // The pipeline is re-read from storage after the update before it is modified again.
            handleFailedMigrations(this.pipelineStorage.getPipeline(pipeline.getPipelineId()), failedMigrations);
        }
    }

    /**
     * Returns a new list in which every element with an applicable migration is replaced by the
     * result of {@link #migratePipelineElement}; all other elements are kept as they are.
     */
    private <T extends InvocableStreamPipesEntity> List<T> migrateElements(List<T> elements, List<ModelMigratorConfig> migrationConfigs, String url, String elementKind, List<MigrationResult<?>> failedMigrations) {
        return elements.stream().map(element -> {
            if (MigrationUtils.getApplicableMigration(element, migrationConfigs).isPresent()) {
                return migratePipelineElement(element, migrationConfigs, url, failedMigrations);
            }
            LOG.info("No migration applicable for {} '{}'.", elementKind, element.getElementId());
            return element;
        }).toList();
    }

    /**
     * Checks whether any data processor or data sink of the pipeline has an applicable migration.
     */
    private boolean shouldMigratePipeline(Pipeline pipeline, List<ModelMigratorConfig> list) {
        return Stream.concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
                .anyMatch(element -> MigrationUtils.getApplicableMigration(element, list).isPresent());
    }

    /**
     * Logs the failed migrations, attaches one notification per failure to the pipeline, marks it
     * as {@link PipelineHealthStatus#REQUIRES_ATTENTION}, persists it and stops it.
     *
     * @param pipeline pipeline whose migration failed
     * @param list     results of the failed migrations
     */
    protected void handleFailedMigrations(Pipeline pipeline, List<MigrationResult<?>> list) {
        // One failure per line; passed as a log parameter so braces in the results are not
        // misread as placeholders.
        LOG.error("Failures in migration detected - The following pipeline elements could not be migrated:\n{}",
                StringUtils.join(list, "\n"));

        pipeline.setPipelineNotifications(list.stream()
                .map(failure -> "Failed migration of pipeline element: %s".formatted(failure.message()))
                .toList());
        pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
        this.pipelineStorage.updatePipeline(pipeline);

        // Re-read the pipeline after the update before handing it to the executor.
        stopPipeline(this.pipelineStorage.getPipeline(pipeline.getPipelineId()));
    }

    /**
     * Stops the given pipeline through a {@link PipelineExecutor} and logs the outcome.
     *
     * @param pipeline pipeline to stop
     */
    public void stopPipeline(Pipeline pipeline) {
        // NOTE(review): the boolean constructor argument is presumably a force-stop flag - confirm
        // against PipelineExecutor.
        if (new PipelineExecutor(pipeline, true).stopPipeline().isSuccess()) {
            LOG.info("Pipeline successfully stopped.");
        } else {
            LOG.error("Pipeline stop failed.");
        }
    }

    /**
     * Repeatedly applies migrations to a pipeline element until no further migration is applicable
     * or a failure has been recorded, so that several consecutive migrations can be chained.
     *
     * <p>NOTE(review): {@code list2} is checked for emptiness as a whole, so a failure recorded for
     * an earlier element also prevents migration of this element and causes its static properties
     * to be replaced - confirm this is intended.
     *
     * @param t     pipeline element to migrate
     * @param list  available migration configs
     * @param str   URL of the extensions service migration endpoint
     * @param list2 collector for failed migration results; a failed result is appended here
     * @param <T>   concrete pipeline element type
     * @return the migrated element, or the last successfully migrated state when a failure occurred
     */
    protected <T extends InvocableStreamPipesEntity> T migratePipelineElement(T t, List<ModelMigratorConfig> list, String str, List<MigrationResult<?>> list2) {
        T current = t;
        var applicableMigration = MigrationUtils.getApplicableMigration(current, list);
        while (applicableMigration.isPresent() && list2.isEmpty()) {
            ModelMigratorConfig migrationConfig = applicableMigration.get();
            LOG.info("Found applicable migration for pipeline element '{}': {}", current.getElementId(), migrationConfig);

            MigrationResult<?> migrationResult = performMigration(current, migrationConfig, str);
            if (migrationResult.success()) {
                LOG.info("Migration successfully performed by extensions service. Updating pipeline element invocation ...");
                LOG.debug("Migration was performed at extensions service endpoint '{}'", str);
                // The migration was requested for an element of type T, so the returned element
                // is expected to be of that type as well.
                @SuppressWarnings("unchecked")
                T migrated = (T) migrationResult.element();
                current = migrated;
            } else {
                LOG.error("Migration failed with the following reason: {}", migrationResult.message());
                list2.add(migrationResult);
            }
            // Look up the next migration against the (possibly updated) element.
            applicableMigration = MigrationUtils.getApplicableMigration(current, list);
        }
        if (!list2.isEmpty()) {
            updateFailedPipelineElement(current);
            LOG.info("Updated pipeline elements with new description where automatic migration failed.");
        }
        return current;
    }

    /**
     * Replaces the static properties of the given element with those of the stored description
     * that has the same app id. Elements that are neither a data processor nor a data sink get an
     * empty list.
     *
     * @param invocableStreamPipesEntity element whose static properties are overwritten
     */
    protected void updateFailedPipelineElement(InvocableStreamPipesEntity invocableStreamPipesEntity) {
        String appId = invocableStreamPipesEntity.getAppId();
        if (invocableStreamPipesEntity instanceof DataProcessorInvocation) {
            invocableStreamPipesEntity.setStaticProperties(
                    this.dataProcessorStorage.getFirstDataProcessorByAppId(appId).getStaticProperties());
        } else if (invocableStreamPipesEntity instanceof DataSinkInvocation) {
            invocableStreamPipesEntity.setStaticProperties(
                    this.dataSinkStorage.getFirstDataSinkByAppId(appId).getStaticProperties());
        } else {
            invocableStreamPipesEntity.setStaticProperties(new ArrayList<>());
        }
    }
}
