package org.apache.streampipes.sinks.brokers.jvm.websocket;

import java.io.IOException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/websocket/WebsocketServerSink.class */
/**
 * Data sink that hands every incoming event to a {@link SocketServer} bound to a
 * user-configured port.
 *
 * <p>The server is created and started in {@code onInvocation} and stopped in
 * {@code onDetach}.
 */
public class WebsocketServerSink extends StreamPipesDataSink {
    /** Internal id of the required integer parameter that carries the server port. */
    private static final String PORT_KEY = "port";

    /** Port extracted from the sink parameters on invocation. */
    private Integer port;

    /** Active server; {@code null} before invocation and after a successful detach. */
    private SocketServer server;

    /**
     * Declares the sink: messaging category, English locale, documentation and icon assets,
     * one input stream accepting any property, and one required integer parameter (the port).
     *
     * @return the description of this sink
     */
    public DataSinkDescription declareModel() {
        return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket")
                .category(new DataSinkType[]{DataSinkType.MESSAGING})
                .withLocales(new Locales[]{Locales.EN})
                .withAssets(new String[]{"documentation.md", "icon.png"})
                .requiredStream(StreamRequirementsBuilder.create()
                        .requiredProperty(EpRequirements.anyProperty())
                        .build())
                .requiredIntegerParameter(Labels.withId(PORT_KEY))
                .build();
    }

    /**
     * Reads the configured port and starts a new {@link SocketServer} on it.
     *
     * @param sinkParams              parameters of this sink invocation; the port is read from them
     * @param eventSinkRuntimeContext runtime context (unused)
     * @throws SpRuntimeException declared by the sink contract
     */
    public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        this.port = sinkParams.extractor().singleValueParameter(PORT_KEY, Integer.class);
        this.server = new SocketServer(this.port);
        // NOTE(review): presumably enables address reuse on the listening socket so the port
        // can be rebound quickly after a restart - confirm against SocketServer.
        this.server.setReuseAddr(true);
        this.server.start();
    }

    /**
     * Passes the event on to the running server.
     *
     * @param event the event received from the input stream
     * @throws SpRuntimeException declared by the sink contract
     */
    public void onEvent(Event event) throws SpRuntimeException {
        this.server.onEvent(event);
    }

    /**
     * Stops the server and releases the reference to it. Does nothing when no server is
     * active (never invoked, or already detached).
     *
     * @throws SpRuntimeException if stopping the server fails or the thread is interrupted;
     *                            the original exception is attached as the cause
     */
    public void onDetach() throws SpRuntimeException {
        if (this.server == null) {
            // Nothing was started (or it was already stopped): avoid a NullPointerException.
            return;
        }
        try {
            this.server.stop();
            this.server = null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new SpRuntimeException(e.getMessage(), e);
        } catch (IOException e) {
            // Keep the original message but preserve the cause and its stack trace.
            throw new SpRuntimeException(e.getMessage(), e);
        }
    }
}
