package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.class */
/**
 * The set of active {@link StreamTask}s assigned to a stream thread.
 *
 * <p>Builds on {@link AssignedTasks} by adding the operations that only make sense for
 * active stream tasks: committing on request, processing, punctuating, collecting
 * purgeable offsets, and looking up a restoring task by partition.
 */
public class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
    private final Logger log;

    /** Action handed to {@code applyToRunningTasks} by {@link #maybeCommit()}. */
    private final TaskAction<StreamTask> maybeCommitAction;

    /** Number of tasks committed during the current {@link #maybeCommit()} pass. */
    private int committed;

    /**
     * Creates an empty task set.
     *
     * @param logContext context used to create this class's logger; also passed to the superclass
     */
    public AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
        this.log = logContext.logger(getClass());
        this.maybeCommitAction = new TaskAction<StreamTask>() {
            @Override
            public String name() {
                return "maybeCommit";
            }

            @Override
            public void apply(StreamTask streamTask) {
                if (streamTask.commitNeeded()) {
                    // The counter is bumped before commit() is attempted, so a task whose
                    // commit throws has already been counted.
                    AssignedStreamsTasks.this.committed++;
                    streamTask.commit();
                    AssignedStreamsTasks.this.log.debug("Committed active task {} per user request", streamTask.id());
                }
            }
        };
    }

    /**
     * Looks up the restoring task registered for the given partition.
     *
     * @param topicPartition the partition to look up
     * @return the value stored in {@code restoringByPartition} for that partition,
     *         or {@code null} if the map has no entry for it
     */
    @Override
    public StreamTask restoringTaskFor(TopicPartition topicPartition) {
        return (StreamTask) this.restoringByPartition.get(topicPartition);
    }

    /**
     * Commits every running task that reports {@code commitNeeded()}.
     *
     * @return the number of tasks for which a commit was attempted during this call
     */
    public int maybeCommit() {
        this.committed = 0;
        applyToRunningTasks(this.maybeCommitAction);
        return this.committed;
    }

    /**
     * Merges the {@code purgableOffsets()} of all running tasks into one map.
     *
     * @return a new mutable map; if several tasks report the same partition, the task
     *         visited last wins
     */
    public Map<TopicPartition, Long> recordsToDelete() {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final Object task : this.running.values()) {
            offsets.putAll(((StreamTask) task).purgableOffsets());
        }
        return offsets;
    }

    /**
     * Calls {@code process()} once on each running task.
     *
     * @return the number of tasks whose {@code process()} call returned {@code true}
     * @throws TaskMigratedException if a task reports it was migrated; that task is closed as a
     *         zombie and removed from the running set before the exception is rethrown
     * @throws RuntimeException any other failure from a task (logged, then rethrown), or the
     *         exception returned by {@code closeZombieTask} if closing a migrated task failed
     */
    public int process() {
        int processed = 0;
        final Iterator<?> it = this.running.values().iterator();
        while (it.hasNext()) {
            final StreamTask task = (StreamTask) it.next();
            try {
                if (task.process()) {
                    processed++;
                }
            } catch (TaskMigratedException e) {
                throw closeMigratedTask(task, it, e);
            } catch (RuntimeException e) {
                this.log.error("Failed to process stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /**
     * Triggers stream-time and then system-time punctuation on each running task.
     *
     * @return the number of punctuation calls that returned {@code true}; a single task can
     *         contribute up to two
     * @throws TaskMigratedException if a task reports it was migrated; that task is closed as a
     *         zombie and removed from the running set before the exception is rethrown
     * @throws KafkaException any other Kafka failure from a task (logged, then rethrown)
     * @throws RuntimeException the exception returned by {@code closeZombieTask} if closing a
     *         migrated task failed
     */
    public int punctuate() {
        int punctuated = 0;
        final Iterator<?> it = this.running.values().iterator();
        while (it.hasNext()) {
            final StreamTask task = (StreamTask) it.next();
            try {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            } catch (TaskMigratedException e) {
                throw closeMigratedTask(task, it, e);
            } catch (KafkaException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return punctuated;
    }

    /**
     * Shared handling for a task that threw {@link TaskMigratedException}: closes it as a zombie
     * and, if that succeeds, removes it from the running set through the live iterator.
     *
     * @param task the migrated task
     * @param runningIterator iterator over the running tasks, positioned on {@code task}
     * @param cause the migration exception raised by the task
     * @return the exception the caller must throw: the one returned by {@code closeZombieTask}
     *         when it is non-null (the task is then left in the running set), otherwise {@code cause}
     */
    private RuntimeException closeMigratedTask(final StreamTask task,
                                               final Iterator<?> runningIterator,
                                               final TaskMigratedException cause) {
        final RuntimeException closeFailure = closeZombieTask(task);
        if (closeFailure != null) {
            return closeFailure;
        }
        runningIterator.remove();
        return cause;
    }
}
