/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchTimeoutException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ExceptionsHelper;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterChangedEvent;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateApplier;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.node.DiscoveryNodes;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ThreadContext;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.http.HttpTransportSettings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.CancellableTask;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.Task;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.TaskAwareRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.TaskId;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.TaskResult;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.TaskResultsService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.ThreadPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class TaskManager
implements ClusterStateApplier {
    private static final Logger logger = LogManager.getLogger(TaskManager.class);
    private static final TimeValue WAIT_FOR_COMPLETION_POLL = TimeValue.timeValueMillis(100L);
    private final List<String> taskHeaders;
    private final ThreadPool threadPool;
    private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
    private final ConcurrentMapLong<CancellableTaskHolder> cancellableTasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
    private final AtomicLong taskIdGenerator = new AtomicLong();
    private final Map<TaskId, String> banedParents = new ConcurrentHashMap<TaskId, String>();
    private TaskResultsService taskResultsService;
    private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
    private final ByteSizeValue maxHeaderSize;

    public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
        this.threadPool = threadPool;
        this.taskHeaders = new ArrayList<String>(taskHeaders);
        this.maxHeaderSize = HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
    }

    public void setTaskResultsService(TaskResultsService taskResultsService) {
        assert (this.taskResultsService == null);
        this.taskResultsService = taskResultsService;
    }

    public Task register(String type, String action, TaskAwareRequest request) {
        HashMap<String, String> headers = new HashMap<String, String>();
        long headerSize = 0L;
        long maxSize = this.maxHeaderSize.getBytes();
        ThreadContext threadContext = this.threadPool.getThreadContext();
        for (String key : this.taskHeaders) {
            String httpHeader = threadContext.getHeader(key);
            if (httpHeader == null) continue;
            if ((headerSize += (long)(key.length() * 2 + httpHeader.length() * 2)) > maxSize) {
                throw new IllegalArgumentException("Request exceeded the maximum size of task headers " + this.maxHeaderSize);
            }
            headers.put(key, httpHeader);
        }
        Task task = request.createTask(this.taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
        Objects.requireNonNull(task);
        assert (task.getParentTaskId().equals(request.getParentTask())) : "Request [ " + request + "] didn't preserve it parentTaskId";
        if (logger.isTraceEnabled()) {
            logger.trace("register {} [{}] [{}] [{}]", (Object)task.getId(), (Object)type, (Object)action, (Object)task.getDescription());
        }
        if (task instanceof CancellableTask) {
            this.registerCancellableTask(task);
        } else {
            Task previousTask = this.tasks.put(task.getId(), task);
            assert (previousTask == null);
        }
        return task;
    }

    private void registerCancellableTask(Task task) {
        String reason;
        CancellableTask cancellableTask = (CancellableTask)task;
        CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask);
        CancellableTaskHolder oldHolder = this.cancellableTasks.put(task.getId(), holder);
        assert (oldHolder == null);
        if (task.getParentTaskId().isSet() && !this.banedParents.isEmpty() && (reason = this.banedParents.get(task.getParentTaskId())) != null) {
            try {
                holder.cancel(reason);
                throw new IllegalStateException("Task cancelled before it started: " + reason);
            }
            catch (Throwable throwable) {
                this.unregister(task);
                throw throwable;
            }
        }
    }

    public boolean cancel(CancellableTask task, String reason, Runnable listener) {
        CancellableTaskHolder holder = this.cancellableTasks.get(task.getId());
        if (holder != null) {
            logger.trace("cancelling task with id {}", (Object)task.getId());
            return holder.cancel(reason, listener);
        }
        return false;
    }

    public Task unregister(Task task) {
        logger.trace("unregister task for id: {}", (Object)task.getId());
        if (task instanceof CancellableTask) {
            CancellableTaskHolder holder = this.cancellableTasks.remove(task.getId());
            if (holder != null) {
                holder.finish();
                return holder.getTask();
            }
            return null;
        }
        return this.tasks.remove(task.getId());
    }

    public <Response extends ActionResponse> void storeResult(Task task, final Exception error, final ActionListener<Response> listener) {
        TaskResult taskResult;
        DiscoveryNode localNode = this.lastDiscoveryNodes.getLocalNode();
        if (localNode == null) {
            listener.onFailure(error);
            return;
        }
        try {
            taskResult = task.result(localNode, error);
        }
        catch (IOException ex) {
            logger.warn(() -> new ParameterizedMessage("couldn't store error {}", (Object)ExceptionsHelper.detailedMessage(error)), (Throwable)ex);
            listener.onFailure(ex);
            return;
        }
        this.taskResultsService.storeResult(taskResult, new ActionListener<Void>(){

            @Override
            public void onResponse(Void aVoid) {
                listener.onFailure(error);
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn(() -> new ParameterizedMessage("couldn't store error {}", (Object)ExceptionsHelper.detailedMessage(error)), (Throwable)e);
                listener.onFailure(e);
            }
        });
    }

    public <Response extends ActionResponse> void storeResult(Task task, final Response response, final ActionListener<Response> listener) {
        TaskResult taskResult;
        DiscoveryNode localNode = this.lastDiscoveryNodes.getLocalNode();
        if (localNode == null) {
            logger.warn("couldn't store response {}, the node didn't join the cluster yet", response);
            listener.onResponse(response);
            return;
        }
        try {
            taskResult = task.result(localNode, response);
        }
        catch (IOException ex) {
            logger.warn(() -> new ParameterizedMessage("couldn't store response {}", (Object)response), (Throwable)ex);
            listener.onFailure(ex);
            return;
        }
        this.taskResultsService.storeResult(taskResult, new ActionListener<Void>(){

            @Override
            public void onResponse(Void aVoid) {
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn(() -> new ParameterizedMessage("couldn't store response {}", (Object)response), (Throwable)e);
                listener.onFailure(e);
            }
        });
    }

    public Map<Long, Task> getTasks() {
        HashMap<Long, CancellableTask> taskHashMap = new HashMap<Long, CancellableTask>(this.tasks);
        for (CancellableTaskHolder holder : this.cancellableTasks.values()) {
            taskHashMap.put(holder.getTask().getId(), holder.getTask());
        }
        return Collections.unmodifiableMap(taskHashMap);
    }

    public Map<Long, CancellableTask> getCancellableTasks() {
        HashMap<Long, CancellableTask> taskHashMap = new HashMap<Long, CancellableTask>();
        for (CancellableTaskHolder holder : this.cancellableTasks.values()) {
            taskHashMap.put(holder.getTask().getId(), holder.getTask());
        }
        return Collections.unmodifiableMap(taskHashMap);
    }

    public Task getTask(long id) {
        Task task = this.tasks.get(id);
        if (task != null) {
            return task;
        }
        return this.getCancellableTask(id);
    }

    public CancellableTask getCancellableTask(long id) {
        CancellableTaskHolder holder = this.cancellableTasks.get(id);
        if (holder != null) {
            return holder.getTask();
        }
        return null;
    }

    public int getBanCount() {
        return this.banedParents.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setBan(TaskId parentTaskId, String reason) {
        logger.trace("setting ban for the parent task {} {}", (Object)parentTaskId, (Object)reason);
        Map<TaskId, String> map = this.banedParents;
        synchronized (map) {
            if (this.lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId())) {
                this.banedParents.put(parentTaskId, reason);
            }
        }
        for (Map.Entry entry : this.cancellableTasks.entrySet()) {
            CancellableTaskHolder holder = (CancellableTaskHolder)entry.getValue();
            if (!holder.hasParent(parentTaskId)) continue;
            holder.cancel(reason);
        }
    }

    public void removeBan(TaskId parentTaskId) {
        logger.trace("removing ban for the parent task {}", (Object)parentTaskId);
        this.banedParents.remove(parentTaskId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void applyClusterState(ClusterChangedEvent event) {
        this.lastDiscoveryNodes = event.state().getNodes();
        if (event.nodesRemoved()) {
            Map<TaskId, String> map = this.banedParents;
            synchronized (map) {
                this.lastDiscoveryNodes = event.state().getNodes();
                Iterator<TaskId> iterator = this.banedParents.keySet().iterator();
                while (iterator.hasNext()) {
                    TaskId taskId = iterator.next();
                    if (this.lastDiscoveryNodes.nodeExists(taskId.getNodeId())) continue;
                    logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone", (Object)taskId, (Object)event.state().getNodes().getLocalNode());
                    iterator.remove();
                }
            }
            for (Map.Entry entry : this.cancellableTasks.entrySet()) {
                CancellableTaskHolder holder = (CancellableTaskHolder)entry.getValue();
                CancellableTask task = holder.getTask();
                TaskId parentTaskId = task.getParentTaskId();
                if (!parentTaskId.isSet() || this.lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId()) || !task.cancelOnParentLeaving()) continue;
                holder.cancel("Coordinating node [" + parentTaskId.getNodeId() + "] left the cluster");
            }
        }
    }

    public void waitForTaskCompletion(Task task, long untilInNanos) {
        while (System.nanoTime() - untilInNanos < 0L) {
            if (this.getTask(task.getId()) == null) {
                return;
            }
            try {
                Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
            }
            catch (InterruptedException e) {
                throw new ElasticsearchException("Interrupted waiting for completion of [{}]", (Throwable)e, task);
            }
        }
        throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
    }

    private static class CancellableTaskHolder {
        private static final String TASK_FINISHED_MARKER = "task finished";
        private final CancellableTask task;
        private volatile String cancellationReason = null;
        private volatile Runnable cancellationListener = null;

        CancellableTaskHolder(CancellableTask task) {
            this.task = task;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean cancel(String reason, Runnable listener) {
            boolean cancelled;
            CancellableTaskHolder cancellableTaskHolder = this;
            synchronized (cancellableTaskHolder) {
                assert (reason != null);
                if (this.cancellationReason == null) {
                    this.cancellationReason = reason;
                    this.cancellationListener = listener;
                    cancelled = true;
                } else {
                    cancelled = false;
                }
            }
            if (cancelled) {
                this.task.cancel(reason);
            }
            return cancelled;
        }

        public boolean cancel(String reason) {
            return this.cancel(reason, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void finish() {
            Runnable listener = null;
            CancellableTaskHolder cancellableTaskHolder = this;
            synchronized (cancellableTaskHolder) {
                if (this.cancellationReason != null) {
                    if (this.cancellationListener != null) {
                        listener = this.cancellationListener;
                        this.cancellationListener = null;
                    }
                } else {
                    this.cancellationReason = TASK_FINISHED_MARKER;
                }
            }
            if (listener != null) {
                listener.run();
            }
        }

        public boolean hasParent(TaskId parentTaskId) {
            return this.task.getParentTaskId().equals(parentTaskId);
        }

        public CancellableTask getTask() {
            return this.task;
        }
    }
}

