package org.apache.streampipes.processors.changedetection.jvm.welford;

import java.util.Arrays;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

/* loaded from: input_file:org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.class */
public class WelfordChangeDetection extends StreamPipesDataProcessor {
    private static final String NUMBER_MAPPING = "number-mapping";
    private static final String PARAM_K = "param-k";
    private static final String PARAM_H = "param-h";
    private String selectedNumberMapping;
    private Double k;
    private Double h;
    private Double cuSumLow;
    private Double cuSumHigh;
    private WelfordAggregate welfordAggregate;

    public DataProcessorDescription declareModel() {
        return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford").category(new DataProcessorType[]{DataProcessorType.VALUE_OBSERVER}).withAssets(new String[]{"documentation.md", "icon.png"}).withLocales(new Locales[]{Locales.EN}).requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), Labels.withId(NUMBER_MAPPING), PropertyScope.NONE).build()).requiredFloatParameter(Labels.withId(PARAM_K), Float.valueOf(0.0f), Float.valueOf(0.0f), Float.valueOf(100.0f), Float.valueOf(0.01f)).requiredFloatParameter(Labels.withId(PARAM_H), Float.valueOf(0.0f), Float.valueOf(0.0f), Float.valueOf(100.0f), Float.valueOf(0.01f)).outputStrategy(OutputStrategies.append(Arrays.asList(EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, "http://schema.org/Number"), EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, "http://schema.org/Number"), EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, "http://schema.org/Boolean"), EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, "http://schema.org/Boolean")))).build();
    }

    public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
        ProcessingElementParameterExtractor extractor = processorParams.extractor();
        this.selectedNumberMapping = extractor.mappingPropertyValue(NUMBER_MAPPING);
        this.k = (Double) extractor.singleValueParameter(PARAM_K, Double.class);
        this.h = (Double) extractor.singleValueParameter(PARAM_H, Double.class);
        this.cuSumLow = Double.valueOf(0.0d);
        this.cuSumHigh = Double.valueOf(0.0d);
        this.welfordAggregate = new WelfordAggregate();
    }

    public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
        Double asDouble = event.getFieldBySelector(this.selectedNumberMapping).getAsPrimitive().getAsDouble();
        this.welfordAggregate.update(asDouble);
        updateStatistics(getZScoreNormalizedValue(asDouble));
        Boolean testResult = getTestResult(this.cuSumHigh, this.h);
        Boolean testResult2 = getTestResult(this.cuSumLow, this.h);
        spOutputCollector.collect(updateEvent(event, this.cuSumLow, this.cuSumHigh, testResult2, testResult));
        if (testResult.booleanValue() || testResult2.booleanValue()) {
            resetAfterChange();
        }
    }

    public void onDetach() throws SpRuntimeException {
        this.cuSumLow = Double.valueOf(0.0d);
        this.cuSumHigh = Double.valueOf(0.0d);
    }

    private Double getZScoreNormalizedValue(Double d) {
        return Double.valueOf((d.doubleValue() - this.welfordAggregate.getMean().doubleValue()) / this.welfordAggregate.getSampleStd().doubleValue());
    }

    private void updateStatistics(Double d) {
        if (d.isNaN()) {
            return;
        }
        this.cuSumHigh = Double.valueOf(Math.max(0.0d, (this.cuSumHigh.doubleValue() + d.doubleValue()) - this.k.doubleValue()));
        this.cuSumLow = Double.valueOf(Math.min(0.0d, this.cuSumLow.doubleValue() + d.doubleValue() + this.k.doubleValue()));
    }

    private Boolean getTestResult(Double d, Double d2) {
        return Boolean.valueOf(Math.abs(d.doubleValue()) > this.h.doubleValue());
    }

    private Event updateEvent(Event event, Double d, Double d2, Boolean bool, Boolean bool2) {
        event.addField(WelfordEventFields.VAL_LOW.label, d);
        event.addField(WelfordEventFields.VAL_HIGH.label, d2);
        event.addField(WelfordEventFields.DECISION_LOW.label, bool);
        event.addField(WelfordEventFields.DECISION_HIGH.label, bool2);
        return event;
    }

    private void resetAfterChange() {
        this.cuSumHigh = Double.valueOf(0.0d);
        this.cuSumLow = Double.valueOf(0.0d);
        this.welfordAggregate = new WelfordAggregate();
    }
}
