package org.apache.streampipes.manager.health;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
import org.apache.streampipes.manager.storage.RunningPipelineElementStorage;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineHealthStatus;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.91.0.jar:org/apache/streampipes/manager/health/PipelineHealthCheck.class */
public class PipelineHealthCheck implements Runnable {
    private static final int MAX_FAILED_ATTEMPTS = 10;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PipelineHealthCheck.class);
    private static final Map<String, Integer> failedRestartAttempts = new HashMap();

    public void checkAndRestorePipelineElements() {
        List<Pipeline> runningPipelines = getRunningPipelines();
        if (runningPipelines.size() > 0) {
            List<String> findRunningInstances = findRunningInstances(generateEndpointMap().keySet());
            runningPipelines.forEach(pipeline -> {
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                RunningPipelineElementStorage.runningProcessorsAndSinks.get(pipeline.getPipelineId()).forEach(invocableStreamPipesEntity -> {
                    boolean z;
                    String extractInstanceId = extractInstanceId(invocableStreamPipesEntity);
                    if (findRunningInstances.stream().noneMatch(str -> {
                        return str.equals(extractInstanceId);
                    }) && shouldRetry(extractInstanceId)) {
                        String selectedEndpointUrl = invocableStreamPipesEntity.getSelectedEndpointUrl();
                        atomicBoolean.set(true);
                        try {
                            selectedEndpointUrl = findEndpointUrl(invocableStreamPipesEntity);
                            z = new InvokeHttpRequest().execute(invocableStreamPipesEntity, selectedEndpointUrl, pipeline.getPipelineId()).isSuccess();
                        } catch (NoServiceEndpointsAvailableException e) {
                            z = false;
                        }
                        if (!z) {
                            arrayList.add(extractInstanceId);
                            addFailedAttemptNotification(arrayList3, invocableStreamPipesEntity);
                            increaseFailedAttempt(extractInstanceId);
                            LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})", invocableStreamPipesEntity.getName(), pipeline.getName(), failedRestartAttempts.get(extractInstanceId), 10);
                            return;
                        }
                        arrayList2.add(extractInstanceId);
                        addSuccessfulRestoreNotification(arrayList3, invocableStreamPipesEntity);
                        resetFailedAttempts(extractInstanceId);
                        invocableStreamPipesEntity.setSelectedEndpointUrl(selectedEndpointUrl);
                        LOG.info("Successfully restored pipeline element {} of pipeline {}", invocableStreamPipesEntity.getName(), pipeline.getName());
                    }
                });
                if (atomicBoolean.get()) {
                    if (arrayList.size() > 0) {
                        pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
                    } else if (arrayList2.size() > 0) {
                        pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
                    }
                    pipeline.setPipelineNotifications(arrayList3);
                    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
                }
            });
        }
    }

    private String findEndpointUrl(InvocableStreamPipesEntity invocableStreamPipesEntity) throws NoServiceEndpointsAvailableException {
        return new ExtensionsServiceEndpointGenerator(invocableStreamPipesEntity.getAppId(), ExtensionsServiceEndpointUtils.getPipelineElementType(invocableStreamPipesEntity)).getEndpointResourceUrl();
    }

    private boolean shouldRetry(String str) {
        return !failedRestartAttempts.containsKey(str) || failedRestartAttempts.get(str).intValue() < 10;
    }

    private void resetFailedAttempts(String str) {
        failedRestartAttempts.put(str, 0);
    }

    private void increaseFailedAttempt(String str) {
        if (!failedRestartAttempts.containsKey(str)) {
            failedRestartAttempts.put(str, 1);
        } else {
            failedRestartAttempts.put(str, Integer.valueOf(failedRestartAttempts.get(str).intValue() + 1));
        }
    }

    private void addSuccessfulRestoreNotification(List<String> list, InvocableStreamPipesEntity invocableStreamPipesEntity) {
        list.add(getCurrentDatetime() + "Pipeline element '" + invocableStreamPipesEntity.getName() + "' was not available and was successfully restored.");
    }

    private void addFailedAttemptNotification(List<String> list, InvocableStreamPipesEntity invocableStreamPipesEntity) {
        list.add(getCurrentDatetime() + "Pipeline element '" + invocableStreamPipesEntity.getName() + "' was not available and could not be restored.");
    }

    private String getCurrentDatetime() {
        return "[" + DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss").format(LocalDateTime.now()) + "] ";
    }

    private String extractInstanceId(InvocableStreamPipesEntity invocableStreamPipesEntity) {
        return InstanceIdExtractor.extractId(invocableStreamPipesEntity.getElementId());
    }

    private List<String> findRunningInstances(Set<String> set) {
        ArrayList arrayList = new ArrayList();
        set.forEach(str -> {
            try {
                arrayList.addAll(new PipelineElementEndpointHealthCheck(str).checkRunningInstances());
            } catch (IOException e) {
                LOG.error("Pipeline element endpoint {} is unavailable", str);
            }
        });
        return arrayList;
    }

    private Map<String, List<InvocableStreamPipesEntity>> generateEndpointMap() {
        HashMap hashMap = new HashMap();
        RunningPipelineElementStorage.runningProcessorsAndSinks.forEach((str, list) -> {
            list.forEach(invocableStreamPipesEntity -> {
                addEndpoint(hashMap, invocableStreamPipesEntity);
            });
        });
        return hashMap;
    }

    private void addEndpoint(Map<String, List<InvocableStreamPipesEntity>> map, InvocableStreamPipesEntity invocableStreamPipesEntity) {
        String selectedEndpointUrl = invocableStreamPipesEntity.getSelectedEndpointUrl();
        if (!map.containsKey(selectedEndpointUrl)) {
            map.put(selectedEndpointUrl, new ArrayList());
        }
        List<InvocableStreamPipesEntity> list = map.get(selectedEndpointUrl);
        list.add(invocableStreamPipesEntity);
        map.put(selectedEndpointUrl, list);
    }

    @Override // java.lang.Runnable
    public void run() {
        checkAndRestorePipelineElements();
    }

    private List<Pipeline> getRunningPipelines() {
        return (List) StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines().stream().filter((v0) -> {
            return v0.isRunning();
        }).collect(Collectors.toList());
    }
}
