/*
 * Decompiled with CFR 0.152.
 */
package io.castled.kafka.consumer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.castled.kafka.consumer.BaseKafkaConsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for running Kafka consumers on a dedicated thread pool.
 */
public class ConsumerUtils {
    private static final Logger log = LoggerFactory.getLogger(ConsumerUtils.class);

    /** How long the shutdown hook waits for workers to finish before forcing termination. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    /**
     * Starts {@code consumerCount} worker threads, each of which invokes
     * {@code baseKafkaConsumer.run()} once, and registers a JVM shutdown hook
     * that stops the pool.
     *
     * <p>Worker threads are named {@code <name>-consumer-<index>}. An
     * {@link InterruptException} thrown by {@code run()} is logged at warn
     * level; any other {@link Exception} is logged at error level. In both
     * cases the worker ends and is not restarted.
     *
     * <p>NOTE(review): the same {@code baseKafkaConsumer} instance is handed to
     * every worker; confirm its {@code run()} is safe to call concurrently.
     *
     * @param consumerCount     number of worker threads (and {@code run()} invocations) to start;
     *                          passed directly to {@link Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}
     * @param name              prefix used for thread names and in log messages
     * @param baseKafkaConsumer consumer whose {@code run()} method each worker executes
     */
    public static void runKafkaConsumer(int consumerCount, String name, BaseKafkaConsumer baseKafkaConsumer) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                consumerCount,
                new ThreadFactoryBuilder().setNameFormat(name + "-consumer-%d").build());

        for (int i = 0; i < consumerCount; ++i) {
            // execute() rather than submit(): nothing ever inspects a Future here, and
            // submit() would capture a Throwable that escapes the catch blocks below
            // (e.g. an Error) inside that discarded Future, hiding it completely.
            executorService.execute(() -> {
                try {
                    baseKafkaConsumer.run();
                }
                catch (InterruptException e) {
                    log.warn("Consumer {} interrupted", name);
                }
                catch (Exception e) {
                    // Throwable as the trailing argument so the logger records the stack trace.
                    log.error("Consumer {} failed to run", name, e);
                }
            });
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Graceful phase: stop accepting work and give running consumers a
                // bounded amount of time to return on their own.
                executorService.shutdown();
                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    // Forced phase: interrupt workers that are still running.
                    executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                executorService.shutdownNow();
                // Preserve the interrupt status for whoever runs after this hook body.
                Thread.currentThread().interrupt();
            }
            catch (RuntimeException e) {
                log.warn("Consumer {} did not shut down cleanly", name, e);
                executorService.shutdownNow();
            }
        }));
    }
}

