/*
 * Decompiled with CFR 0.152.
 */
package net.fortytwo.smsn.p2p;

import com.illposed.osc.OSCMessage;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.fortytwo.rdfagents.data.DatasetFactory;
import net.fortytwo.rdfagents.model.Dataset;
import net.fortytwo.smsn.SemanticSynchrony;
import net.fortytwo.smsn.p2p.Connection;
import net.fortytwo.smsn.p2p.Pinger;
import net.fortytwo.smsn.p2p.ServiceBroadcastListener;
import net.fortytwo.smsn.p2p.ServiceDescription;
import net.fortytwo.smsn.p2p.sparql.ProxySparqlStreamProcessor;
import net.fortytwo.stream.sparql.RDFStreamProcessor;
import org.openrdf.model.IRI;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;

public class SmSnAgent {
    private static final Logger logger = Logger.getLogger(SmSnAgent.class.getName());
    public static final String PROP_BODY = "body";
    public static final String PROP_TAG = "tag";
    private final IRI agentIri;
    private final DatasetFactory factory = new DatasetFactory();
    private ServiceBroadcastListener listener;
    private final ProxySparqlStreamProcessor streamProcessor;
    private Service coordinatorService;
    private final Connection coordinatorConnection;
    private final Pinger pinger;
    private DatagramSocket coordinatorOscSocket;
    private InetAddress coordinatorOscAddress;
    private int coordinatorOscPort;

    public SmSnAgent(boolean listenForServices) {
        this(SemanticSynchrony.getConfiguration().getServices().getAgentIri(), listenForServices);
    }

    public SmSnAgent(String agentIri, boolean listenForServices) {
        logger.log(Level.INFO, "creating SmSn agent with IRI " + agentIri);
        ValueFactory vf = this.factory.getValueFactory();
        this.agentIri = vf.createIRI(agentIri);
        this.coordinatorConnection = new Connection();
        this.pinger = new Pinger(this.coordinatorConnection);
        this.streamProcessor = new ProxySparqlStreamProcessor(this.coordinatorConnection);
        if (listenForServices) {
            this.listener = new ServiceBroadcastListener((address, description) -> {
                if (SemanticSynchrony.getConfiguration().isVerbose()) {
                    logger.log(Level.FINE, "received broadcast message from " + address.getHostAddress() + ": version=" + description.getVersion() + ", endpoint=" + description.getEndpoint() + ", pub/sub port=" + description.getPubsubPort());
                }
                if (!this.coordinatorConnection.isActive()) {
                    Socket socket;
                    this.coordinatorService = new Service();
                    this.coordinatorService.address = address;
                    this.coordinatorService.description = description;
                    try {
                        logger.log(Level.INFO, "opening socket connection to coordinator");
                        socket = new Socket(address, this.coordinatorService.description.getPubsubPort());
                    }
                    catch (IOException e) {
                        logger.log(Level.INFO, "failed to open socket connection to coordinator", e);
                        return;
                    }
                    try {
                        this.streamProcessor.notifyConnectionOpen();
                    }
                    catch (IOException e) {
                        logger.log(Level.WARNING, "error on query engine notification", e);
                        return;
                    }
                    this.coordinatorConnection.start(socket);
                } else if (SemanticSynchrony.getConfiguration().isVerbose()) {
                    logger.log(Level.FINE, "ignoring broadcast message due to existing connection to " + this.coordinatorService.address.getHostAddress());
                }
            });
            this.listener.start();
        }
    }

    public IRI getAgentIri() {
        return this.agentIri;
    }

    public Pinger getPinger() {
        return this.pinger;
    }

    public RDFStreamProcessor getStreamProcessor() {
        return this.streamProcessor;
    }

    public DatasetFactory getDatasetFactory() {
        return this.factory;
    }

    public Service getCoordinatorService() {
        return this.coordinatorService;
    }

    public Connection getCoordinatorConnection() {
        return this.coordinatorConnection;
    }

    public void stop() {
        if (null != this.listener) {
            this.listener.stop();
        }
    }

    public void sendOSCMessageToCoordinator(OSCMessage m) {
        if (this.getCoordinatorConnection().isActive()) {
            try {
                if (null == this.coordinatorOscSocket) {
                    this.coordinatorOscPort = this.getCoordinatorService().description.getOscPort();
                    this.coordinatorOscAddress = this.getCoordinatorService().address;
                    this.coordinatorOscSocket = new DatagramSocket();
                }
                byte[] buffer = m.getByteArray();
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length, this.coordinatorOscAddress, this.coordinatorOscPort);
                this.coordinatorOscSocket.send(packet);
                logger.log(Level.INFO, "sent OSC datagram to " + this.coordinatorOscAddress + ":" + this.coordinatorOscPort);
            }
            catch (IOException e) {
                logger.log(Level.SEVERE, "error in sending OSC datagram to coordinator", e);
            }
            catch (Throwable t) {
                logger.log(Level.SEVERE, "unexpected error in sending OSC datagram to coordinator", t);
            }
        }
    }

    public void sendDataset(Dataset d, int ttl) {
        this.getStreamProcessor().addInputs(ttl, this.toArray(d));
    }

    private Statement[] toArray(Dataset d) {
        Collection c = d.getStatements();
        Statement[] a = new Statement[c.size()];
        return c.toArray(a);
    }

    public class Service {
        public InetAddress address;
        public ServiceDescription description;
    }
}

