package co.cask.cdap.spark.app;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Increment;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.api.spark.SparkClientContext;
import com.google.common.base.Preconditions;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/spark/app/CharCountProgram.class */
public class CharCountProgram extends AbstractSpark implements JavaSparkMain {
    protected void configure() {
        setMainClass(CharCountProgram.class);
    }

    public void initialize() throws Exception {
        SparkClientContext context = getContext();
        context.setSparkConf(new SparkConf().set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec"));
        Table dataset = context.getDataset("totals");
        dataset.get((Get) new Get("total").add("total", new String[0])).getLong("total");
        dataset.put(new Put("total").add("total", 0L));
    }

    public void run(final JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        Preconditions.checkArgument("org.apache.spark.io.LZFCompressionCodec".equals(new JavaSparkContext().getConf().get("spark.io.compression.codec")));
        final JavaPairRDD mapToPair = javaSparkExecutionContext.fromDataset("keys").mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], byte[]>() { // from class: co.cask.cdap.spark.app.CharCountProgram.1
            public Tuple2<byte[], byte[]> call(Tuple2<byte[], String> tuple2) throws Exception {
                return new Tuple2<>(tuple2._1(), Bytes.toBytes(((String) tuple2._2()).length()));
            }
        });
        javaSparkExecutionContext.execute(new TxRunnable() { // from class: co.cask.cdap.spark.app.CharCountProgram.2
            public void run(DatasetContext datasetContext) throws Exception {
                datasetContext.getDataset("totals").increment(new Increment("total").add("total", mapToPair.count()));
                javaSparkExecutionContext.saveAsDataset(mapToPair, "count");
            }
        });
    }
}
