package org.apache.streampipes.wrapper.standalone;

import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;

/* loaded from: input_file:org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.class */
public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams> implements ExternalEventProcessor<ProcessorParams> {
    private static final String HTTP_PROTOCOL = "http://";
    private static final String PYTHON_ENDPOINT = "localhost:5000";
    private String invocationId;
    private String appId;
    private String inputTopic;
    private String outputTopic;
    private String kafkaUrl;

    @Override // org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer
    public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation dataProcessorInvocation, ProcessingElementParameterExtractor processingElementParameterExtractor) {
        ProcessorParams processorParams = new ProcessorParams(dataProcessorInvocation);
        this.invocationId = UUID.randomUUID().toString();
        this.appId = dataProcessorInvocation.getAppId();
        this.inputTopic = getInputTopic(processorParams);
        this.outputTopic = getOutputTopic(processorParams);
        this.kafkaUrl = getKafkaUrl(processorParams);
        return new ConfiguredExternalEventProcessor<>(new ProcessorParams(dataProcessorInvocation), () -> {
            return this;
        });
    }

    protected JsonObject createMinimalInvocationGraph(Map<String, String> map) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("invocation_id", this.invocationId);
        jsonObject.addProperty("processor_id", this.appId);
        jsonObject.addProperty("input_topics", this.inputTopic);
        jsonObject.addProperty("output_topics", this.outputTopic);
        jsonObject.addProperty("bootstrap_servers", this.kafkaUrl);
        JsonObject jsonObject2 = new JsonObject();
        Objects.requireNonNull(jsonObject2);
        map.forEach(jsonObject2::addProperty);
        jsonObject.add("static_properties", jsonObject2);
        return jsonObject;
    }

    protected void invoke(JsonObject jsonObject) {
        post("invoke", jsonObject.toString());
    }

    protected void detach() {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("invocation_id", this.invocationId);
        post("detach", jsonObject.toString());
    }

    private static String post(String str, String str2) {
        String str3 = null;
        try {
            str3 = Request.Post("http://localhost:5000/" + str).bodyString(str2, ContentType.APPLICATION_JSON).connectTimeout(1000).socketTimeout(100000).execute().returnContent().asString();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return str3;
    }

    private String getInputTopic(EventProcessorBindingParams eventProcessorBindingParams) {
        return ((SpDataStream) eventProcessorBindingParams.getGraph().getInputStreams().get(0)).getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private String getOutputTopic(EventProcessorBindingParams eventProcessorBindingParams) {
        return eventProcessorBindingParams.getGraph().getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private String getKafkaUrl(EventProcessorBindingParams eventProcessorBindingParams) {
        return ((TransportProtocol) eventProcessorBindingParams.getGraph().getOutputStream().getEventGrounding().getTransportProtocols().get(0)).getBrokerHostname() + ":" + Integer.valueOf(((KafkaTransportProtocol) eventProcessorBindingParams.getGraph().getOutputStream().getEventGrounding().getTransportProtocols().get(0)).getKafkaPort()).toString();
    }
}
