/*
 * Decompiled with CFR 0.152.
 */
package io.taktx.client;

import io.taktx.Topics;
import io.taktx.client.serdes.InstanceUpdateJsonDeserializer;
import io.taktx.dto.InstanceUpdateDTO;
import io.taktx.util.TaktPropertiesHelper;
import io.taktx.util.TaktUUIDDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessInstanceUpdateConsumer {
    private static final Logger log = LoggerFactory.getLogger(ProcessInstanceUpdateConsumer.class);
    private final TaktPropertiesHelper taktPropertiesHelper;
    private final Executor executor;
    private boolean running = false;
    private final List<BiConsumer<UUID, InstanceUpdateDTO>> instanceUpdateConsumers = new ArrayList<BiConsumer<UUID, InstanceUpdateDTO>>();

    public ProcessInstanceUpdateConsumer(TaktPropertiesHelper taktPropertiesHelper, Executor executor) {
        this.taktPropertiesHelper = taktPropertiesHelper;
        this.executor = executor;
    }

    public void addInstanceUpdateConsumer(BiConsumer<UUID, InstanceUpdateDTO> consumer) {
        if (this.instanceUpdateConsumers.isEmpty()) {
            this.subscribeToTopic();
        }
        this.instanceUpdateConsumers.add(consumer);
    }

    private void subscribeToTopic() {
        this.running = true;
        CompletableFuture.runAsync(() -> {
            try (KafkaConsumer consumer = this.createConsumer();){
                String prefixedTopicName = this.taktPropertiesHelper.getPrefixedTopicName(Topics.INSTANCE_UPDATE_TOPIC.getTopicName());
                consumer.subscribe(Collections.singletonList(prefixedTopicName));
                while (this.running) {
                    this.consumeRecords(consumer);
                }
                consumer.unsubscribe();
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }, this.executor);
    }

    private void consumeRecords(KafkaConsumer<UUID, InstanceUpdateDTO> consumer) {
        consumer.poll(Duration.ofMillis(100L)).forEach(instanceUpdateRecord -> this.instanceUpdateConsumers.forEach(instanceUpdateConsumer -> instanceUpdateConsumer.accept((UUID)instanceUpdateRecord.key(), (InstanceUpdateDTO)instanceUpdateRecord.value())));
    }

    private <K, V> KafkaConsumer<K, V> createConsumer() throws IOException {
        Properties props = this.taktPropertiesHelper.getKafkaConsumerProperties("instance-update-consumer", TaktUUIDDeserializer.class, InstanceUpdateJsonDeserializer.class, "latest");
        return new KafkaConsumer(props);
    }
}

