/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.java.examples.connectivity.kafka;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.edgent.connectors.kafka.KafkaProducer;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.providers.direct.DirectTopology;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.plc4x.edgent.PlcConnectionAdapter;
import org.apache.plc4x.edgent.PlcFunctions;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.examples.connectivity.kafka.model.Configuration;
import org.apache.plc4x.java.examples.connectivity.kafka.model.PlcFieldConfig;
import org.apache.plc4x.java.examples.connectivity.kafka.model.PlcMemoryBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnector {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConnector.class);
    private Configuration config;
    private PlcConnectionAdapter plcAdapter;

    private KafkaConnector(String propsPath) {
        if (StringUtils.isEmpty((CharSequence)propsPath)) {
            logger.error("Empty configuration file parameter");
            throw new IllegalArgumentException("Empty configuration file parameter");
        }
        File propsFile = new File(propsPath);
        if (!propsFile.exists() || !propsFile.isFile()) {
            logger.error("Invalid configuration file {}", (Object)propsFile.getPath());
            throw new IllegalArgumentException("Invalid configuration file " + propsFile.getPath());
        }
        ObjectMapper mapper = new ObjectMapper((JsonFactory)new YAMLFactory());
        try {
            this.config = (Configuration)mapper.readValue(propsFile, Configuration.class);
            this.plcAdapter = new PlcConnectionAdapter(this.config.getPlcConfig().getConnection());
        }
        catch (IOException e) {
            logger.error("Error parsing configuration", (Throwable)e);
        }
    }

    private void run() throws PlcException {
        DirectProvider dp = new DirectProvider();
        DirectTopology top = dp.newTopology("kafka-bridge");
        PlcReadRequest.Builder builder = this.plcAdapter.readRequestBuilder();
        for (PlcMemoryBlock plcMemoryBlock : this.config.getPlcConfig().getPlcMemoryBlocks()) {
            for (PlcFieldConfig address : this.config.getPlcConfig().getPlcFields()) {
                builder = builder.addItem(plcMemoryBlock.getName() + "/" + address.getName(), "DATA_BLOCKS/" + plcMemoryBlock.getAddress() + "/" + address.getAddress());
            }
        }
        PlcReadRequest readRequest = builder.build();
        Supplier plcSupplier = PlcFunctions.batchSupplier((PlcConnectionAdapter)this.plcAdapter, (PlcReadRequest)readRequest);
        TStream source = top.poll(plcSupplier, (long)this.config.getPollingInterval(), TimeUnit.MILLISECONDS);
        TStream jsonSource = source.map((Function & Serializable)value -> {
            JsonObject jsonObject = new JsonObject();
            value.getFieldNames().forEach(fieldName -> {
                if (value.getNumberOfValues(fieldName) == 1) {
                    jsonObject.addProperty(fieldName, value.getObject(fieldName).toString());
                } else if (value.getNumberOfValues(fieldName) > 1) {
                    JsonArray values = new JsonArray();
                    value.getAllBytes(fieldName).forEach(arg_0 -> ((JsonArray)values).add(arg_0));
                    jsonObject.add(fieldName, (JsonElement)values);
                }
            });
            return jsonObject.toString();
        });
        Map<String, Object> kafkaConfig = this.createKafkaConfig();
        KafkaProducer kafka = new KafkaProducer((Topology)top, (Supplier & Serializable)() -> kafkaConfig);
        kafka.publish(jsonSource, this.config.getKafkaConfig().getTopicName());
        dp.submit((Topology)top);
    }

    private Map<String, Object> createKafkaConfig() {
        HashMap<String, Object> kafkaConfig = new HashMap<String, Object>();
        kafkaConfig.put("bootstrap.servers", this.config.getKafkaConfig().getBootstrapServers());
        if (this.config.getKafkaConfig().getProperties() != null) {
            kafkaConfig.putAll(this.config.getKafkaConfig().getProperties());
        }
        return kafkaConfig;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Usage: KafkaBridge {path-to-kafka-connector.yml}");
        }
        KafkaConnector kafkaBridge = new KafkaConnector(args[0]);
        kafkaBridge.run();
    }
}

