package org.apache.plc4x.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.kafka.util.VersionUtil;

/* loaded from: input_file:org/apache/plc4x/kafka/Plc4xSourceTask.class */
public class Plc4xSourceTask extends SourceTask {
    private static final long WAIT_LIMIT_MILLIS = 100;
    private static final long TIMEOUT_LIMIT_MILLIS = 5000;
    private static final String URL_FIELD = "url";
    private static final String QUERY_FIELD = "query";
    private static final Schema KEY_SCHEMA = new SchemaBuilder(Schema.Type.STRUCT).field(URL_FIELD, Schema.STRING_SCHEMA).field(QUERY_FIELD, Schema.STRING_SCHEMA).build();
    private String topic;
    private String url;
    private List<String> queries;
    private PlcConnection plcConnection;
    private PlcReader plcReader;
    private PlcReadRequest plcRequest;
    private ScheduledExecutorService scheduler;
    private boolean fetch = true;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        AbstractConfig abstractConfig = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, map);
        this.topic = abstractConfig.getString("topic");
        this.url = abstractConfig.getString(URL_FIELD);
        this.queries = abstractConfig.getList("queries");
        openConnection();
        this.plcReader = (PlcReader) this.plcConnection.getReader().orElseThrow(() -> {
            return new ConnectException("PlcReader not available for this type of connection");
        });
        PlcReadRequest.Builder readRequestBuilder = this.plcReader.readRequestBuilder();
        for (String str : this.queries) {
            readRequestBuilder.addItem(str, str);
        }
        this.plcRequest = (PlcReadRequest) readRequestBuilder.build();
        int intValue = Integer.valueOf(map.get("rate")).intValue();
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.scheduleAtFixedRate(this::scheduleFetch, intValue, intValue, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this.scheduler.shutdown();
        closeConnection();
        notify();
    }

    public List<SourceRecord> poll() throws InterruptedException {
        if (awaitFetch(WAIT_LIMIT_MILLIS)) {
            return doFetch();
        }
        return null;
    }

    private void openConnection() {
        try {
            this.plcConnection = new PlcDriverManager().getConnection(this.url);
            this.plcConnection.connect();
        } catch (PlcConnectionException e) {
            throw new ConnectException("Could not establish a PLC connection", e);
        }
    }

    private void closeConnection() {
        if (this.plcConnection != null) {
            try {
                this.plcConnection.close();
            } catch (Exception e) {
                throw new RuntimeException("Caught exception while closing connection to PLC", e);
            }
        }
    }

    private synchronized void scheduleFetch() {
        this.fetch = true;
        notify();
    }

    private synchronized boolean awaitFetch(long j) throws InterruptedException {
        if (!this.fetch) {
            wait(j);
        }
        try {
            boolean z = this.fetch;
            this.fetch = false;
            return z;
        } catch (Throwable th) {
            this.fetch = false;
            throw th;
        }
    }

    private List<SourceRecord> doFetch() throws InterruptedException {
        try {
            return extractValues((PlcReadResponse) this.plcReader.read(this.plcRequest).get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS));
        } catch (ExecutionException e) {
            throw new ConnectException("Could not fetch data from source", e);
        } catch (TimeoutException e2) {
            throw new ConnectException("Timed out waiting for data from source", e2);
        }
    }

    private List<SourceRecord> extractValues(PlcReadResponse<?> plcReadResponse) {
        LinkedList linkedList = new LinkedList();
        for (String str : this.queries) {
            if (plcReadResponse.getResponseCode(str).equals(PlcResponseCode.OK)) {
                Struct put = new Struct(KEY_SCHEMA).put(URL_FIELD, this.url).put(QUERY_FIELD, str);
                Object object = plcReadResponse.getObject(str);
                Schema schema = getSchema(object);
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                HashMap hashMap = new HashMap();
                hashMap.put(URL_FIELD, this.url);
                hashMap.put(QUERY_FIELD, str);
                linkedList.add(new SourceRecord(hashMap, Collections.singletonMap("offset", valueOf), this.topic, KEY_SCHEMA, put, schema, object));
            }
        }
        return linkedList;
    }

    private Schema getSchema(Object obj) {
        Objects.requireNonNull(obj);
        if (obj instanceof Byte) {
            return Schema.INT8_SCHEMA;
        }
        if (obj instanceof Short) {
            return Schema.INT16_SCHEMA;
        }
        if (obj instanceof Integer) {
            return Schema.INT32_SCHEMA;
        }
        if (obj instanceof Long) {
            return Schema.INT64_SCHEMA;
        }
        if (obj instanceof Float) {
            return Schema.FLOAT32_SCHEMA;
        }
        if (obj instanceof Double) {
            return Schema.FLOAT64_SCHEMA;
        }
        if (obj instanceof Boolean) {
            return Schema.BOOLEAN_SCHEMA;
        }
        if (obj instanceof String) {
            return Schema.STRING_SCHEMA;
        }
        if (obj instanceof byte[]) {
            return Schema.BYTES_SCHEMA;
        }
        throw new ConnectException(String.format("Unsupported data type %s", obj.getClass().getName()));
    }
}
