package org.apache.rya.periodic.notification.registration.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.Logger;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;

/* loaded from: input_file:org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.class */
/**
 * Runnable that subscribes a {@link KafkaConsumer} to a single topic and hands every
 * {@link CommandNotification} it receives to a {@link NotificationCoordinatorExecutor}.
 *
 * <p>{@link #run()} loops until {@link #shutdown()} is called. The consumer passed to the
 * constructor is closed when {@link #run()} exits, whether it exits normally or by exception.
 */
public class PeriodicNotificationConsumer implements Runnable {
    private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class);

    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 10000L;

    private final KafkaConsumer<String, CommandNotification> consumer;
    private final int threadNumber;
    private final String topic;
    private final NotificationCoordinatorExecutor coord;

    /** Set by {@link #shutdown()}; checked by the polling loop to decide when to stop. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a consumer task.
     *
     * @param str the topic to subscribe to
     * @param kafkaConsumer the consumer used for polling; it is closed when {@link #run()} exits
     * @param i a number identifying this task in log messages
     * @param notificationCoordinatorExecutor receives each notification read from the topic
     */
    public PeriodicNotificationConsumer(String str, KafkaConsumer<String, CommandNotification> kafkaConsumer, int i, NotificationCoordinatorExecutor notificationCoordinatorExecutor) {
        this.topic = str;
        this.threadNumber = i;
        this.consumer = kafkaConsumer;
        this.coord = notificationCoordinatorExecutor;
    }

    /**
     * Subscribes to the topic and forwards every polled notification to the coordinator until
     * {@link #shutdown()} has been called.
     *
     * @throws WakeupException if the consumer is woken up while this task has not been shut down
     */
    @Override
    public void run() {
        try {
            LOG.info("Creating kafka stream for consumer:" + this.threadNumber);
            this.consumer.subscribe(Arrays.asList(this.topic));
            while (!this.closed.get()) {
                for (ConsumerRecord<String, CommandNotification> record : this.consumer.poll(POLL_TIMEOUT_MS)) {
                    CommandNotification notification = record.value();
                    LOG.info("Thread " + this.threadNumber + " is adding notification " + notification + " to queue.");
                    LOG.info("Message: " + notification);
                    this.coord.processNextCommandNotification(notification);
                }
            }
        } catch (WakeupException e) {
            // A wakeup after shutdown() is the expected way out of a blocked poll; anything
            // else is unexpected and is propagated to the caller.
            if (!this.closed.get()) {
                throw e;
            }
        } finally {
            // Single cleanup point: the consumer is closed exactly once on every exit path.
            this.consumer.close();
        }
    }

    /**
     * Requests that {@link #run()} stop: marks this task as closed, then wakes the consumer
     * so a poll in progress does not have to wait for its timeout.
     */
    public void shutdown() {
        this.closed.set(true);
        this.consumer.wakeup();
    }
}
