/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streampipes.extensions.connectors.pulsar.adapter;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.exceptions.connect.ParseException;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.pulsar.adapter.PulsarConfig;
import org.apache.streampipes.extensions.connectors.pulsar.adapter.PulsarUtils;
import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarProtocol
implements StreamPipesAdapter,
SupportsRuntimeConfig {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarProtocol.class);
    public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.pulsar";
    public static final String PULSAR_BROKER_HOST = "pulsar-broker-host";
    public static final String PULSAR_BROKER_PORT = "pulsar-broker-port";
    public static final String PULSAR_TOPIC = "pulsar-topic";
    public static final String PULSAR_SUBSCRIPTION_NAME = "pulsar-subscription-name";
    private PulsarConfig config;
    private Consumer<byte[]> consumer;

    public void applyConfiguration(IStaticPropertyExtractor extractor) {
        this.config = PulsarConfig.from(extractor);
    }

    public StaticProperty resolveConfiguration(String staticPropertyInternalName, IStaticPropertyExtractor extractor) throws SpConfigurationException {
        String brokerHost = (String)extractor.singleValueParameter(PULSAR_BROKER_HOST, String.class);
        Integer brokerPort = (Integer)extractor.singleValueParameter(PULSAR_BROKER_PORT, Integer.class);
        try {
            PulsarClient client = PulsarUtils.makePulsarClient(brokerHost + ":" + brokerPort);
            return null;
        }
        catch (PulsarClientException e) {
            throw new SpConfigurationException((Throwable)e);
        }
    }

    public IAdapterConfiguration declareConfig() {
        return ((AdapterConfigurationBuilder)((AdapterConfigurationBuilder)((AdapterConfigurationBuilder)((AdapterConfigurationBuilder)((AdapterConfigurationBuilder)((AdapterConfigurationBuilder)AdapterConfigurationBuilder.create((String)ID, PulsarProtocol::new).withSupportedParsers(Parsers.defaultParsers()).withAssets(new String[]{"documentation.md", "icon.png"})).withLocales(new Locales[]{Locales.EN})).withCategory(new AdapterType[]{AdapterType.Generic}).requiredTextParameter(Labels.withId((String)PULSAR_BROKER_HOST))).requiredIntegerParameter(Labels.withId((String)PULSAR_BROKER_PORT), Integer.valueOf(6650))).requiredTextParameter(Labels.withId((String)PULSAR_TOPIC))).requiredTextParameter(Labels.withId((String)PULSAR_SUBSCRIPTION_NAME))).buildConfiguration();
    }

    public void onAdapterStarted(IAdapterParameterExtractor extractor, IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
        this.applyConfiguration(extractor.getStaticPropertyExtractor());
        BrokerEventProcessor processor = new BrokerEventProcessor(extractor.selectedParser(), collector);
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
            PulsarClient client = PulsarUtils.makePulsarClient(this.config.getBrokerUrl());
            this.consumer = client.newConsumer().topic(new String[]{this.config.getTopic()}).subscriptionName(this.config.getSubscriptionName()).messageListener((MessageListener & Serializable)(consumer, msg) -> {
                try {
                    processor.onEvent((byte[])msg.getValue());
                }
                catch (ParseException e) {
                    LOG.error("Failed to parse message.", (Throwable)e);
                }
            }).subscribe();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
        if (this.consumer != null) {
            try {
                this.consumer.close();
            }
            catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }
        this.consumer = null;
    }

    public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor, IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
        ArrayList<byte[]> elements = new ArrayList<byte[]>();
        this.applyConfiguration(extractor.getStaticPropertyExtractor());
        try (PulsarClient pulsarClient = PulsarUtils.makePulsarClient(this.config.getBrokerUrl());
             Reader reader = pulsarClient.newReader().topic(this.config.getTopic()).startMessageId(MessageId.earliest).create();){
            int readCount = 0;
            while (readCount < 1) {
                Message message = reader.readNext(1, TimeUnit.SECONDS);
                if (message == null) continue;
                elements.add((byte[])message.getValue());
                ++readCount;
            }
        }
        catch (IOException e) {
            throw new ParseException("Failed to fetch messages.", (Throwable)e);
        }
        return extractor.selectedParser().getGuessSchema((InputStream)new ByteArrayInputStream((byte[])elements.get(0)));
    }
}

