package co.cask.cdap.etl.spark.streaming.function;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkContext;
import co.cask.cdap.etl.spark.batch.SparkBatchSinkFactory;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/hydrator-spark-core-4.1.1.jar:co/cask/cdap/etl/spark/streaming/function/StreamingBatchSinkFunction.class */
/**
 * Spark function that writes one {@link JavaRDD} of a stream to a {@link BatchSink} plugin.
 *
 * <p>For every invocation of {@link #call(JavaRDD, Time)} a new sink plugin instance is created
 * and driven through three steps: {@code prepareRun}, writing the RDD through a
 * {@link SparkBatchSinkFactory}, and {@code onRunFinish}. The {@code prepareRun} and
 * {@code onRunFinish} calls are each wrapped in a {@link TxRunnable} handed to
 * {@link JavaSparkExecutionContext#execute}.
 *
 * @param <T> type of the records in the incoming RDD
 */
public class StreamingBatchSinkFunction<T> implements Function2<JavaRDD<T>, Time, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingBatchSinkFunction.class);
    private final PairFlatMapFunction<T, Object, Object> sinkFunction;
    private final JavaSparkExecutionContext sec;
    private final StageInfo stageInfo;

    /**
     * @param pairFlatMapFunction function turning each input record into the key/value pairs handed to the sink
     * @param javaSparkExecutionContext execution context used to create the plugin and run the sink callbacks
     * @param stageInfo description of the sink stage; its name identifies the plugin instance and the output
     */
    public StreamingBatchSinkFunction(PairFlatMapFunction<T, Object, Object> pairFlatMapFunction, JavaSparkExecutionContext javaSparkExecutionContext, StageInfo stageInfo) {
        this.sinkFunction = pairFlatMapFunction;
        this.sec = javaSparkExecutionContext;
        this.stageInfo = stageInfo;
    }

    /**
     * Writes the given RDD to the sink stage, using the batch time as the logical start time.
     *
     * <p>Any {@link Exception} raised while preparing, writing or finishing is logged and not
     * rethrown. If the sink was prepared but the write did not complete, {@code onRunFinish(false, ...)}
     * is invoked exactly once; a failure of that callback propagates to the caller.
     *
     * @param javaRDD the records of the current batch
     * @param time the batch time; its millisecond value is used as the logical start time
     * @return always {@code null}
     * @throws Exception if creating the plugin instance fails, or if the failure callback itself throws
     */
    @Override
    public Void call(JavaRDD<T> javaRDD, Time time) throws Exception {
        final long logicalStartTime = time.milliseconds();
        DefaultMacroEvaluator evaluator = new DefaultMacroEvaluator(this.sec.getWorkflowToken(), this.sec.getRuntimeArguments(), logicalStartTime, this.sec.getSecureStore(), this.sec.getNamespace());
        final SparkBatchSinkFactory sinkFactory = new SparkBatchSinkFactory();
        final String stageName = this.stageInfo.getName();
        final BatchSink batchSink = (BatchSink) this.sec.getPluginContext().newPluginInstance(stageName, evaluator);
        // prepared: prepareRun completed; written: the RDD was handed to the sink without error.
        boolean prepared = false;
        boolean written = false;
        try {
            this.sec.execute(new TxRunnable() {
                @Override
                public void run(DatasetContext datasetContext) throws Exception {
                    batchSink.prepareRun(createSinkContext(sinkFactory, datasetContext, logicalStartTime, stageName));
                }
            });
            prepared = true;
            sinkFactory.writeFromRDD(javaRDD.flatMapToPair(this.sinkFunction), this.sec, stageName, Object.class, Object.class);
            written = true;
            finishRun(batchSink, sinkFactory, stageName, logicalStartTime, true);
        } catch (Exception e) {
            LOG.error("Error writing to sink {} for the batch for time {}.", stageName, logicalStartTime, e);
        } finally {
            // Only report failure when prepareRun succeeded but the write did not; a failed
            // onRunFinish(true, ...) must not trigger a second, contradictory callback.
            if (prepared && !written) {
                finishRun(batchSink, sinkFactory, stageName, logicalStartTime, false);
            }
        }
        return null;
    }

    /** Builds the sink context passed to the plugin's {@code prepareRun} and {@code onRunFinish} callbacks. */
    private SparkBatchSinkContext createSinkContext(SparkBatchSinkFactory sinkFactory, DatasetContext datasetContext, long logicalStartTime, String stageName) {
        return new SparkBatchSinkContext(sinkFactory, this.sec, datasetContext, logicalStartTime, this.stageInfo, this.sec.getDataTracer(stageName).isEnabled());
    }

    /** Invokes the plugin's {@code onRunFinish} callback with the given outcome via {@code sec.execute}. */
    private void finishRun(final BatchSink batchSink, final SparkBatchSinkFactory sinkFactory, final String stageName, final long logicalStartTime, final boolean succeeded) throws Exception {
        this.sec.execute(new TxRunnable() {
            @Override
            public void run(DatasetContext datasetContext) throws Exception {
                batchSink.onRunFinish(succeeded, createSinkContext(sinkFactory, datasetContext, logicalStartTime, stageName));
            }
        });
    }
}
