package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.4.0.jar:org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.class */
/**
 * {@link ConsumerRebalanceListener} that forwards consumer rebalance callbacks to a
 * {@link StreamThread} and its {@link TaskManager}.
 *
 * <p>Every callback follows the same shape: log the current state, perform the task
 * bookkeeping, and, if anything is thrown while doing so, log it and hand it to
 * {@link StreamThread#setRebalanceException(Throwable)} instead of letting it propagate out of
 * the callback. A timing summary is always logged once the bookkeeping has been attempted.
 */
public class StreamsRebalanceListener implements ConsumerRebalanceListener {
    private final Time time;
    private final TaskManager taskManager;
    private final StreamThread streamThread;
    private final Logger log;

    /**
     * Creates a listener bound to the given collaborators.
     *
     * @param time         clock used to measure how long each callback takes
     * @param taskManager  task manager that creates, suspends and closes tasks
     * @param streamThread thread whose state is transitioned and which records rebalance errors
     * @param logger       logger that receives all messages emitted by this listener
     */
    public StreamsRebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger logger) {
        this.time = time;
        this.taskManager = taskManager;
        this.streamThread = streamThread;
        this.log = logger;
    }

    /**
     * Handles newly assigned partitions at the end of a rebalance.
     *
     * <p>If the stream thread reports the {@code INCOMPLETE_SOURCE_TOPIC_METADATA} assignment
     * error code, the thread is shut down and nothing else happens. Otherwise the thread is moved
     * to {@code PARTITIONS_ASSIGNED}; when that transition is accepted, revoked standby and
     * suspended tasks are closed and tasks are created for {@code collection}.
     *
     * @param collection the partitions assigned to this consumer
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        log.debug("Current state {}: assigned partitions {} at the end of consumer rebalance.\n\tpreviously assigned active tasks: {}\n\tpreviously assigned standby tasks: {}\n", streamThread.state(), collection, taskManager.previousActiveTaskIds(), taskManager.previousStandbyTaskIds());

        if (streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
            log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
            streamThread.shutdown();
            return;
        }

        final long startMs = time.milliseconds();
        List<TopicPartition> revokedStandbyPartitions = null;
        try {
            if (streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED) == null) {
                log.debug("Skipping task creation in rebalance because we are already in {} state.", streamThread.state());
            } else {
                revokedStandbyPartitions = taskManager.closeRevokedStandbyTasks();
                taskManager.closeRevokedSuspendedTasks();
                taskManager.createTasks(collection);
            }
        } catch (Throwable t) {
            log.error("Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance", t);
            streamThread.setRebalanceException(t);
        } finally {
            // Must run on the failure path too: standby tasks may already have been closed
            // before a later step threw, and their buffered records still need clearing.
            if (revokedStandbyPartitions != null) {
                streamThread.clearStandbyRecords(revokedStandbyPartitions);
            }
            log.info("partition assignment took {} ms.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n\trevoked active tasks: {}\n\trevoked standby tasks: {}\n", time.milliseconds() - startMs, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), taskManager.revokedActiveTaskIds(), taskManager.revokedStandbyTaskIds());
        }
    }

    /**
     * Handles partitions being revoked at the start of a rebalance.
     *
     * <p>The thread is moved to {@code PARTITIONS_REVOKED}. If that transition is rejected, or
     * if {@code collection} is empty, the method returns without touching any task. Otherwise
     * the active tasks for the revoked partitions are suspended via the task manager.
     *
     * @param collection the partitions being taken away from this consumer
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("Current state {}: revoked partitions {} because of consumer rebalance.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n", streamThread.state(), collection, taskManager.activeTaskIds(), taskManager.standbyTaskIds());

        // The state transition is attempted first and unconditionally, exactly as before.
        if (streamThread.setState(StreamThread.State.PARTITIONS_REVOKED) == null || collection.isEmpty()) {
            return;
        }

        final long startMs = time.milliseconds();
        // Stays empty if suspension fails, so the summary below reports "no tasks suspended".
        Collection<?> suspendedTasks = new HashSet<>();
        try {
            suspendedTasks = taskManager.suspendActiveTasksAndState(collection);
        } catch (Throwable t) {
            log.error("Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance: ", t);
            streamThread.setRebalanceException(t);
        } finally {
            log.info("partition revocation took {} ms.\n\tcurrent suspended active tasks: {}\n", time.milliseconds() - startMs, suspendedTasks);
        }
    }

    /**
     * Handles partitions that were lost because a rebalance was missed.
     *
     * <p>Asks the task manager to close the lost tasks and logs which ones were closed.
     *
     * @param collection the partitions that are no longer owned by this consumer
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        log.info("at state {}: partitions {} lost due to missed rebalance.\n\tlost active tasks: {}\n\tlost assigned standby tasks: {}\n", streamThread.state(), collection, taskManager.activeTaskIds(), taskManager.standbyTaskIds());

        // Stays empty if closing fails, so the summary below reports "no tasks closed".
        Collection<?> lostTasks = new HashSet<>();
        final long startMs = time.milliseconds();
        try {
            lostTasks = taskManager.closeLostTasks();
        } catch (Throwable t) {
            log.error("Error caught during partitions lost, will abort the current process and re-throw at the end of rebalance: ", t);
            streamThread.setRebalanceException(t);
        } finally {
            log.info("partitions lost took {} ms.\n\tclosed lost active tasks: {}\n", time.milliseconds() - startMs, lostTasks);
        }
    }
}
