/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ConsumerSeekDestination;
import cn.leancloud.kafka.consumer.ProcessRecordsProgress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rebalance callback that keeps the commit policy, the record-processing progress and the
 * consumer's paused partitions consistent while partitions are revoked and (re)assigned.
 *
 * <p>The listener keeps its state in plain, unsynchronized fields; it performs no locking of
 * its own.
 *
 * @param <K> key type of the wrapped consumer
 * @param <V> value type of the wrapped consumer
 */
final class RebalanceListener<K, V> implements ConsumerRebalanceListener {
    private static final Logger logger = LoggerFactory.getLogger(RebalanceListener.class);

    private final CommitPolicy policy;
    private final Consumer<K, V> consumer;
    private final ProcessRecordsProgress progress;
    /** Partitions that were already force-seeked once; entries are never removed. */
    private final Set<TopicPartition> knownPartitions;
    private final ConsumerSeekDestination forceSeekTo;
    /** Partitions found paused at the last revocation that still have to be paused again. */
    private Set<TopicPartition> pausedPartitions;

    /**
     * @param consumer    consumer whose paused state and assignment are inspected and updated
     * @param progress    processing progress handed to the commit policy and pruned on assignment
     * @param policy      commit policy that is paused on revocation and resumed on assignment
     * @param forceSeekTo where to seek partitions seen for the first time;
     *                    {@link ConsumerSeekDestination#NONE} disables seeking
     */
    RebalanceListener(Consumer<K, V> consumer, ProcessRecordsProgress progress, CommitPolicy policy, ConsumerSeekDestination forceSeekTo) {
        this.policy = policy;
        this.consumer = consumer;
        this.progress = progress;
        this.pausedPartitions = Collections.emptySet();
        this.knownPartitions = new HashSet<>();
        this.forceSeekTo = forceSeekTo;
    }

    /**
     * Remembers which partitions are currently paused so they can be paused again after the
     * rebalance, then pauses committing.
     *
     * <p>When at least one partition is paused, a synchronous partial commit is performed and
     * the partitions it returns are dropped from the remembered set.
     *
     * @param partitions partitions being revoked (not used by this implementation)
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.pausedPartitions = this.consumer.paused();
        if (!this.pausedPartitions.isEmpty()) {
            // Take a private mutable copy before editing; the set handed out by the consumer
            // is not ours to modify.
            this.pausedPartitions = new HashSet<>(this.pausedPartitions);
            this.pausedPartitions.removeAll(this.policy.partialCommitSync(this.progress));
        }
        this.policy.pauseCommit();
    }

    /**
     * Restores state after a rebalance: optionally force-seeks partitions seen for the first
     * time, drops progress of partitions that are no longer assigned, resumes committing,
     * performs a partial commit and finally re-pauses the partitions that were paused before
     * the rebalance and are part of {@code partitions}.
     *
     * @param partitions partitions passed to the callback as assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (this.forceSeekTo != ConsumerSeekDestination.NONE) {
            this.seekOnAssignedPartitions(partitions);
        }
        this.clearProgressForRevokedPartitions();
        this.policy.resumeCommit();
        Set<TopicPartition> resumedPartitions = this.policy.partialCommitSync(this.progress);
        if (!this.pausedPartitions.isEmpty()) {
            this.pausedPartitions.removeAll(resumedPartitions);
            this.pausePreviousPausedPartitions(partitions);
        }
    }

    /**
     * Seeks every partition in {@code partitions} that has not been seen before to
     * {@link #forceSeekTo} and records it as known, so each partition is force-seeked at most
     * once for the lifetime of this listener.
     */
    private void seekOnAssignedPartitions(Collection<TopicPartition> partitions) {
        Set<TopicPartition> newPartitions = new HashSet<>();
        for (TopicPartition p : partitions) {
            if (!this.knownPartitions.contains(p)) {
                newPartitions.add(p);
                logger.info("Assigned new partition: {}, force seeking it's offset to {}", p, this.forceSeekTo);
            }
        }
        if (!newPartitions.isEmpty()) {
            this.forceSeekTo.seek(this.consumer, newPartitions);
            // Only mark them as known once the seek call has returned normally.
            this.knownPartitions.addAll(newPartitions);
        }
    }

    /**
     * Pauses the still-remembered paused partitions that are contained in {@code partitions}
     * and then forgets the remembered set.
     *
     * <p>If nothing is left to re-pause (the partial commit removed every remembered
     * partition), no pause is issued and nothing is logged; previously this case was reported
     * as "all revoked" with an empty set, which was misleading.
     */
    private void pausePreviousPausedPartitions(Collection<TopicPartition> partitions) {
        if (!this.pausedPartitions.isEmpty()) {
            Set<TopicPartition> partitionToPause = partitions.stream()
                    .filter(this.pausedPartitions::contains)
                    .collect(Collectors.toSet());
            if (partitionToPause.isEmpty()) {
                logger.info("Previous paused partitions: {} were all revoked", this.pausedPartitions);
            } else {
                logger.info("Pause previous paused partitions: {}", partitionToPause);
                this.consumer.pause(partitionToPause);
            }
        }
        this.pausedPartitions = Collections.emptySet();
    }

    /**
     * Clears the recorded progress of every partition that has progress but is not in the
     * consumer's current assignment.
     */
    private void clearProgressForRevokedPartitions() {
        // Work on a copy: it is not visible from here whether allPartitions() returns a
        // detached set, an unmodifiable one or a view backed by the progress itself, so the
        // returned set must not be mutated directly.
        Set<TopicPartition> partitionsWithProgress = new HashSet<>(this.progress.allPartitions());
        partitionsWithProgress.removeAll(this.consumer.assignment());
        if (!partitionsWithProgress.isEmpty()) {
            this.progress.clearFor(partitionsWithProgress);
        }
    }
}

