package org.apache.streampipes.wrapper.standalone.function;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.declarer.IFunctionConfig;
import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.api.pe.routing.RawDataProcessor;
import org.apache.streampipes.extensions.api.pe.routing.SpInputCollector;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.management.util.GroundingDebugUtils;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.function.FunctionId;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.class */
/**
 * Base class for functions that are executed by the standalone wrapper.
 *
 * <p>On {@link #invokeRuntime(String)} the class connects one output collector per configured
 * output stream, builds a {@link FunctionContext}, records a {@link SourceInfo} and
 * {@link SchemaInfo} per input stream (keyed by the stream's actual topic name), calls
 * {@link #onServiceStarted(FunctionContext)} and finally registers itself as consumer on every
 * input collector. {@link #discardRuntime()} reverses these steps.
 *
 * <p>Subclasses supply the configuration and the lifecycle/event callbacks declared as abstract
 * methods at the bottom of this class.
 */
public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclarer, RawDataProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamPipesFunction.class);

    /** Source metadata per input stream, keyed by the stream's actual topic name. */
    private final Map<String, SourceInfo> sourceInfoMapper = new HashMap<>();

    /** Schema metadata per input stream, keyed by the stream's actual topic name. */
    private final Map<String, SchemaInfo> schemaInfoMapper = new HashMap<>();

    /**
     * Input collectors keyed by stream element id. Starts out empty (never {@code null}) so that
     * {@link #discardRuntime()} is safe even if {@link #invokeRuntime(String)} never completed.
     */
    private Map<String, SpInputCollector> inputCollectors = new HashMap<>();

    /** Output collectors keyed by the key of the configured output data stream. */
    private Map<String, SpOutputCollector> outputCollectors = new HashMap<>();

    /**
     * Starts the function: connects producers, prepares per-stream metadata, notifies the
     * subclass and then starts consuming.
     *
     * @param str value handed unchanged to {@link FunctionContextGenerator} as its second argument
     */
    public void invokeRuntime(String str) {
        FunctionId functionId = getFunctionConfig().getFunctionId();
        initializeProducers();
        FunctionContext context =
                new FunctionContextGenerator(functionId.getId(), str, requiredStreamIds(), this.outputCollectors).generate();

        // Each stream gets a positional selector prefix "s0", "s1", ... in iteration order.
        int streamIndex = 0;
        for (SpDataStream stream : context.getStreams()) {
            String topic = getTopic(stream);
            this.sourceInfoMapper.put(topic, createSourceInfo(stream, streamIndex));
            this.schemaInfoMapper.put(topic, createSchemaInfo(stream.getEventSchema()));
            streamIndex++;
        }

        this.inputCollectors = getInputCollectors(context.getStreams());
        LOG.info("Invoking function {}:{}", functionId.getId(), functionId.getVersion());
        // The subclass is notified before consumers are registered, so no event can reach
        // onEvent ahead of onServiceStarted.
        onServiceStarted(context);
        registerConsumers();
    }

    /**
     * Stops the function: notifies the subclass, unregisters all consumers and disconnects all
     * producers.
     *
     * <p>Clean-up is performed in {@code finally} blocks so that a failure in
     * {@link #onServiceStopped()} or while unregistering consumers does not leave later
     * collectors connected; the original exception still propagates to the caller.
     */
    public void discardRuntime() {
        FunctionId functionId = getFunctionConfig().getFunctionId();
        LOG.info("Discarding function {}:{}", functionId.getId(), functionId.getVersion());
        try {
            onServiceStopped();
        } finally {
            try {
                unregisterConsumers();
            } finally {
                this.outputCollectors.values().forEach(SpOutputCollector::disconnect);
            }
        }
    }

    /**
     * Converts a raw event into an {@link Event} and forwards it to {@link #onEvent(Event, String)}.
     *
     * <p>Any {@link RuntimeException} raised while converting or handling the event (including a
     * lookup miss for {@code str}) is reported to the monitoring manager instead of being
     * rethrown.
     *
     * @param map the raw event
     * @param str key used to look up the source and schema info recorded in
     *            {@link #invokeRuntime(String)}
     */
    public void process(Map<String, Object> map, String str) {
        try {
            SourceInfo sourceInfo = this.sourceInfoMapper.get(str);
            SchemaInfo schemaInfo = this.schemaInfoMapper.get(str);
            Event event = EventFactory.fromMap(map, sourceInfo, schemaInfo);
            onEvent(event, sourceInfo.getSourceId());
            increaseCounter(sourceInfo.getSourceId());
        } catch (RuntimeException e) {
            addError(e);
        }
    }

    /** Returns the actual topic name from the stream's transport protocol. */
    private String getTopic(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    /** Increments the monitored input counter of this function for the given source id. */
    private void increaseCounter(String sourceId) {
        SpMonitoringManager.INSTANCE.increaseInCounter(
                getFunctionConfig().getFunctionId().getId(), sourceId, System.currentTimeMillis());
    }

    /** Reports the given exception as an error log entry for this function. */
    private void addError(RuntimeException runtimeException) {
        SpMonitoringManager.INSTANCE.addErrorMessage(
                getFunctionConfig().getFunctionId().getId(),
                SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(runtimeException)));
    }

    /** Creates the output collectors from the function config and connects each of them. */
    private void initializeProducers() {
        this.outputCollectors = getOutputCollectors();
        this.outputCollectors.values().forEach(SpOutputCollector::connect);
    }

    /**
     * Adds one output collector per configured output data stream to {@link #outputCollectors},
     * using the stream's transport protocol and its first transport format.
     *
     * @return the (same) populated {@link #outputCollectors} map
     */
    private Map<String, SpOutputCollector> getOutputCollectors() {
        getFunctionConfig().getOutputDataStreams().forEach((key, stream) ->
                this.outputCollectors.put(key, ProtocolManager.makeOutputCollector(
                        stream.getEventGrounding().getTransportProtocol(),
                        (TransportFormat) stream.getEventGrounding().getTransportFormats().get(0),
                        key)));
        return this.outputCollectors;
    }

    /**
     * Looks up an input collector for every given stream, keyed by the stream's element id.
     * When the debug flag of the environment is set, the stream's grounding is first passed to
     * {@link GroundingDebugUtils#modifyGrounding}.
     *
     * @param collection the input streams of this function
     * @return a new map from element id to input collector
     * @throws SpRuntimeException declared by the original signature
     */
    private Map<String, SpInputCollector> getInputCollectors(Collection<SpDataStream> collection) throws SpRuntimeException {
        Map<String, SpInputCollector> collectors = new HashMap<>();
        Environment environment = getEnvironment();
        for (SpDataStream stream : collection) {
            if ((Boolean) environment.getSpDebug().getValueOrDefault()) {
                GroundingDebugUtils.modifyGrounding(stream.getEventGrounding());
            }
            collectors.put(stream.getElementId(), ProtocolManager.findInputCollector(
                    stream.getEventGrounding().getTransportProtocol(),
                    (TransportFormat) stream.getEventGrounding().getTransportFormats().get(0),
                    false));
        }
        return collectors;
    }

    /** Registers this function as consumer on every input collector and connects it. */
    private void registerConsumers() {
        this.inputCollectors.forEach((routeId, collector) -> {
            collector.registerConsumer(routeId, this);
            collector.connect();
        });
    }

    /** Unregisters this function from every input collector and disconnects it. */
    private void unregisterConsumers() {
        this.inputCollectors.forEach((routeId, collector) -> {
            collector.unregisterConsumer(routeId);
            collector.disconnect();
        });
    }

    /** Builds the source info for a stream from its element id and the prefix {@code "s" + i}. */
    private SourceInfo createSourceInfo(SpDataStream spDataStream, int i) {
        return new SourceInfo(spDataStream.getElementId(), "s" + i);
    }

    /** Builds the schema info for an event schema with an empty second-argument list. */
    private SchemaInfo createSchemaInfo(EventSchema eventSchema) {
        return new SchemaInfo(eventSchema, new ArrayList<>());
    }

    /** Returns the process-wide environment accessor. */
    private Environment getEnvironment() {
        return Environments.getEnvironment();
    }

    /** Returns the configuration (function id, output data streams) of this function. */
    public abstract IFunctionConfig getFunctionConfig();

    /** Called once from {@link #invokeRuntime(String)} before any consumer is registered. */
    public abstract void onServiceStarted(FunctionContext functionContext);

    /**
     * Called for every successfully converted event.
     *
     * @param event the converted event
     * @param str   the source id of the stream the event was received from
     */
    public abstract void onEvent(Event event, String str);

    /** Called once from {@link #discardRuntime()} before consumers are unregistered. */
    public abstract void onServiceStopped();
}
