/*
 * Decompiled with CFR 0.152.
 */
package io.taktx.client;

import io.taktx.Topics;
import io.taktx.client.UserTaskTriggerConsumer;
import io.taktx.client.serdes.UserTaskTriggerJsonDeserializer;
import io.taktx.dto.UserTaskTriggerDTO;
import io.taktx.util.TaktPropertiesHelper;
import io.taktx.util.TaktUUIDDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserTaskTriggerTopicConsumer {
    private static final Logger log = LoggerFactory.getLogger(UserTaskTriggerTopicConsumer.class);
    private final TaktPropertiesHelper taktPropertiesHelper;
    private final Executor executor;
    private KafkaConsumer<UUID, UserTaskTriggerDTO> userTaskTriggerKafkaConsumer;
    private final Object consumerLock = new Object();
    private CompletableFuture<Void> consumerFuture;
    private volatile boolean running = false;

    UserTaskTriggerTopicConsumer(TaktPropertiesHelper taktPropertiesHelper, Executor executor) {
        this.taktPropertiesHelper = taktPropertiesHelper;
        this.executor = executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribeToUserTaskTriggerTopics(UserTaskTriggerConsumer externalTaskTriggerConsumer) {
        log.info("Subscribing to user task trigger topic");
        List<String> topics = List.of(this.taktPropertiesHelper.getPrefixedTopicName(Topics.USER_TASK_TRIGGER_TOPIC.getTopicName()));
        this.stop();
        if (this.consumerFuture != null) {
            try {
                this.consumerFuture.join();
            }
            catch (Exception e) {
                log.warn("Error while waiting for previous consumer to complete", (Throwable)e);
            }
        }
        Object object = this.consumerLock;
        synchronized (object) {
            if (this.userTaskTriggerKafkaConsumer == null) {
                this.userTaskTriggerKafkaConsumer = this.createConsumer();
            }
            this.userTaskTriggerKafkaConsumer.subscribe(topics);
            this.running = true;
            this.consumerFuture = CompletableFuture.runAsync(() -> {
                try {
                    Object object;
                    while (this.running) {
                        object = this.consumerLock;
                        // MONITORENTER : object
                        if (!this.running || this.userTaskTriggerKafkaConsumer == null) {
                            // MONITOREXIT : object
                            break;
                        }
                        ConsumerRecords records = this.userTaskTriggerKafkaConsumer.poll(Duration.ofMillis(100L));
                        for (ConsumerRecord externalTaskTriggerRecord : records) {
                            externalTaskTriggerConsumer.accept((UserTaskTriggerDTO)externalTaskTriggerRecord.value());
                        }
                        // MONITOREXIT : object
                        try {
                            Thread.sleep(10L);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    object = this.consumerLock;
                }
                catch (Throwable throwable) {
                    Object object = this.consumerLock;
                    // MONITORENTER : object
                    if (this.userTaskTriggerKafkaConsumer == null) {
                        // MONITOREXIT : object
                        throw throwable;
                    }
                    try {
                        this.userTaskTriggerKafkaConsumer.unsubscribe();
                        this.userTaskTriggerKafkaConsumer.close();
                        throw throwable;
                    }
                    catch (Exception e) {
                        log.error("Error closing Kafka consumer", (Throwable)e);
                        throw throwable;
                    }
                    finally {
                        this.userTaskTriggerKafkaConsumer = null;
                    }
                }
                if (this.userTaskTriggerKafkaConsumer == null) {
                    // MONITOREXIT : object
                    return;
                }
                try {
                    this.userTaskTriggerKafkaConsumer.unsubscribe();
                    this.userTaskTriggerKafkaConsumer.close();
                    return;
                }
                catch (Exception e) {
                    log.error("Error closing Kafka consumer", (Throwable)e);
                    return;
                }
                finally {
                    this.userTaskTriggerKafkaConsumer = null;
                }
            }, this.executor);
        }
    }

    public void stop() {
        this.running = false;
    }

    private <K, V> KafkaConsumer<K, V> createConsumer() {
        String groupId = "taktx-client-user-task-trigger-consumer";
        log.info("Creating consumer for group id {}", (Object)groupId);
        Properties props = this.taktPropertiesHelper.getKafkaConsumerProperties(groupId, TaktUUIDDeserializer.class, UserTaskTriggerJsonDeserializer.class, "latest");
        return new KafkaConsumer(props);
    }
}

