/*
 * Decompiled with CFR 0.152.
 */
package net.lightapi.portal.user.query;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import net.lightapi.portal.PortalConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationStreams
implements LightStreams {
    private static final Logger logger = LoggerFactory.getLogger(NotificationStreams.class);
    static final KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig)Config.getInstance().getJsonObjectConfig("kafka-streams", KafkaStreamsConfig.class);
    static final PortalConfig portalConfig = (PortalConfig)Config.getInstance().getJsonObjectConfig("portal", PortalConfig.class);
    private static final String notification = "user-notification-store";
    KafkaStreams notificationStreams;

    public NotificationStreams() {
        logger.info("NotificationStreams is created");
    }

    public ReadOnlyKeyValueStore<String, String> getNotificationStore() {
        QueryableStoreType queryableStoreType = QueryableStoreTypes.keyValueStore();
        StoreQueryParameters sqp = StoreQueryParameters.fromNameAndType((String)notification, (QueryableStoreType)queryableStoreType);
        return (ReadOnlyKeyValueStore)this.notificationStreams.store(sqp);
    }

    public KeyQueryMetadata getNotificationStreamsMetadata(String email) {
        return this.notificationStreams.queryMetadataForKey(notification, (Object)email, Serdes.String().serializer());
    }

    private void startNotificationStreams(String ip, int port) {
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder keyValueNotificationStoreBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)notification), (Serde)Serdes.String(), (Serde)Serdes.String());
        builder.addStateStore(keyValueNotificationStoreBuilder);
        builder.stream(portalConfig.getNotificationTopic()).process(new ProcessorSupplier(this){

            public Processor get() {
                return new NotificationEventProcessor();
            }
        }, new String[]{notification});
        Topology topology = builder.build();
        Properties streamsProps = new Properties();
        streamsProps.putAll((Map<?, ?>)streamsConfig.getProperties());
        streamsProps.put("default.key.serde", Serdes.ByteArray().getClass());
        streamsProps.put("default.value.serde", Serdes.ByteArray().getClass());
        streamsProps.put("application.id", portalConfig.getNotificationApplicationId());
        streamsProps.put("application.server", ip + ":" + port);
        this.notificationStreams = new KafkaStreams(topology, streamsProps);
        this.notificationStreams.setUncaughtExceptionHandler(ex -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        if (streamsConfig.isCleanUp()) {
            this.notificationStreams.cleanUp();
        }
        this.notificationStreams.start();
    }

    public void start(String ip, int port) {
        if (logger.isDebugEnabled()) {
            logger.debug("NotificationStreams is starting...");
        }
        this.startNotificationStreams(ip, port);
    }

    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("NotificationStreams is closing...");
        }
        this.notificationStreams.close();
    }

    public static class NotificationEventProcessor
    extends AbstractProcessor<byte[], byte[]> {
        private ProcessorContext pc;
        private KeyValueStore<String, String> notificationStore;

        public void init(ProcessorContext pc) {
            this.pc = pc;
            this.notificationStore = (KeyValueStore)pc.getStateStore(NotificationStreams.notification);
            if (logger.isInfoEnabled()) {
                logger.info("Processor initialized");
            }
        }

        public void process(byte[] key, byte[] value) {
            String email = new String(key, StandardCharsets.UTF_8);
            String notification = new String(value, StandardCharsets.UTF_8);
            Map notificationMap = JsonMapper.string2Map((String)notification);
            LinkedList<Map> notificationList = new LinkedList<Map>();
            String notifications = (String)this.notificationStore.get((Object)email);
            if (notifications != null) {
                notificationList.addAll(JsonMapper.string2List((String)notifications));
            }
            notificationList.addFirst(notificationMap);
            if (notificationList.size() > 50) {
                notificationList.removeLast();
            }
            this.notificationStore.put((Object)email, (Object)JsonMapper.toJson(notificationList));
        }

        public void close() {
            if (logger.isInfoEnabled()) {
                logger.info("Closing processor...");
            }
        }
    }
}

