/*
 * Decompiled with CFR 0.152.
 */
package io.taktx.client;

import io.taktx.client.ExternalTaskTriggerConsumer;
import io.taktx.client.serdes.ExternalTaskTriggerJsonDeserializer;
import io.taktx.dto.ExternalTaskTriggerDTO;
import io.taktx.util.TaktPropertiesHelper;
import io.taktx.util.TaktUUIDDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalTaskTriggerTopicConsumer {
    private static final Logger log = LoggerFactory.getLogger(ExternalTaskTriggerTopicConsumer.class);
    private final TaktPropertiesHelper taktPropertiesHelper;
    private final Executor executor;
    private KafkaConsumer<UUID, ExternalTaskTriggerDTO> externalTaskTriggerKafkaConsumer;
    private final Object consumerLock = new Object();
    private CompletableFuture<Void> consumerFuture;
    private volatile boolean running = false;

    ExternalTaskTriggerTopicConsumer(TaktPropertiesHelper taktPropertiesHelper, Executor executor) {
        this.taktPropertiesHelper = taktPropertiesHelper;
        this.executor = executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribeToExternalTaskTriggerTopics(ExternalTaskTriggerConsumer externalTaskTriggerConsumer) {
        log.info("Subscribing to job ids {}", externalTaskTriggerConsumer.getJobIds());
        List<String> topics = externalTaskTriggerConsumer.getJobIds().stream().map(jobId -> this.taktPropertiesHelper.getPrefixedTopicName("external-task-trigger-" + jobId)).toList();
        this.stop();
        if (this.consumerFuture != null) {
            try {
                this.consumerFuture.join();
            }
            catch (Exception e) {
                log.warn("Error while waiting for previous consumer to complete", (Throwable)e);
            }
        }
        Object object = this.consumerLock;
        synchronized (object) {
            if (this.externalTaskTriggerKafkaConsumer == null) {
                this.externalTaskTriggerKafkaConsumer = this.createConsumer();
            }
            this.externalTaskTriggerKafkaConsumer.subscribe(topics);
            this.running = true;
            this.consumerFuture = CompletableFuture.runAsync(() -> {
                Object object;
                try {
                    do {
                        object = this.consumerLock;
                        synchronized (object) {
                            if (!this.running || this.externalTaskTriggerKafkaConsumer == null) {
                                break;
                            }
                            ConsumerRecords records = this.externalTaskTriggerKafkaConsumer.poll(Duration.ofMillis(100L));
                            if (records.isEmpty()) {
                                continue;
                            }
                            for (ConsumerRecord externalTaskTriggerRecord : records) {
                                externalTaskTriggerConsumer.accept((ExternalTaskTriggerDTO)externalTaskTriggerRecord.value());
                            }
                        }
                        try {
                            Thread.sleep(10L);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    } while (this.running);
                }
                finally {
                    object = this.consumerLock;
                    synchronized (object) {
                        log.info("Cleaning up resources");
                        this.externalTaskTriggerKafkaConsumer.unsubscribe();
                        this.externalTaskTriggerKafkaConsumer = null;
                    }
                }
            }, this.executor);
        }
    }

    public void stop() {
        this.running = false;
    }

    private <K, V> KafkaConsumer<K, V> createConsumer() {
        String groupId = "taktx-client-external-task-trigger-consumer";
        log.info("Creating consumer for group id {}", (Object)groupId);
        Properties props = this.taktPropertiesHelper.getKafkaConsumerProperties(groupId, TaktUUIDDeserializer.class, ExternalTaskTriggerJsonDeserializer.class, "latest");
        return new KafkaConsumer(props);
    }
}

