/*
 * Decompiled with CFR 0.152.
 */
package de.otto.eventsourcing.query;

import de.otto.eventsourcing.event.Key;
import de.otto.eventsourcing.event.Payload;
import de.otto.eventsourcing.monitor.TopicUpdateEvent;
import de.otto.eventsourcing.query.ConsumerRecordCallback;
import de.otto.eventsourcing.query.EventProcessor;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * In-memory query-side view that is kept up to date from a Kafka topic.
 *
 * <p>Every received record is handed to the configured {@link EventProcessor}, whose result
 * is stored under the record's entity id. Records with a {@code null} value remove the entry.
 * After each record the registered {@link ConsumerRecordCallback}s are notified and, when an
 * {@link ApplicationEventPublisher} was supplied, a {@link TopicUpdateEvent} is published.
 *
 * @param <T> type of the state object held per entity id
 */
public class QueryService<T> {
    private static final Logger LOG = LoggerFactory.getLogger(QueryService.class);

    /** Current state per entity id (see {@link #keyForEntity(ConsumerRecord)}). */
    private final ConcurrentMap<String, T> state = new ConcurrentHashMap<>(1000);
    /** Callbacks invoked after each received record. */
    private final List<ConsumerRecordCallback> callbacks = new CopyOnWriteArrayList<>();
    private final EventProcessor<T> eventProcessor;
    /** May be {@code null}; in that case no {@link TopicUpdateEvent} is published. */
    private final ApplicationEventPublisher eventPublisher;

    /**
     * Creates a query service that also publishes a {@link TopicUpdateEvent} per received record.
     *
     * @param eventProcessor computes the new state for an entity from a record and its current state
     * @param eventPublisher publisher for update events; {@code null} disables publishing
     */
    public QueryService(EventProcessor<T> eventProcessor, ApplicationEventPublisher eventPublisher) {
        this.eventProcessor = eventProcessor;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Creates a query service that does not publish application events.
     *
     * @param eventProcessor computes the new state for an entity from a record and its current state
     */
    public QueryService(EventProcessor<T> eventProcessor) {
        this(eventProcessor, null);
    }

    /**
     * Registers a callback that is invoked after each received record.
     *
     * @param callback the callback to add
     */
    public final void addCallback(ConsumerRecordCallback callback) {
        this.callbacks.add(callback);
        LOG.trace("Registered Callback #{}", this.callbacks.size());
    }

    /**
     * Unregisters a previously added callback.
     *
     * @param callback the callback to remove
     */
    public final void removeCallback(ConsumerRecordCallback callback) {
        this.callbacks.remove(callback);
        LOG.trace("Unregistered Callback. #{} remaining.", this.callbacks.size());
    }

    /**
     * Applies a single record from the topic to the in-memory state, then notifies the
     * callbacks and (if configured) publishes a {@link TopicUpdateEvent}.
     *
     * @param consumerRecord the record to apply; a {@code null} value removes the entity
     */
    @KafkaListener(topics={"${eventsourcing.topics.default}"})
    public void receive(ConsumerRecord<Key, Payload> consumerRecord) {
        Key key = consumerRecord.key();
        String entityKey = this.keyForEntity(consumerRecord);
        if (consumerRecord.value() != null) {
            // compute() removes the mapping if the processor returns null.
            this.state.compute(entityKey,
                    (id, existing) -> this.eventProcessor.process(consumerRecord, Optional.ofNullable(existing)));
        } else {
            this.state.remove(entityKey);
        }
        // One placeholder per argument; the former trailing event='{}' had no matching argument.
        LOG.info("Received key='{}', payload='{}', partition='{}', offset='{}'",
                key, consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
        this.notifyCallbacks(consumerRecord);
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent((ApplicationEvent)new TopicUpdateEvent(consumerRecord));
        }
    }

    /**
     * Derives the key under which a record's state is stored. Subclasses may override this.
     *
     * @param consumerRecord the received record
     * @return the entity id taken from the record's key
     */
    protected String keyForEntity(ConsumerRecord<Key, Payload> consumerRecord) {
        return consumerRecord.key().getEntityId();
    }

    /** Calls {@code onSuccess} on every registered callback with the given record. */
    private void notifyCallbacks(ConsumerRecord<Key, Payload> consumerRecord) {
        this.callbacks.forEach(c -> c.onSuccess(consumerRecord));
    }

    /**
     * Looks up the current state for an entity.
     *
     * @param id the entity id
     * @return the stored state, or {@code null} if nothing is stored under {@code id}
     */
    public final T get(String id) {
        return this.state.get(id);
    }

    /**
     * Returns all stored state objects.
     *
     * @return an unmodifiable view of the stored values (a view, not a copy)
     */
    public final Collection<T> getAll() {
        return Collections.unmodifiableCollection(this.state.values());
    }

    /**
     * @return the number of entities currently stored
     */
    public final int size() {
        return this.state.size();
    }

    /** Removes all stored state. */
    public final void deleteAll() {
        this.state.clear();
    }
}

