package org.apache.streampipes.manager.execution.http;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.streampipes.commons.MD5;
import org.apache.streampipes.commons.Utils;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.message.PipelineStatusMessage;
import org.apache.streampipes.model.message.PipelineStatusMessageType;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.resource.management.secret.SecretProvider;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
import org.lightcouch.DocumentConflictException;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.69.0.jar:org/apache/streampipes/manager/execution/http/PipelineExecutor.class */
public class PipelineExecutor {
    private final Pipeline pipeline;
    private final boolean storeStatus;
    private final boolean forceStop;

    public PipelineExecutor(Pipeline pipeline, boolean z, boolean z2) {
        this.pipeline = pipeline;
        this.storeStatus = z;
        this.forceStop = z2;
    }

    public PipelineOperationStatus startPipeline() {
        PipelineOperationStatus pipelineOperationStatus;
        this.pipeline.getSepas().forEach((v1) -> {
            updateGroupIds(v1);
        });
        this.pipeline.getActions().forEach((v1) -> {
            updateGroupIds(v1);
        });
        List<DataProcessorInvocation> sepas = this.pipeline.getSepas();
        List<DataSinkInvocation> actions = this.pipeline.getActions();
        List<SpDataSet> list = (List) this.pipeline.getStreams().stream().filter(spDataStream -> {
            return spDataStream instanceof SpDataSet;
        }).map(spDataStream2 -> {
            return new SpDataSet((SpDataSet) spDataStream2);
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        list.forEach(spDataSet -> {
            spDataSet.setCorrespondingPipeline(this.pipeline.getPipelineId());
            try {
                spDataSet.setSelectedEndpointUrl(findSelectedEndpoint(spDataSet));
            } catch (NoServiceEndpointsAvailableException e) {
                arrayList.add(spDataSet);
            }
        });
        ArrayList arrayList2 = new ArrayList();
        arrayList2.addAll(sepas);
        arrayList2.addAll(actions);
        decryptSecrets(arrayList2);
        arrayList2.forEach(invocableStreamPipesEntity -> {
            try {
                invocableStreamPipesEntity.setSelectedEndpointUrl(findSelectedEndpoint(invocableStreamPipesEntity));
                invocableStreamPipesEntity.setCorrespondingPipeline(this.pipeline.getPipelineId());
            } catch (NoServiceEndpointsAvailableException e) {
                arrayList.add(invocableStreamPipesEntity);
            }
        });
        if (arrayList.size() == 0) {
            pipelineOperationStatus = new GraphSubmitter(this.pipeline.getPipelineId(), this.pipeline.getName(), arrayList2, list).invokeGraphs();
            encryptSecrets(arrayList2);
            if (pipelineOperationStatus.isSuccess()) {
                storeInvocationGraphs(this.pipeline.getPipelineId(), arrayList2, list);
                PipelineStatusManager.addPipelineStatus(this.pipeline.getPipelineId(), new PipelineStatusMessage(this.pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STARTED.title(), PipelineStatusMessageType.PIPELINE_STARTED.description()));
                if (this.storeStatus) {
                    this.pipeline.setHealthStatus(PipelineHealthStatus.OK);
                    setPipelineStarted(this.pipeline);
                }
            }
        } else {
            pipelineOperationStatus = new PipelineOperationStatus(this.pipeline.getPipelineId(), this.pipeline.getName(), "Could not start pipeline " + this.pipeline.getName() + ".", (List) arrayList.stream().map(namedStreamPipesEntity -> {
                return new PipelineElementStatus(namedStreamPipesEntity.getElementId(), namedStreamPipesEntity.getName(), false, "No active supporting service found");
            }).collect(Collectors.toList()));
        }
        return pipelineOperationStatus;
    }

    private String findSelectedEndpoint(InvocableStreamPipesEntity invocableStreamPipesEntity) throws NoServiceEndpointsAvailableException {
        return new ExtensionsServiceEndpointGenerator(invocableStreamPipesEntity.getAppId(), ExtensionsServiceEndpointUtils.getPipelineElementType(invocableStreamPipesEntity)).getEndpointResourceUrl();
    }

    private String findSelectedEndpoint(SpDataSet spDataSet) throws NoServiceEndpointsAvailableException {
        return spDataSet.isInternallyManaged() ? getConnectMasterSourcesUrl() : new ExtensionsServiceEndpointGenerator(spDataSet.getAppId() != null ? spDataSet.getAppId() : spDataSet.getCorrespondingAdapterId(), SpServiceUrlProvider.DATA_SET).getEndpointResourceUrl();
    }

    private String getConnectMasterSourcesUrl() throws NoServiceEndpointsAvailableException {
        List<String> serviceEndpoints = SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(DefaultSpServiceGroups.CORE, true, Collections.singletonList(DefaultSpServiceTags.CONNECT_MASTER.asString()));
        if (serviceEndpoints.size() > 0) {
            return serviceEndpoints.get(0) + GlobalStreamPipesConstants.CONNECT_MASTER_SOURCES_ENDPOINT;
        }
        throw new NoServiceEndpointsAvailableException("Could not find any available connect master service endpoint");
    }

    private void updateGroupIds(InvocableStreamPipesEntity invocableStreamPipesEntity) {
        Stream<R> map = invocableStreamPipesEntity.getInputStreams().stream().filter(spDataStream -> {
            return spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol;
        }).map(spDataStream2 -> {
            return spDataStream2.getEventGrounding().getTransportProtocol();
        });
        Class<KafkaTransportProtocol> cls = KafkaTransportProtocol.class;
        Objects.requireNonNull(KafkaTransportProtocol.class);
        map.map((v1) -> {
            return r1.cast(v1);
        }).forEach(kafkaTransportProtocol -> {
            kafkaTransportProtocol.setGroupId(Utils.filterSpecialChar(this.pipeline.getName()) + MD5.crypt(kafkaTransportProtocol.getElementId()));
        });
    }

    private void decryptSecrets(List<InvocableStreamPipesEntity> list) {
        SecretProvider.getDecryptionService().apply(list);
    }

    private void encryptSecrets(List<InvocableStreamPipesEntity> list) {
        SecretProvider.getEncryptionService().apply(list);
    }

    public PipelineOperationStatus stopPipeline() {
        PipelineOperationStatus detachGraphs = new GraphSubmitter(this.pipeline.getPipelineId(), this.pipeline.getName(), TemporaryGraphStorage.graphStorage.get(this.pipeline.getPipelineId()), TemporaryGraphStorage.datasetStorage.get(this.pipeline.getPipelineId())).detachGraphs();
        if (detachGraphs.isSuccess()) {
            PipelineStatusManager.addPipelineStatus(this.pipeline.getPipelineId(), new PipelineStatusMessage(this.pipeline.getPipelineId(), System.currentTimeMillis(), PipelineStatusMessageType.PIPELINE_STOPPED.title(), PipelineStatusMessageType.PIPELINE_STOPPED.description()));
        }
        if ((detachGraphs.isSuccess() || this.forceStop) && this.storeStatus) {
            setPipelineStopped(this.pipeline);
        }
        return detachGraphs;
    }

    private void setPipelineStarted(Pipeline pipeline) {
        pipeline.setRunning(true);
        pipeline.setStartedAt(new Date().getTime());
        try {
            getPipelineStorageApi().updatePipeline(pipeline);
        } catch (DocumentConflictException e) {
        }
    }

    private void setPipelineStopped(Pipeline pipeline) {
        pipeline.setRunning(false);
        getPipelineStorageApi().updatePipeline(pipeline);
    }

    private void storeInvocationGraphs(String str, List<InvocableStreamPipesEntity> list, List<SpDataSet> list2) {
        TemporaryGraphStorage.graphStorage.put(str, list);
        TemporaryGraphStorage.datasetStorage.put(str, list2);
    }

    private IPipelineStorage getPipelineStorageApi() {
        return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
    }
}
