/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;

/**
 * Extends {@link AssignedTasks} with two additional states a {@link StreamTask} can be in:
 * <em>restoring</em> and <em>suspended</em>.
 *
 * <p>Besides the state maps inherited from the superclass ({@code created}, {@code running}),
 * this class keeps an index from partition to restoring task, the set of partitions reported
 * as restored so far, and the IDs of the tasks that were running when the most recent
 * revocation started.
 *
 * <p>Most suspend/close methods return the first {@link RuntimeException} they encounter
 * (or {@code null} if there was none) instead of throwing, so that a failure on one task does
 * not stop the remaining tasks from being handled.
 */
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
    /** Tasks that were successfully suspended, keyed by task ID. */
    private final Map<TaskId, StreamTask> suspended = new ConcurrentHashMap<TaskId, StreamTask>();
    /** Tasks registered through {@link #addTaskToRestoring(StreamTask)}, keyed by task ID. */
    private final Map<TaskId, StreamTask> restoring = new ConcurrentHashMap<TaskId, StreamTask>();
    /** Partitions reported as restored via {@link #updateRestored(Collection)}. */
    private final Set<TopicPartition> restoredPartitions = new HashSet<TopicPartition>();
    /** Maps both the input partitions and the changelog partitions of a restoring task to that task. */
    private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<TopicPartition, StreamTask>();
    /** IDs of the tasks that were running when {@link #suspendOrCloseTasks(Set, List)} was last called. */
    private final Set<TaskId> prevActiveTasks = new HashSet<TaskId>();

    AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
    }

    /**
     * Looks up the restoring task that owns the given partition.
     *
     * @param partition an input or changelog partition
     * @return the restoring task registered for {@code partition}, or {@code null} if there is none
     */
    @Override
    public StreamTask restoringTaskFor(TopicPartition partition) {
        return restoringByPartition.get(partition);
    }

    /**
     * @return the tasks reported by the superclass plus all restoring and suspended tasks;
     *         the list returned by {@code super.allTasks()} is extended in place
     */
    @Override
    List<StreamTask> allTasks() {
        List<StreamTask> tasks = super.allTasks();
        tasks.addAll(restoring.values());
        tasks.addAll(suspended.values());
        return tasks;
    }

    /**
     * @return the task IDs reported by the superclass plus the IDs of all restoring and
     *         suspended tasks; the set returned by {@code super.allAssignedTaskIds()} is
     *         extended in place
     */
    @Override
    Set<TaskId> allAssignedTaskIds() {
        Set<TaskId> taskIds = super.allAssignedTaskIds();
        taskIds.addAll(restoring.keySet());
        taskIds.addAll(suspended.keySet());
        return taskIds;
    }

    /**
     * @return {@code true} if the superclass considers all tasks running, nothing is restoring,
     *         and either no task is suspended or at least one task is running
     */
    @Override
    boolean allTasksRunning() {
        // Suspended tasks alongside running ones do not count against "all running".
        return super.allTasksRunning()
            && restoring.isEmpty()
            && (suspended.isEmpty() || !running.isEmpty());
    }

    /**
     * Closes a single task, using the suspended-specific close path when the task is
     * currently tracked as suspended.
     *
     * @param task  the task to close
     * @param clean whether the close should be a clean one
     */
    @Override
    void closeTask(StreamTask task, boolean clean) {
        if (suspended.containsKey(task.id())) {
            task.closeSuspended(clean, null);
        } else {
            task.close(clean, false);
        }
    }

    /** @return {@code true} if at least one task is currently restoring */
    boolean hasRestoringTasks() {
        return !restoring.isEmpty();
    }

    /**
     * Clears the restored-partition set and the partition-to-task index.
     *
     * @throws IllegalStateException if any task is still restoring
     */
    void clearRestoringPartitions() {
        if (!restoring.isEmpty()) {
            log.error("Tried to clear restoring partitions but was still restoring the stream tasks {}", restoring);
            throw new IllegalStateException("Should not clear restoring partitions while set of restoring tasks is non-empty");
        }
        restoredPartitions.clear();
        restoringByPartition.clear();
    }

    /** @return a live view of the IDs of the suspended tasks (not a copy) */
    Set<TaskId> suspendedTaskIds() {
        return suspended.keySet();
    }

    /** @return the internal set of previously running task IDs (not a copy) */
    Set<TaskId> previousRunningTaskIds() {
        return prevActiveTasks;
    }

    /**
     * Handles revocation of the given tasks: running tasks are suspended, while created and
     * restoring tasks are closed. Revoked tasks that are already suspended are left alone.
     *
     * <p>As a side effect the set of previously running task IDs is reset to the IDs
     * currently reported by {@code runningTaskIds()}.
     *
     * @param revokedTasks          IDs of the tasks being revoked
     * @param revokedTaskChangelogs output list; the changelog partitions of every task that is
     *                              suspended or closed here are appended to it
     * @return the first exception raised while suspending or closing, or {@code null}
     */
    RuntimeException suspendOrCloseTasks(Set<TaskId> revokedTasks, List<TopicPartition> revokedTaskChangelogs) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>(null);
        Set<TaskId> revokedRunningTasks = new HashSet<TaskId>();
        Set<TaskId> revokedNonRunningTasks = new HashSet<TaskId>();
        Set<TaskId> revokedRestoringTasks = new HashSet<TaskId>();

        prevActiveTasks.clear();
        prevActiveTasks.addAll(runningTaskIds());

        // Bucket every revoked task by the state map it currently lives in.
        for (TaskId task : revokedTasks) {
            if (running.containsKey(task)) {
                revokedRunningTasks.add(task);
            } else if (created.containsKey(task)) {
                revokedNonRunningTasks.add(task);
            } else if (restoring.containsKey(task)) {
                revokedRestoringTasks.add(task);
            } else if (!suspended.containsKey(task)) {
                log.warn("Stream task {} was revoked but cannot be found in the assignment, may have been closed due to error", task);
            }
        }

        // All three steps always run; only the first exception is retained.
        firstException.compareAndSet(null, suspendRunningTasks(revokedRunningTasks, revokedTaskChangelogs));
        firstException.compareAndSet(null, closeNonRunningTasks(revokedNonRunningTasks, revokedTaskChangelogs));
        firstException.compareAndSet(null, closeRestoringTasks(revokedRestoringTasks, revokedTaskChangelogs));
        return firstException.get();
    }

    /**
     * Suspends the given running tasks and moves each successfully suspended task into the
     * suspended map.
     *
     * <p>A task that throws {@link TaskMigratedException} is closed as a zombie and the
     * exception is swallowed. A task that throws any other {@link RuntimeException} is closed
     * uncleanly and the exception is recorded. In both failure cases the task ID is removed
     * from the set of previously running tasks.
     *
     * @param runningTasksToSuspend IDs of tasks expected to be present in {@code running}
     * @param taskChangelogs        output list receiving the changelog partitions of every
     *                              task handled, whether or not suspension succeeded
     * @return the first non-migration exception raised, or {@code null}
     */
    private RuntimeException suspendRunningTasks(Set<TaskId> runningTasksToSuspend, List<TopicPartition> taskChangelogs) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>(null);
        log.debug("Suspending the running stream tasks {}", running.keySet());
        for (TaskId id : runningTasksToSuspend) {
            StreamTask task = running.get(id);
            try {
                task.suspend();
                suspended.put(id, task);
            } catch (TaskMigratedException closeAsZombieAndSwallow) {
                // Deliberately swallowed: the task is closed as a zombie and processing continues.
                log.info("Failed to suspend stream task {} since it got migrated to another thread already. Closing it as zombie and moving on.", id);
                tryCloseZombieTask(task);
                prevActiveTasks.remove(id);
            } catch (RuntimeException e) {
                log.error("Suspending stream task {} failed due to the following error:", id, e);
                firstException.compareAndSet(null, e);
                try {
                    prevActiveTasks.remove(id);
                    task.close(false, false);
                } catch (RuntimeException f) {
                    // Only logged: the original suspend failure is what gets reported.
                    log.error("After suspending failed, closing the same stream task {} failed again due to the following error:", id, f);
                }
            } finally {
                // Passing `suspended` as the current map keeps a just-added suspended entry in place.
                removeTaskFromAllStateMaps(task, suspended);
                taskChangelogs.addAll(task.changelogPartitions());
            }
        }
        log.trace("Successfully suspended the running stream task {}", suspended.keySet());
        return firstException.get();
    }

    /**
     * Closes tasks that are in the {@code created} map.
     *
     * @param nonRunningTasksToClose IDs of tasks expected to be present in {@code created}
     * @param closedTaskChangelogs   output list receiving the changelog partitions of each closed task
     * @return the first exception raised while closing, or {@code null}
     */
    private RuntimeException closeNonRunningTasks(Set<TaskId> nonRunningTasksToClose, List<TopicPartition> closedTaskChangelogs) {
        log.debug("Closing the created but not initialized stream tasks {}", nonRunningTasksToClose);
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>();
        for (TaskId id : nonRunningTasksToClose) {
            StreamTask task = (StreamTask) created.get(id);
            firstException.compareAndSet(null, closeNonRunning(false, task, closedTaskChangelogs));
        }
        return firstException.get();
    }

    /**
     * Closes tasks that are currently restoring.
     *
     * @param restoringTasksToClose IDs of tasks expected to be present in the restoring map
     * @param closedTaskChangelogs  output list receiving the changelog partitions of each closed task
     * @return the first exception raised while closing, or {@code null}
     */
    RuntimeException closeRestoringTasks(Set<TaskId> restoringTasksToClose, List<TopicPartition> closedTaskChangelogs) {
        log.debug("Closing restoring stream tasks {}", restoringTasksToClose);
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>();
        for (TaskId id : restoringTasksToClose) {
            StreamTask task = restoring.get(id);
            firstException.compareAndSet(null, closeRestoring(false, task, closedTaskChangelogs));
        }
        return firstException.get();
    }

    /**
     * Removes a running task from all state maps and closes it; the close is clean unless
     * the task is a zombie.
     *
     * @return the exception thrown by the close, or {@code null} on success
     */
    private RuntimeException closeRunning(boolean isZombie, StreamTask task) {
        removeTaskFromAllStateMaps(task, Collections.<TaskId, StreamTask>emptyMap());
        try {
            boolean clean = !isZombie;
            task.close(clean, isZombie);
        } catch (RuntimeException e) {
            log.error("Failed to close the stream task {}", task.id(), e);
            return e;
        }
        return null;
    }

    /**
     * Removes a created (not yet running) task from all state maps, records its changelog
     * partitions, and closes it. The close is always unclean, zombie or not.
     *
     * @return the exception thrown by the close, or {@code null} on success
     */
    private RuntimeException closeNonRunning(boolean isZombie, StreamTask task, List<TopicPartition> closedTaskChangelogs) {
        removeTaskFromAllStateMaps(task, Collections.<TaskId, StreamTask>emptyMap());
        closedTaskChangelogs.addAll(task.changelogPartitions());
        try {
            task.close(false, isZombie);
        } catch (RuntimeException e) {
            log.error("Failed to close the stream task {}", task.id(), e);
            return e;
        }
        return null;
    }

    /**
     * Removes a restoring task from all state maps, records its changelog partitions, and
     * closes its state manager; the close is clean unless the task is a zombie.
     *
     * @return the exception thrown by the close, or {@code null} on success
     */
    private RuntimeException closeRestoring(boolean isZombie, StreamTask task, List<TopicPartition> closedTaskChangelogs) {
        removeTaskFromAllStateMaps(task, Collections.<TaskId, StreamTask>emptyMap());
        closedTaskChangelogs.addAll(task.changelogPartitions());
        try {
            boolean clean = !isZombie;
            task.closeStateManager(clean);
        } catch (RuntimeException e) {
            log.error("Failed to close the restoring stream task {} due to the following error:", task.id(), e);
            return e;
        }
        return null;
    }

    /**
     * Removes a suspended task from all state maps and closes it via the suspended-specific
     * close path; the close is clean unless the task is a zombie.
     *
     * @return the exception thrown by the close, or {@code null} on success
     */
    private RuntimeException closeSuspended(boolean isZombie, StreamTask task) {
        removeTaskFromAllStateMaps(task, Collections.<TaskId, StreamTask>emptyMap());
        try {
            boolean clean = !isZombie;
            task.closeSuspended(clean, null);
        } catch (RuntimeException e) {
            log.error("Failed to close the suspended stream task {} due to the following error:", task.id(), e);
            return e;
        }
        return null;
    }

    /**
     * Closes every task in {@code revokedTasks} that is currently suspended. IDs that are not
     * in the suspended map are only logged.
     *
     * @param revokedTasks IDs of the revoked tasks
     * @return the first exception raised while closing, or {@code null}
     */
    RuntimeException closeNotAssignedSuspendedTasks(Set<TaskId> revokedTasks) {
        log.debug("Closing the revoked active stream tasks {}", revokedTasks);
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>(null);
        for (TaskId revokedTask : revokedTasks) {
            StreamTask suspendedTask = suspended.get(revokedTask);
            if (suspendedTask != null) {
                firstException.compareAndSet(null, closeSuspended(false, suspendedTask));
            } else {
                log.debug("Revoked stream task {} could not be found in suspended, may have already been closed", revokedTask);
            }
        }
        return firstException.get();
    }

    /**
     * Closes every assigned task as a zombie, whichever state it is in, and then clears all
     * bookkeeping via {@link #clear()}.
     *
     * @return the first exception raised while closing, or {@code null}
     */
    RuntimeException closeAllTasksAsZombies() {
        log.debug("Closing all active tasks as zombies, current state of active tasks: {}", this.toString());
        AtomicReference<RuntimeException> firstException = new AtomicReference<RuntimeException>(null);
        // Required by the close helpers; the collected changelogs are not used afterwards.
        List<TopicPartition> changelogs = new ArrayList<TopicPartition>();
        for (TaskId id : allAssignedTaskIds()) {
            if (running.containsKey(id)) {
                log.debug("Closing the zombie running stream task {}.", id);
                firstException.compareAndSet(null, closeRunning(true, running.get(id)));
            } else if (created.containsKey(id)) {
                log.debug("Closing the zombie created stream task {}.", id);
                firstException.compareAndSet(null, closeNonRunning(true, (StreamTask) created.get(id), changelogs));
            } else if (restoring.containsKey(id)) {
                log.debug("Closing the zombie restoring stream task {}.", id);
                firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), changelogs));
            } else if (suspended.containsKey(id)) {
                log.debug("Closing the zombie suspended stream task {}.", id);
                firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
            }
        }
        clear();
        return firstException.get();
    }

    /**
     * Tries to resume a suspended task. The task is removed from the state maps in either
     * case; it is resumed and transitioned to running only if its partitions equal
     * {@code partitions}, otherwise it is closed.
     *
     * @param taskId     ID of the task to resume
     * @param partitions the partitions now assigned for this task
     * @return {@code true} if the task was suspended and has been resumed; {@code false} if it
     *         was not suspended or its partitions did not match
     * @throws TaskMigratedException rethrown if the transition to running reports a migration
     */
    boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> partitions) {
        if (!suspended.containsKey(taskId)) {
            return false;
        }
        StreamTask task = suspended.get(taskId);
        log.trace("Found suspended stream task {}", taskId);
        removeTaskFromAllStateMaps(task, Collections.<TaskId, StreamTask>emptyMap());
        if (task.partitions().equals(partitions)) {
            task.resume();
            try {
                transitionToRunning(task);
            } catch (TaskMigratedException e) {
                log.info("Failed to resume stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            }
            log.trace("Resuming the suspended stream task {}", task.id());
            return true;
        }
        log.warn("Couldn't resume stream task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
        task.closeSuspended(true, null);
        return false;
    }

    /**
     * Records newly restored partitions and transitions to running every restoring task whose
     * changelog partitions have all been restored.
     *
     * @param restored partitions that have completed restoring; an empty collection is a no-op
     * @throws IllegalStateException if all tasks are running afterwards but the
     *                               partition-to-task index still has entries
     */
    void updateRestored(Collection<TopicPartition> restored) {
        if (restored.isEmpty()) {
            return;
        }
        log.trace("Stream task changelog partitions that have completed restoring so far: {}", restored);
        restoredPartitions.addAll(restored);

        // An explicit iterator is needed because finished tasks are removed while iterating.
        Iterator<Map.Entry<TaskId, StreamTask>> it = restoring.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = it.next().getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions())) {
                transitionToRunning(task);
                it.remove();
                removeFromRestoredPartitions(task);
                removeFromRestoringByPartition(task);
                log.debug("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state", task.id(), task.changelogPartitions());
            } else if (log.isTraceEnabled()) {
                // Only computed when trace logging is on.
                Set<TopicPartition> outstandingPartitions = new HashSet<TopicPartition>(task.changelogPartitions());
                outstandingPartitions.removeAll(restoredPartitions);
                log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}", task.id(), outstandingPartitions);
            }
        }

        if (allTasksRunning()) {
            restoredPartitions.clear();
            if (!restoringByPartition.isEmpty()) {
                log.error("Finished restoring all tasks but found leftover partitions in restoringByPartition: {}", restoringByPartition);
                throw new IllegalStateException("Restoration is complete but not all partitions were cleared.");
            }
        }
    }

    /**
     * Removes the task from every state map except {@code currentStateMap}. On top of whatever
     * the superclass removes, this drops the task from the restoring map (together with its
     * partitions in the restoring index and restored set) and from the suspended map.
     *
     * @param task            the task to remove
     * @param currentStateMap the map the task should stay in; compared by identity, so an
     *                        unrelated empty map means "remove from all"
     */
    @Override
    void removeTaskFromAllStateMaps(StreamTask task, Map<TaskId, StreamTask> currentStateMap) {
        super.removeTaskFromAllStateMaps(task, currentStateMap);
        TaskId id = task.id();
        Set<TopicPartition> taskPartitions = new HashSet<TopicPartition>(task.partitions());
        taskPartitions.addAll(task.changelogPartitions());
        if (currentStateMap != restoring) {
            restoring.remove(id);
            restoringByPartition.keySet().removeAll(taskPartitions);
            restoredPartitions.removeAll(taskPartitions);
        }
        if (currentStateMap != suspended) {
            suspended.remove(id);
        }
    }

    /**
     * Registers a task as restoring and indexes it under each of its input and changelog
     * partitions.
     *
     * @param task the task entering the restoring state
     */
    void addTaskToRestoring(StreamTask task) {
        restoring.put(task.id(), task);
        for (TopicPartition topicPartition : task.partitions()) {
            restoringByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            restoringByPartition.put(topicPartition, task);
        }
    }

    /** Drops the task's input and changelog partitions from the partition-to-task index. */
    private void removeFromRestoringByPartition(StreamTask task) {
        restoringByPartition.keySet().removeAll(task.partitions());
        restoringByPartition.keySet().removeAll(task.changelogPartitions());
    }

    /** Drops the task's input and changelog partitions from the restored-partition set. */
    private void removeFromRestoredPartitions(StreamTask task) {
        restoredPartitions.removeAll(task.partitions());
        restoredPartitions.removeAll(task.changelogPartitions());
    }

    /**
     * Commits every running task that has both requested a commit and needs one.
     *
     * <p>A failure other than a migration does not stop the loop; the first such failure is
     * rethrown after all tasks have been visited.
     *
     * @return the number of tasks committed
     * @throws TaskMigratedException rethrown immediately if a task reports a migration
     * @throws RuntimeException      the first other failure seen, thrown after the loop
     */
    int maybeCommitPerUserRequested() {
        int committed = 0;
        RuntimeException firstException = null;
        for (StreamTask task : running.values()) {
            try {
                if (task.commitRequested() && task.commitNeeded()) {
                    task.commit();
                    ++committed;
                    log.debug("Committed stream task {} per user request in", task.id());
                }
            } catch (TaskMigratedException e) {
                log.info("Failed to commit stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (RuntimeException t) {
                log.error("Failed to commit stream task {} due to the following error:", task.id(), t);
                if (firstException == null) {
                    firstException = t;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    /**
     * @return the union of {@code purgableOffsets()} over all running tasks; if two tasks
     *         report the same partition, the later one in iteration order wins
     */
    Map<TopicPartition, Long> recordsToDelete() {
        Map<TopicPartition, Long> recordsToDelete = new HashMap<TopicPartition, Long>();
        for (StreamTask task : running.values()) {
            recordsToDelete.putAll(task.purgableOffsets());
        }
        return recordsToDelete;
    }

    /**
     * Gives every running task that is processable at {@code now} one call to {@code process()}.
     *
     * @param now the timestamp passed to {@code isProcessable}
     * @return the number of tasks whose {@code process()} call returned {@code true}
     * @throws RuntimeException any failure is logged and rethrown immediately
     */
    int process(long now) {
        int processed = 0;
        for (StreamTask task : running.values()) {
            try {
                if (task.isProcessable(now) && task.process()) {
                    ++processed;
                }
            } catch (TaskMigratedException e) {
                log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (RuntimeException e) {
                log.error("Failed to process stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /**
     * Attempts a stream-time and a system-time punctuation on every running task.
     *
     * @return the number of punctuation attempts that returned {@code true} (up to two per task)
     * @throws KafkaException a migration or other Kafka failure is logged and rethrown immediately
     */
    int punctuate() {
        int punctuated = 0;
        for (StreamTask task : running.values()) {
            try {
                if (task.maybePunctuateStreamTime()) {
                    ++punctuated;
                }
                if (task.maybePunctuateSystemTime()) {
                    ++punctuated;
                }
            } catch (TaskMigratedException e) {
                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (KafkaException e) {
                log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return punctuated;
    }

    /** Clears the superclass state and every collection maintained by this class. */
    @Override
    void clear() {
        super.clear();
        restoring.clear();
        restoringByPartition.clear();
        restoredPartitions.clear();
        suspended.clear();
        prevActiveTasks.clear();
    }

    /**
     * Logs the tasks in each state and then delegates the shutdown to the superclass.
     *
     * @param clean whether this is a clean shutdown
     */
    @Override
    public void shutdown(boolean clean) {
        String shutdownType = clean ? "Clean" : "Unclean";
        log.debug("{} shutdown of all active tasks\nnon-initialized stream tasks to close: {}\nrestoring tasks to close: {}\nrunning stream tasks to close: {}\nsuspended stream tasks to close: {}",
            shutdownType, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
        super.shutdown(clean);
    }

    /**
     * @param indent indentation prefix passed through to the describe helpers
     * @return the superclass description followed by the restoring tasks, restoring
     *         partitions, restored partitions and suspended tasks
     */
    @Override
    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        builder.append(super.toString(indent));
        describeTasks(builder, restoring.values(), indent, "Restoring:");
        describePartitions(builder, restoringByPartition.keySet(), indent, "Restoring Partitions:");
        describePartitions(builder, restoredPartitions, indent, "Restored Partitions:");
        describeTasks(builder, suspended.values(), indent, "Suspended:");
        return builder.toString();
    }

    /** @return an unmodifiable live view of the restoring tasks */
    Collection<StreamTask> restoringTasks() {
        return Collections.unmodifiableCollection(restoring.values());
    }

    /** @return a snapshot copy of the IDs of the restoring tasks */
    Set<TaskId> restoringTaskIds() {
        return new HashSet<TaskId>(restoring.keySet());
    }
}

