package co.cask.cdap.datastreams;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.etl.api.ErrorTransform;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.Windower;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.tephra.Transaction;
import org.apache.twill.filesystem.Location;

/* loaded from: input_file:co/cask/cdap/datastreams/SparkStreamingPipelineDriver.class */
public class SparkStreamingPipelineDriver implements JavaSparkMain {
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private static final Set<String> SUPPORTED_PLUGIN_TYPES = ImmutableSet.of(StreamingSource.PLUGIN_TYPE, BatchSink.PLUGIN_TYPE, Transform.PLUGIN_TYPE, BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE, SparkCompute.PLUGIN_TYPE, Windower.PLUGIN_TYPE, ErrorTransform.PLUGIN_TYPE);

    @Override // co.cask.cdap.api.spark.JavaSparkMain
    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        DataStreamsPipelineSpec dataStreamsPipelineSpec = (DataStreamsPipelineSpec) GSON.fromJson(javaSparkExecutionContext.getSpecification().getProperty(Constants.PIPELINEID), DataStreamsPipelineSpec.class);
        PipelinePhase.Builder addConnections = PipelinePhase.builder(SUPPORTED_PLUGIN_TYPES).addConnections(dataStreamsPipelineSpec.getConnections());
        for (StageSpec stageSpec : dataStreamsPipelineSpec.getStages()) {
            addConnections.addStage(StageInfo.builder(stageSpec.getName(), stageSpec.getPlugin().getType()).addInputs(stageSpec.getInputs()).addOutputs(stageSpec.getOutputs()).addInputSchemas(stageSpec.getInputSchemas()).setOutputSchema(stageSpec.getOutputSchema()).setErrorSchema(stageSpec.getErrorSchema()).build());
        }
        PipelinePhase build = addConnections.build();
        String str = null;
        if (!Boolean.parseBoolean(javaSparkExecutionContext.getSpecification().getProperty("hydrator.checkpoints.disabled"))) {
            String name = javaSparkExecutionContext.getApplicationSpecification().getName();
            String property = javaSparkExecutionContext.getSpecification().getProperty("hydrator.checkpoint.dir");
            final AtomicReference atomicReference = new AtomicReference();
            javaSparkExecutionContext.execute(new TxRunnable() { // from class: co.cask.cdap.datastreams.SparkStreamingPipelineDriver.1
                @Override // co.cask.cdap.api.TxRunnable
                public void run(DatasetContext datasetContext) throws Exception {
                    atomicReference.set(((FileSet) datasetContext.getDataset(DataStreamsApp.CHECKPOINT_FILESET)).getBaseLocation());
                }
            });
            str = ((Location) atomicReference.get()).append(name).append(property).toURI().toString();
        }
        JavaStreamingContext run = run(dataStreamsPipelineSpec, build, javaSparkExecutionContext, str);
        run.start();
        boolean z = false;
        try {
            z = run.awaitTerminationOrTimeout(Transaction.NO_TX_IN_PROGRESS);
            if (z) {
                return;
            }
            run.stop(true, dataStreamsPipelineSpec.isStopGracefully());
        } catch (Throwable th) {
            if (!z) {
                run.stop(true, dataStreamsPipelineSpec.isStopGracefully());
            }
            throw th;
        }
    }

    private JavaStreamingContext run(final DataStreamsPipelineSpec dataStreamsPipelineSpec, final PipelinePhase pipelinePhase, final JavaSparkExecutionContext javaSparkExecutionContext, @Nullable final String str) throws Exception {
        return JavaStreamingContext.getOrCreate(str, new JavaStreamingContextFactory() { // from class: co.cask.cdap.datastreams.SparkStreamingPipelineDriver.2
            public JavaStreamingContext create() {
                JavaStreamingContext javaStreamingContext = new JavaStreamingContext(new JavaSparkContext(), Durations.milliseconds(dataStreamsPipelineSpec.getBatchIntervalMillis()));
                try {
                    new SparkStreamingPipelineRunner(javaSparkExecutionContext, javaStreamingContext, false, dataStreamsPipelineSpec.getNumOfRecordsPreview()).runPipeline(pipelinePhase, StreamingSource.PLUGIN_TYPE, javaSparkExecutionContext, new HashMap());
                    if (str != null) {
                        javaStreamingContext.checkpoint(str);
                    }
                    return javaStreamingContext;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}
