package org.apache.streampipes.wrapper.flink;

import java.util.Properties;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.flink.converter.EventToMapConverter;
import org.apache.streampipes.wrapper.flink.serializer.ByteArraySerializer;
import org.apache.streampipes.wrapper.flink.sink.FlinkJmsProducer;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;

/* loaded from: input_file:org/apache/streampipes/wrapper/flink/FlinkDataProcessorRuntime.class */
/**
 * Flink runtime base class for data processors.
 *
 * <p>Subclasses provide the processing pipeline through
 * {@link #getApplicationLogic(DataStream[])}; this class takes the resulting stream,
 * converts and serializes its events, and attaches a sink for the output stream of the
 * processor graph (see {@link #appendExecutionConfig(DataStream[])}).
 *
 * @param <B> binding parameter type of the concrete processor
 */
public abstract class FlinkDataProcessorRuntime<B extends EventProcessorBindingParams> extends FlinkRuntime<EventProcessorRuntimeParams<B>, B, DataProcessorInvocation, EventProcessorRuntimeContext> {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the runtime by handing the binding parameters to the superclass.
     *
     * @param b binding parameters of the processor
     * @deprecated this file does not name a replacement; presumably one of the other
     *     constructors should be used instead (TODO confirm against FlinkRuntime)
     */
    @Deprecated
    public FlinkDataProcessorRuntime(B b) {
        super(b);
    }

    /**
     * Creates the runtime by handing both arguments to the superclass.
     *
     * @param b binding parameters of the processor
     * @param z flag forwarded unchanged to the superclass constructor; its meaning is
     *     defined there and is not visible in this file
     */
    public FlinkDataProcessorRuntime(B b, boolean z) {
        super(b, z);
    }

    /**
     * Creates the runtime by handing both arguments to the superclass.
     *
     * @param b binding parameters of the processor
     * @param flinkDeploymentConfig deployment configuration forwarded to the superclass
     */
    public FlinkDataProcessorRuntime(B b, FlinkDeploymentConfig flinkDeploymentConfig) {
        super(b, flinkDeploymentConfig);
    }

    /**
     * Appends the output side of the pipeline (overrides the method declared in
     * {@code FlinkRuntime}).
     *
     * <p>The stream returned by {@link #getApplicationLogic(DataStream[])} is flat-mapped
     * through an {@link EventToMapConverter}. A {@link ByteArraySerializer} is built from
     * the data format definition of the first transport format listed in the output
     * stream's grounding. The sink is a {@link FlinkKafkaProducer} when the output stream
     * uses the Kafka protocol, otherwise a {@link FlinkJmsProducer}.
     *
     * @param dataStreamArr input streams passed on to the application logic
     */
    @Override
    public void appendExecutionConfig(DataStream<Event>... dataStreamArr) {
        SingleOutputStreamOperator mappedEvents = getApplicationLogic(dataStreamArr).flatMap(new EventToMapConverter());

        EventGrounding outputGrounding = getOutputStream().getEventGrounding();
        // Only the first transport format of the grounding is considered.
        TransportFormat firstFormat = (TransportFormat) outputGrounding.getTransportFormats().get(0);
        ByteArraySerializer serializer = new ByteArraySerializer(getDataFormatDefinition(firstFormat));

        if (!isKafkaProtocol(getOutputStream())) {
            // Every non-Kafka output is published through the JMS sink.
            FlinkJmsProducer jmsSink = new FlinkJmsProducer(getJmsProtocol(getOutputStream()), serializer);
            mappedEvents.addSink(jmsSink);
            return;
        }

        FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer(
                getTopic(getOutputStream()),
                serializer,
                getProducerProperties(outputGrounding.getTransportProtocol()));
        mappedEvents.addSink(kafkaSink);
    }

    /** Returns the output stream of the processor graph. */
    private SpDataStream getOutputStream() {
        return getGraph().getOutputStream();
    }

    /**
     * Builds the processor-specific pipeline on top of the given input streams.
     *
     * @param dataStreamArr input event streams
     * @return the stream whose events are forwarded to the output sink
     */
    protected abstract DataStream<Event> getApplicationLogic(DataStream<Event>... dataStreamArr);

    /**
     * Builds Kafka client properties for the given protocol: {@code client.id} is set to
     * a freshly generated random UUID and {@code bootstrap.servers} to
     * {@code <brokerHostname>:<kafkaPort>}.
     *
     * <p>This method is not invoked inside this class; the Kafka sink above obtains its
     * configuration from {@code getProducerProperties} instead.
     *
     * @param kafkaTransportProtocol protocol supplying broker host name and port
     * @return a new {@link Properties} instance holding the two entries
     */
    protected Properties getProperties(KafkaTransportProtocol kafkaTransportProtocol) {
        String host = kafkaTransportProtocol.getBrokerHostname();
        String bootstrapServers = host + ":" + Integer.valueOf(kafkaTransportProtocol.getKafkaPort());

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("client.id", UUID.randomUUID().toString());
        kafkaProperties.setProperty("bootstrap.servers", bootstrapServers);
        return kafkaProperties;
    }

    /**
     * Creates the runtime parameters from the binding parameters of this runtime, passing
     * {@code false} as the second constructor argument (its meaning is defined by
     * {@link EventProcessorRuntimeParams} and is not visible in this file).
     *
     * <p>Decompiler notes: the method was originally named {@code makeRuntimeParams} and
     * was renamed when it was merged with its bridge method; its access modifier was
     * changed from {@code protected}.
     *
     * @return newly created runtime parameters
     */
    public EventProcessorRuntimeParams<B> m0makeRuntimeParams() {
        EventProcessorRuntimeParams<B> runtimeParams = new EventProcessorRuntimeParams<>(bindingParams, false);
        return runtimeParams;
    }
}
