/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer.group;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;

/**
 * Test helpers that run a set of {@link KafkaConsumer}s in the background.
 *
 * <p>Every consumer obtained from the caller's supplier is handed to its own thread of a
 * fixed-size pool, where it is polled in a loop. The {@link AutoCloseable} returned by the
 * {@code buildConsumers} overloads stops those loops and shuts the pool down.
 */
class ConsumerGroupCommandTestUtils {
    private ConsumerGroupCommandTestUtils() {
    }

    /**
     * Starts {@code numberOfConsumers} background consumers that are manually assigned
     * {@code partitions}. Offsets are not committed synchronously by the polling loop.
     *
     * @param numberOfConsumers number of consumers (and pool threads) to create
     * @param partitions        partitions passed to {@code assign} on every consumer
     * @param consumerSupplier  creates one new consumer per invocation
     * @return a handle whose {@code close()} stops the consumers and shuts the pool down
     */
    static <T> AutoCloseable buildConsumers(int numberOfConsumers, Set<TopicPartition> partitions, Supplier<KafkaConsumer<T, T>> consumerSupplier) {
        return buildConsumers(numberOfConsumers, false, consumerSupplier, consumer -> consumer.assign(partitions));
    }

    /**
     * Starts {@code numberOfConsumers} background consumers that subscribe to {@code topic}.
     *
     * @param numberOfConsumers number of consumers (and pool threads) to create
     * @param syncCommit        when {@code true}, {@code commitSync()} is called after every poll
     * @param topic             the single topic each consumer subscribes to
     * @param consumerSupplier  creates one new consumer per invocation
     * @return a handle whose {@code close()} stops the consumers and shuts the pool down
     */
    static <T> AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, String topic, Supplier<KafkaConsumer<T, T>> consumerSupplier) {
        return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier, consumer -> consumer.subscribe(Collections.singleton(topic)));
    }

    /**
     * Starts {@code numberOfConsumers} background consumers, configuring each one with
     * {@code setPartitions} on the pool thread that will poll it.
     *
     * <p>If creating or scheduling a consumer fails part-way through, the consumers started so
     * far are released before the failure is rethrown.
     *
     * @param numberOfConsumers number of consumers (and pool threads) to create; the pool is
     *                          built with {@link Executors#newFixedThreadPool(int)}, which
     *                          rejects non-positive sizes
     * @param syncCommit        when {@code true}, {@code commitSync()} is called after every poll
     * @param consumerSupplier  creates one new consumer per invocation
     * @param setPartitions     subscribes or assigns the consumer before polling starts
     * @return a handle whose {@code close()} stops the consumers and shuts the pool down
     */
    static <T> AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, Supplier<KafkaConsumer<T, T>> consumerSupplier, Consumer<KafkaConsumer<T, T>> setPartitions) {
        List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
        ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
        AtomicBoolean closed = new AtomicBoolean(false);
        AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor);
        try {
            for (int i = 0; i < numberOfConsumers; i++) {
                KafkaConsumer<T, T> consumer = consumerSupplier.get();
                // Track the consumer before scheduling it so that a later failure can still wake it up.
                consumers.add(consumer);
                executor.execute(() -> initConsumer(syncCommit, consumer, setPartitions, closed));
            }
            return closeable;
        } catch (Throwable e) {
            // Do not leave pool threads or already-started consumers behind on a partial failure.
            Utils.closeQuietly(closeable, "Release Consumer");
            throw e;
        }
    }

    /**
     * Signals every polling loop to stop, wakes the consumers up, and waits up to one minute
     * for the pool threads to finish.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    private static <T> void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<T, T>> consumers, ExecutorService executor) throws InterruptedException {
        // Set the flag first so a loop that returns from poll() normally also exits.
        closed.set(true);
        consumers.forEach(KafkaConsumer::wakeup);
        executor.shutdown();
        executor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    /**
     * Configures {@code consumer} and polls it until {@code closed} becomes {@code true} or the
     * consumer is woken up. The consumer is always closed when this method exits.
     *
     * @param syncCommit    when {@code true}, {@code commitSync()} is called after every poll
     * @param consumer      the consumer to drive; closed by this method
     * @param setPartitions subscribes or assigns the consumer before polling starts
     * @param closed        stop flag shared with {@link #releaseConsumers}
     */
    private static <T> void initConsumer(boolean syncCommit, KafkaConsumer<T, T> consumer, Consumer<KafkaConsumer<T, T>> setPartitions, AtomicBoolean closed) {
        // Bind the consumer as the resource BEFORE running setPartitions: an exception thrown
        // while a resource is being initialised would skip close() and leak the consumer.
        try (KafkaConsumer<T, T> kafkaConsumer = consumer) {
            setPartitions.accept(kafkaConsumer);
            while (!closed.get()) {
                kafkaConsumer.poll(Duration.ofMillis(1000L));
                if (syncCommit) {
                    kafkaConsumer.commitSync();
                }
            }
        } catch (WakeupException ignored) {
            // Expected: releaseConsumers() calls wakeup() to break out of a blocking call.
        }
    }
}

