/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.kafka;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.kafka.Plc4xSinkConnector;
import org.apache.plc4x.kafka.util.VersionUtil;

public class Plc4xSinkTask
extends SinkTask {
    private String url;
    private PlcConnection plcConnection;

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
        this.url = config.getString("url");
        this.openConnection();
        if (!this.plcConnection.getMetadata().canWrite()) {
            throw new ConnectException("Writing not supported on this connection");
        }
    }

    public void stop() {
        this.closeConnection();
    }

    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            String query = record.key().toString();
            Object value = record.value();
            PlcWriteRequest.Builder builder = this.plcConnection.writeRequestBuilder();
            PlcWriteRequest plcRequest = this.addToBuilder(builder, query, value).build();
            this.doWrite(plcRequest);
        }
    }

    private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
        Class<?> type = obj.getClass();
        if (type.equals(Integer.class)) {
            int value = (Integer)obj;
            builder.addItem(query, query, new Integer[]{value});
        } else if (type.equals(String.class)) {
            String value = (String)obj;
            builder.addItem(query, query, new String[]{value});
        }
        return builder;
    }

    private void openConnection() {
        try {
            this.plcConnection = new PlcDriverManager().getConnection(this.url);
            this.plcConnection.connect();
        }
        catch (PlcConnectionException e) {
            throw new ConnectException("Could not establish a PLC connection", (Throwable)e);
        }
    }

    private void closeConnection() {
        if (this.plcConnection != null) {
            try {
                this.plcConnection.close();
            }
            catch (Exception e) {
                throw new PlcRuntimeException("Caught exception while closing connection to PLC", (Throwable)e);
            }
        }
    }

    private void doWrite(PlcWriteRequest request) {
        try {
            request.execute().get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new ConnectException("Caught exception during write", (Throwable)e);
        }
    }
}

