/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.connector.http.polling.task;

import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext;
import io.camunda.connector.api.inbound.ProcessInstanceContext;
import io.camunda.connector.http.base.services.HttpService;
import io.camunda.connector.http.polling.model.PollingIntervalConfiguration;
import io.camunda.connector.http.polling.service.SharedExecutorService;
import io.camunda.connector.http.polling.task.HttpRequestTask;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessInstancesFetcherTask
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstancesFetcherTask.class);
    private final InboundIntermediateConnectorContext context;
    private final HttpService httpService;
    private final SharedExecutorService executorService;
    private final PollingIntervalConfiguration config;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> runningHttpRequestTaskIds;

    public ProcessInstancesFetcherTask(InboundIntermediateConnectorContext context, HttpService httpService, SharedExecutorService executorService) {
        this.config = (PollingIntervalConfiguration)context.bindProperties(PollingIntervalConfiguration.class);
        this.context = context;
        this.httpService = httpService;
        this.executorService = executorService;
        this.runningHttpRequestTaskIds = new ConcurrentHashMap();
    }

    @Override
    public void run() {
        try {
            List processInstanceContexts = this.context.getProcessInstanceContexts();
            if (processInstanceContexts != null) {
                this.removeInactiveTasks(processInstanceContexts);
                processInstanceContexts.forEach(this::scheduleRequest);
                this.context.reportHealth(Health.up((String)"Process instances", (Object)processInstanceContexts.size()));
            }
        }
        catch (Exception e) {
            LOGGER.error("An error occurred: {}", (Object)e.getMessage(), (Object)e);
            this.context.reportHealth(Health.down((Throwable)e));
        }
    }

    private void removeInactiveTasks(List<ProcessInstanceContext> processInstanceContexts) {
        List<String> activeTasks = processInstanceContexts.stream().map(this::getRequestTaskKey).toList();
        List<Map.Entry> inactiveTasks = this.runningHttpRequestTaskIds.entrySet().stream().filter(entry -> !activeTasks.contains(entry.getKey())).toList();
        inactiveTasks.forEach(entry -> {
            ((ScheduledFuture)entry.getValue()).cancel(true);
            this.runningHttpRequestTaskIds.remove(entry.getKey());
        });
    }

    private void scheduleRequest(ProcessInstanceContext processInstanceContext) {
        String taskKey = this.getRequestTaskKey(processInstanceContext);
        this.runningHttpRequestTaskIds.computeIfAbsent(taskKey, key -> {
            HttpRequestTask task = new HttpRequestTask(this.httpService, processInstanceContext, this.context);
            return this.executorService.getExecutorService().scheduleWithFixedDelay(task, 0L, this.config.getHttpRequestInterval().toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    private String getRequestTaskKey(ProcessInstanceContext processInstanceContext) {
        return this.context.getDefinition().deduplicationId() + processInstanceContext.getKey();
    }

    public void start() {
        this.executorService.getExecutorService().scheduleWithFixedDelay(this, 0L, this.config.getOperatePollingInterval().toMillis(), TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this.runningHttpRequestTaskIds.values().forEach(task -> task.cancel(true));
    }
}

