/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.kafka.util.VersionUtil;

public class Plc4xSourceTask
extends SourceTask {
    static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Kafka topic to publish to";
    static final String URL_CONFIG = "url";
    private static final String URL_DOC = "PLC URL";
    static final String QUERIES_CONFIG = "queries";
    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
    static final String RATE_CONFIG = "rate";
    private static final Integer RATE_DEFAULT = 1000;
    private static final String RATE_DOC = "Polling rate";
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define("topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Kafka topic to publish to").define("url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "PLC URL").define("queries", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Field queries to be sent to the PLC").define("rate", ConfigDef.Type.INT, (Object)RATE_DEFAULT, ConfigDef.Importance.MEDIUM, "Polling rate");
    private static final long WAIT_LIMIT_MILLIS = 100L;
    private static final long TIMEOUT_LIMIT_MILLIS = 5000L;
    private static final String URL_FIELD = "url";
    private static final String QUERY_FIELD = "query";
    private static final Schema KEY_SCHEMA = new SchemaBuilder(Schema.Type.STRUCT).field("url", Schema.STRING_SCHEMA).field("query", Schema.STRING_SCHEMA).build();
    private String topic;
    private String url;
    private List<String> queries;
    private PlcConnection plcConnection;
    private ScheduledExecutorService scheduler;
    private boolean fetch = true;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
        this.topic = config.getString(TOPIC_CONFIG);
        this.url = config.getString("url");
        this.queries = config.getList(QUERIES_CONFIG);
        this.openConnection();
        if (!this.plcConnection.getMetadata().canRead()) {
            throw new ConnectException("Reading not supported on this connection");
        }
        int rate = Integer.parseInt(props.get(RATE_CONFIG));
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.scheduleAtFixedRate(this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.scheduler.shutdown();
        this.closeConnection();
        Plc4xSourceTask plc4xSourceTask = this;
        synchronized (plc4xSourceTask) {
            ((Object)((Object)this)).notify();
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        return this.awaitFetch(100L) ? this.doFetch() : null;
    }

    private void openConnection() {
        try {
            this.plcConnection = new PlcDriverManager().getConnection(this.url);
            this.plcConnection.connect();
        }
        catch (PlcConnectionException e) {
            throw new ConnectException("Could not establish a PLC connection", (Throwable)e);
        }
    }

    private void closeConnection() {
        if (this.plcConnection != null) {
            try {
                this.plcConnection.close();
            }
            catch (Exception e) {
                throw new PlcRuntimeException("Caught exception while closing connection to PLC", (Throwable)e);
            }
        }
    }

    private synchronized void scheduleFetch() {
        this.fetch = true;
        ((Object)((Object)this)).notify();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized boolean awaitFetch(long milliseconds) throws InterruptedException {
        if (!this.fetch) {
            ((Object)((Object)this)).wait(milliseconds);
        }
        try {
            boolean bl = this.fetch;
            return bl;
        }
        finally {
            this.fetch = false;
        }
    }

    private List<SourceRecord> doFetch() throws InterruptedException {
        CompletableFuture response = this.createReadRequest().execute();
        try {
            PlcReadResponse received = (PlcReadResponse)response.get(5000L, TimeUnit.MILLISECONDS);
            return this.extractValues(received);
        }
        catch (ExecutionException e) {
            throw new ConnectException("Could not fetch data from source", (Throwable)e);
        }
        catch (TimeoutException e) {
            throw new ConnectException("Timed out waiting for data from source", (Throwable)e);
        }
    }

    private PlcReadRequest createReadRequest() {
        PlcReadRequest.Builder builder = this.plcConnection.readRequestBuilder();
        for (String query : this.queries) {
            builder.addItem(query, query);
        }
        return builder.build();
    }

    private List<SourceRecord> extractValues(PlcReadResponse response) {
        LinkedList<SourceRecord> result = new LinkedList<SourceRecord>();
        for (String query : this.queries) {
            PlcResponseCode rc = response.getResponseCode(query);
            if (!rc.equals((Object)PlcResponseCode.OK)) continue;
            Struct key = new Struct(KEY_SCHEMA).put("url", (Object)this.url).put(QUERY_FIELD, (Object)query);
            Object value = response.getObject(query);
            Schema valueSchema = this.getSchema(value);
            Long timestamp = System.currentTimeMillis();
            HashMap<String, String> sourcePartition = new HashMap<String, String>();
            sourcePartition.put("url", this.url);
            sourcePartition.put(QUERY_FIELD, query);
            Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
            SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.topic, KEY_SCHEMA, (Object)key, valueSchema, value);
            result.add(record);
        }
        return result;
    }

    private Schema getSchema(Object value) {
        Objects.requireNonNull(value);
        if (value instanceof Byte) {
            return Schema.INT8_SCHEMA;
        }
        if (value instanceof Short) {
            return Schema.INT16_SCHEMA;
        }
        if (value instanceof Integer) {
            return Schema.INT32_SCHEMA;
        }
        if (value instanceof Long) {
            return Schema.INT64_SCHEMA;
        }
        if (value instanceof Float) {
            return Schema.FLOAT32_SCHEMA;
        }
        if (value instanceof Double) {
            return Schema.FLOAT64_SCHEMA;
        }
        if (value instanceof Boolean) {
            return Schema.BOOLEAN_SCHEMA;
        }
        if (value instanceof String) {
            return Schema.STRING_SCHEMA;
        }
        if (value instanceof byte[]) {
            return Schema.BYTES_SCHEMA;
        }
        throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
    }
}

