/*
 * Decompiled with CFR 0.152.
 */
package io.taktx.client;

import io.taktx.Topics;
import io.taktx.client.serdes.ProcessDefinitionJsonDeserializer;
import io.taktx.client.serdes.ProcessDefinitionKeyJsonDeserializer;
import io.taktx.dto.ProcessDefinitionDTO;
import io.taktx.dto.ProcessDefinitionKey;
import io.taktx.util.TaktPropertiesHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes process definition activation records from Kafka and keeps an in-memory view of them.
 *
 * <p>After {@link #subscribeToDefinitionRecords()} is called, a poll loop runs on the supplied
 * {@link Executor}. Every received record is stored in a map keyed by its
 * {@link ProcessDefinitionKey} and then handed to all listeners registered through
 * {@link #subscribeToProcessDefinitionUpdates(Consumer)}.
 *
 * <p>The lookup maps are {@link ConcurrentHashMap}s, so the query methods may be called while the
 * poll loop is updating them.
 */
public class ProcessDefinitionConsumer {
    private static final Logger log = LoggerFactory.getLogger(ProcessDefinitionConsumer.class);
    private final TaktPropertiesHelper taktPropertiesHelper;
    private final Executor executor;
    /** Last definitions hash seen per process definition key; used to detect hash changes. */
    private final Map<ProcessDefinitionKey, String> storedHashes = new ConcurrentHashMap<>();
    /** Latest activation record value received per process definition key. */
    private final Map<ProcessDefinitionKey, ProcessDefinitionDTO> definitionMap = new ConcurrentHashMap<>();
    /** Registered update listeners, keyed by the id returned on registration. */
    private final Map<UUID, Consumer<ConsumerRecord<ProcessDefinitionKey, ProcessDefinitionDTO>>> processDefinitionUpdateConsumers = new ConcurrentHashMap<>();
    private KafkaConsumer<ProcessDefinitionKey, ProcessDefinitionDTO> definitionActivationConsumer;
    /** Loop flag; cleared by {@link #stop()} and read by the poll loop on the executor thread. */
    private volatile boolean running = false;

    /**
     * @param taktPropertiesHelper source of the prefixed topic name and the Kafka consumer properties
     * @param executor executor on which the poll loop is run
     */
    ProcessDefinitionConsumer(TaktPropertiesHelper taktPropertiesHelper, Executor executor) {
        this.taktPropertiesHelper = taktPropertiesHelper;
        this.executor = executor;
    }

    /**
     * Creates a Kafka consumer, subscribes it to the process definition activation topic and starts
     * the poll loop on the configured executor.
     *
     * <p>The loop runs until {@link #stop()} is called or an exception escapes it; in both cases the
     * Kafka consumer is unsubscribed and closed.
     */
    public void subscribeToDefinitionRecords() {
        KafkaConsumer<ProcessDefinitionKey, ProcessDefinitionDTO> consumer = this.createConsumer();
        this.definitionActivationConsumer = consumer;
        String prefixedTopicName = this.taktPropertiesHelper.getPrefixedTopicName(Topics.PROCESS_DEFINITION_ACTIVATION_TOPIC.getTopicName());
        log.info("Subscribing to topic {}", prefixedTopicName);
        this.subscribe(prefixedTopicName);
        this.running = true;
        // The loop works on the captured local so it always polls and closes the instance it was
        // started with, even if the field is re-assigned later.
        CompletableFuture.runAsync(() -> this.pollLoop(consumer), this.executor);
    }

    /** Signals the poll loop to exit; the loop notices the flag after its current poll returns. */
    public void stop() {
        this.running = false;
    }

    /**
     * Returns all process definitions received so far.
     *
     * @return the live internal map (not a copy); it keeps changing as new records arrive
     */
    public Map<ProcessDefinitionKey, ProcessDefinitionDTO> getDeployedProcessDefinitions() {
        return this.definitionMap;
    }

    /**
     * Returns the received process definitions whose key carries the given process definition id.
     *
     * @param processDefinitionId id to match against {@code ProcessDefinitionKey.getProcessDefinitionId()}
     * @return a newly collected map holding the matching entries
     */
    public Map<ProcessDefinitionKey, ProcessDefinitionDTO> getDeployedProcessDefinitions(String processDefinitionId) {
        return this.definitionMap.entrySet().stream()
                .filter(entry -> entry.getKey().getProcessDefinitionId().equals(processDefinitionId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Finds a received process definition by process definition id and definitions hash.
     *
     * @param processDefinitionId id to match against the entry key
     * @param hash hash to match against the entry's definitions key
     * @return the first matching definition, or an empty {@link Optional} when none matches
     */
    public Optional<ProcessDefinitionDTO> getDeployedProcessDefinitionbyHash(String processDefinitionId, String hash) {
        return this.definitionMap.entrySet().stream()
                .filter(entry -> entry.getKey().getProcessDefinitionId().equals(processDefinitionId)
                        && entry.getValue().getDefinitions().getDefinitionsKey().getHash().equals(hash))
                .map(Map.Entry::getValue)
                .findFirst();
    }

    /**
     * Registers a listener that is invoked for every activation record processed by the poll loop.
     *
     * @param consumer callback receiving the raw Kafka record
     * @return the randomly generated id under which the listener is stored
     */
    public UUID subscribeToProcessDefinitionUpdates(Consumer<ConsumerRecord<ProcessDefinitionKey, ProcessDefinitionDTO>> consumer) {
        UUID consumerKey = UUID.randomUUID();
        this.processDefinitionUpdateConsumers.put(consumerKey, consumer);
        return consumerKey;
    }

    /** Subscribes the current Kafka consumer to the single given topic. */
    private void subscribe(String prefixedTopicName) {
        this.definitionActivationConsumer.subscribe(Collections.singletonList(prefixedTopicName));
    }

    /**
     * Polls the given consumer until {@link #running} is cleared, handling every received record.
     * The consumer is always unsubscribed and closed on exit, including when an exception escapes.
     */
    private void pollLoop(KafkaConsumer<ProcessDefinitionKey, ProcessDefinitionDTO> consumer) {
        try {
            while (this.running) {
                ConsumerRecords<ProcessDefinitionKey, ProcessDefinitionDTO> records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord<ProcessDefinitionKey, ProcessDefinitionDTO> activationRecord : records) {
                    this.handleActivationRecord(activationRecord);
                }
            }
        } catch (RuntimeException e) {
            // The returned future is not observed by anyone, so log here to make the failure visible.
            log.error("Process definition activation poll loop terminated unexpectedly", e);
            throw e;
        } finally {
            try {
                consumer.unsubscribe();
            } finally {
                consumer.close();
            }
        }
    }

    /** Stores one activation record in the lookup maps and notifies the registered listeners. */
    private void handleActivationRecord(ConsumerRecord<ProcessDefinitionKey, ProcessDefinitionDTO> activationRecord) {
        ProcessDefinitionDTO definition = activationRecord.value();
        if (definition == null) {
            // NOTE(review): a null value may be a tombstone that should remove the definition;
            // confirm the intended semantics. Skipping avoids the NullPointerException that would
            // otherwise terminate the poll loop.
            log.warn("Ignoring process definition activation record {} without a value", activationRecord.key());
            return;
        }
        log.info("Received definition activation record {} to state {}", activationRecord.key(), definition.getState());
        String receivedHash = definition.getDefinitions().getDefinitionsKey().getHash();
        String storedHash = this.storedHashes.get(activationRecord.key());
        if (storedHash != null && !storedHash.equals(receivedHash)) {
            log.warn("Hash mismatch for process definition {} {}", activationRecord.key(), definition);
        }
        this.storedHashes.put(activationRecord.key(), receivedHash);
        this.definitionMap.put(activationRecord.key(), definition);
        log.info("Notifying {} consumers of process definition update", this.processDefinitionUpdateConsumers.size());
        this.processDefinitionUpdateConsumers.forEach((listenerId, listener) -> {
            try {
                listener.accept(activationRecord);
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from being notified or stop the loop.
                log.warn("Process definition update consumer {} failed for record {}", listenerId, activationRecord.key(), e);
            }
        });
    }

    /**
     * Builds a Kafka consumer with a unique, randomly suffixed group id and the "earliest" setting
     * passed to the properties helper.
     */
    private KafkaConsumer<ProcessDefinitionKey, ProcessDefinitionDTO> createConsumer() {
        String groupId = "client-definition-activation-consumer-" + UUID.randomUUID();
        log.info("Creating consumer for group id {}", groupId);
        Properties props = this.taktPropertiesHelper.getKafkaConsumerProperties(groupId, ProcessDefinitionKeyJsonDeserializer.class, ProcessDefinitionJsonDeserializer.class, "earliest");
        return new KafkaConsumer<>(props);
    }
}

