/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RebalanceListener<K, V>
implements ConsumerRebalanceListener {
    private static final Logger logger = LoggerFactory.getLogger(RebalanceListener.class);
    private final CommitPolicy<K, V> policy;
    private final Consumer<K, V> consumer;
    private Set<TopicPartition> pausedPartitions;

    RebalanceListener(Consumer<K, V> consumer, CommitPolicy<K, V> policy) {
        this.policy = policy;
        this.consumer = consumer;
        this.pausedPartitions = Collections.emptySet();
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("Partitions was revoked {}", partitions);
        this.pausedPartitions = this.consumer.paused();
        if (!this.pausedPartitions.isEmpty()) {
            this.pausedPartitions = new HashSet<TopicPartition>(this.pausedPartitions);
            this.pausedPartitions.removeAll(this.policy.partialCommit());
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("Partitions was assigned {}", partitions);
        if (!this.pausedPartitions.isEmpty()) {
            Set partitionToPause = partitions.stream().filter(p -> this.pausedPartitions.contains(p)).collect(Collectors.toSet());
            if (partitionToPause.isEmpty()) {
                logger.info("Previous paused partitions: {} were all revoked", this.pausedPartitions);
            } else {
                logger.info("Pause previous paused partitions: {}", partitionToPause);
                this.consumer.pause(partitionToPause);
            }
            this.pausedPartitions = Collections.emptySet();
        }
    }
}

