package co.cask.cdap.etl.spark.streaming.function;

import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.Alert;
import co.cask.cdap.etl.api.AlertPublisher;
import co.cask.cdap.etl.api.AlertPublisherContext;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultAlertPublisherContext;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.TrackedIterator;
import co.cask.cdap.etl.spark.SparkPipelineRuntime;
import co.cask.cdap.etl.spark.plugin.SparkPipelinePluginContext;
import co.cask.cdap.etl.spec.StageSpec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/hydrator-spark-core2_2.11-5.1.1.jar:co/cask/cdap/etl/spark/streaming/function/StreamingAlertPublishFunction.class */
public class StreamingAlertPublishFunction implements Function2<JavaRDD<Alert>, Time, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingAlertPublishFunction.class);
    private final JavaSparkExecutionContext sec;
    private final StageSpec stageSpec;

    public StreamingAlertPublishFunction(JavaSparkExecutionContext javaSparkExecutionContext, StageSpec stageSpec) {
        this.sec = javaSparkExecutionContext;
        this.stageSpec = stageSpec;
    }

    public Void call(JavaRDD<Alert> javaRDD, Time time) throws Exception {
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(new BasicArguments(this.sec), time.milliseconds(), this.sec.getSecureStore(), this.sec.getNamespace());
        SparkPipelinePluginContext sparkPipelinePluginContext = new SparkPipelinePluginContext(this.sec.getPluginContext(), this.sec.getMetrics(), this.stageSpec.isStageLoggingEnabled(), this.stageSpec.isProcessTimingEnabled());
        String name = this.stageSpec.getName();
        AlertPublisher alertPublisher = (AlertPublisher) sparkPipelinePluginContext.newPluginInstance(name, defaultMacroEvaluator);
        alertPublisher.initialize((AlertPublisherContext) new DefaultAlertPublisherContext(new SparkPipelineRuntime(this.sec, time.milliseconds()), this.stageSpec, this.sec.getMessagingContext(), this.sec.getAdmin()));
        alertPublisher.publish(new TrackedIterator(javaRDD.collect().iterator(), new DefaultStageMetrics(this.sec.getMetrics(), name), Constants.Metrics.RECORDS_IN));
        alertPublisher.destroy();
        return null;
    }
}
