package org.apache.streampipes.sinks.brokers.jvm.bufferrest;

import java.io.IOException;
import java.util.List;
import org.apache.commons.io.Charsets;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener;
import org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.MessageBuffer;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/bufferrest/BufferRestPublisherSink.class */
public class BufferRestPublisherSink extends StreamPipesDataSink implements BufferListener {
    private static final Logger LOG = LoggerFactory.getLogger(BufferRestPublisherSink.class);
    private static final String KEY = "bufferrest";
    private static final String URI = ".uri";
    private static final String COUNT = ".count";
    private static final String FIELDS = ".fields-to-send";
    private List<String> fieldsToSend;
    private SpDataFormatDefinition dataFormatDefinition;
    private String restEndpointURI;
    private MessageBuffer buffer;

    public DataSinkDescription declareModel() {
        return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.bufferrest", 0).category(new DataSinkType[]{DataSinkType.NOTIFICATION}).withLocales(new Locales[]{Locales.EN}).requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), Labels.withId("bufferrest.fields-to-send"), PropertyScope.NONE).build()).requiredTextParameter(Labels.from("bufferrest.uri", "REST Endpoint URI", "REST Endpoint URI")).requiredIntegerParameter(Labels.from("bufferrest.count", "Buffered Event Count", "Number (1 <= x <= 1000000) of incoming events before sending data on to the given REST endpoint"), 1, 1000000, 1).build();
    }

    public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        IDataSinkParameterExtractor extractor = sinkParams.extractor();
        this.fieldsToSend = extractor.mappingPropertyValues("bufferrest.fields-to-send");
        this.restEndpointURI = (String) extractor.singleValueParameter("bufferrest.uri", String.class);
        int parseInt = Integer.parseInt((String) extractor.singleValueParameter("bufferrest.count", String.class));
        this.dataFormatDefinition = new JsonDataFormatDefinition();
        this.buffer = new MessageBuffer(parseInt);
        this.buffer.addListener(this);
    }

    public void onEvent(Event event) throws SpRuntimeException {
        try {
            this.buffer.addMessage(new String(this.dataFormatDefinition.fromMap(event.getSubset(this.fieldsToSend).getRaw())));
        } catch (SpRuntimeException e) {
            LOG.error("Could not parse incoming event");
        }
    }

    public void onDetach() throws SpRuntimeException {
        this.buffer.removeListener(this);
    }

    @Override // org.apache.streampipes.sinks.brokers.jvm.bufferrest.buffer.BufferListener
    public void bufferFull(String str) {
        try {
            Request.Post(this.restEndpointURI).body(new StringEntity(str, Charsets.UTF_8)).execute();
        } catch (IOException e) {
            LOG.error("Could not reach endpoint at {}", this.restEndpointURI);
        }
    }
}
