package co.cask.cdap.etl.spark.streaming.function;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.NoopStageStatisticsCollector;
import co.cask.cdap.etl.spark.Compat;
import co.cask.cdap.etl.spark.SparkPipelineRuntime;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkContext;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkFactory;
import co.cask.cdap.etl.spark.function.BatchSinkFunction;
import co.cask.cdap.etl.spark.function.PluginFunctionContext;
import co.cask.cdap.etl.spark.plugin.SparkPipelinePluginContext;
import co.cask.cdap.etl.spec.StageSpec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-5.1.0.jar:lib/hydrator-spark-core-5.1.0.jar:co/cask/cdap/etl/spark/streaming/function/StreamingBatchSinkFunction.class
 */
/* loaded from: input_file:lib/hydrator-spark-core-5.1.0.jar:co/cask/cdap/etl/spark/streaming/function/StreamingBatchSinkFunction.class */
public class StreamingBatchSinkFunction<T> implements Function2<JavaRDD<T>, Time, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingBatchSinkFunction.class);
    private final JavaSparkExecutionContext sec;
    private final StageSpec stageSpec;

    public StreamingBatchSinkFunction(PairFlatMapFunction<T, Object, Object> pairFlatMapFunction, JavaSparkExecutionContext javaSparkExecutionContext, StageSpec stageSpec) {
        this.sec = javaSparkExecutionContext;
        this.stageSpec = stageSpec;
    }

    public Void call(JavaRDD<T> javaRDD, Time time) throws Exception {
        if (javaRDD.isEmpty()) {
            return null;
        }
        long milliseconds = time.milliseconds();
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(new BasicArguments(this.sec), milliseconds, this.sec.getSecureStore(), this.sec.getNamespace());
        SparkPipelinePluginContext sparkPipelinePluginContext = new SparkPipelinePluginContext(this.sec.getPluginContext(), this.sec.getMetrics(), this.stageSpec.isStageLoggingEnabled(), this.stageSpec.isProcessTimingEnabled());
        final SparkBatchSinkFactory sparkBatchSinkFactory = new SparkBatchSinkFactory();
        String name = this.stageSpec.getName();
        final BatchSink batchSink = (BatchSink) sparkPipelinePluginContext.newPluginInstance(name, defaultMacroEvaluator);
        final SparkPipelineRuntime sparkPipelineRuntime = new SparkPipelineRuntime(this.sec, milliseconds);
        boolean z = false;
        boolean z2 = false;
        try {
            try {
                this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.1
                    public void run(DatasetContext datasetContext) throws Exception {
                        batchSink.prepareRun((BatchSink) new SparkBatchSinkContext(sparkBatchSinkFactory, StreamingBatchSinkFunction.this.sec, datasetContext, sparkPipelineRuntime, StreamingBatchSinkFunction.this.stageSpec));
                    }
                });
                z = true;
                sparkBatchSinkFactory.writeFromRDD(javaRDD.flatMapToPair(Compat.convert(new BatchSinkFunction(new PluginFunctionContext(this.stageSpec, this.sec, sparkPipelineRuntime.getArguments().asMap(), time.milliseconds(), new NoopStageStatisticsCollector())))), this.sec, name, Object.class, Object.class);
                z2 = true;
                this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.2
                    public void run(DatasetContext datasetContext) throws Exception {
                        batchSink.onRunFinish(true, (boolean) new SparkBatchSinkContext(sparkBatchSinkFactory, StreamingBatchSinkFunction.this.sec, datasetContext, sparkPipelineRuntime, StreamingBatchSinkFunction.this.stageSpec));
                    }
                });
                if (1 == 0 || 1 != 0) {
                    return null;
                }
                this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.3
                    public void run(DatasetContext datasetContext) throws Exception {
                        batchSink.onRunFinish(false, (boolean) new SparkBatchSinkContext(sparkBatchSinkFactory, StreamingBatchSinkFunction.this.sec, datasetContext, sparkPipelineRuntime, StreamingBatchSinkFunction.this.stageSpec));
                    }
                });
                return null;
            } catch (Exception e) {
                LOG.error("Error writing to sink {} for the batch for time {}.", new Object[]{name, Long.valueOf(milliseconds), e});
                if (!z || z2) {
                    return null;
                }
                this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.3
                    public void run(DatasetContext datasetContext) throws Exception {
                        batchSink.onRunFinish(false, (boolean) new SparkBatchSinkContext(sparkBatchSinkFactory, StreamingBatchSinkFunction.this.sec, datasetContext, sparkPipelineRuntime, StreamingBatchSinkFunction.this.stageSpec));
                    }
                });
                return null;
            }
        } catch (Throwable th) {
            if (z && !z2) {
                this.sec.execute(new TxRunnable() { // from class: co.cask.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.3
                    public void run(DatasetContext datasetContext) throws Exception {
                        batchSink.onRunFinish(false, (boolean) new SparkBatchSinkContext(sparkBatchSinkFactory, StreamingBatchSinkFunction.this.sec, datasetContext, sparkPipelineRuntime, StreamingBatchSinkFunction.this.stageSpec));
                    }
                });
            }
            throw th;
        }
    }
}
