package co.cask.cdap.test.app;

import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.api.worker.AbstractWorker;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp.class */
public class StreamAuthApp extends AbstractApplication {
    public static final String APP = "StreamWithMRApp";
    public static final String STREAM = "inputStream";
    public static final String STREAM2 = "inputStream2";
    public static final String MAPREDUCE = "MRCopy";
    public static final String KVTABLE = "kvtable";
    public static final String WORKER = "StreamWriter";
    public static final String FLOW = "FetchFlow";
    public static final String SPARK = "SparkCopy";

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$CopyMapReduce.class */
    public static class CopyMapReduce extends AbstractMapReduce {

        /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$CopyMapReduce$IdentityMapper.class */
        public static class IdentityMapper extends Mapper<LongWritable, String, byte[], byte[]> {
            public void map(LongWritable longWritable, String str, Mapper<LongWritable, String, byte[], byte[]>.Context context) throws IOException, InterruptedException {
                context.write(Bytes.toBytes(str), Bytes.toBytes(str));
            }

            public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
                map((LongWritable) obj, (String) obj2, (Mapper<LongWritable, String, byte[], byte[]>.Context) context);
            }
        }

        public void configure() {
            setName(StreamAuthApp.MAPREDUCE);
        }

        public void initialize() {
            MapReduceContext context = getContext();
            context.addInput(Input.ofStream(StreamAuthApp.STREAM));
            context.addOutput(Output.ofDataset(StreamAuthApp.KVTABLE));
            Job job = (Job) context.getHadoopJob();
            job.setMapperClass(IdentityMapper.class);
            job.setNumReduceTasks(0);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$FetchFlow.class */
    public static class FetchFlow extends AbstractFlow {
        protected void configure() {
            super.configure();
            setName(StreamAuthApp.FLOW);
            addFlowlet(new StreamFetchFlowlet());
            connectStream(StreamAuthApp.STREAM, new StreamFetchFlowlet());
            connectStream(StreamAuthApp.STREAM2, new StreamFetchFlowlet());
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$SparkCopyProgram.class */
    public static class SparkCopyProgram implements JavaSparkMain {
        public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
            new JavaSparkContext();
            javaSparkExecutionContext.saveAsDataset(javaSparkExecutionContext.fromStream(StreamAuthApp.STREAM, String.class).mapToPair(new PairFunction<Tuple2<Long, String>, byte[], byte[]>() { // from class: co.cask.cdap.test.app.StreamAuthApp.SparkCopyProgram.1
                public Tuple2<byte[], byte[]> call(Tuple2<Long, String> tuple2) throws Exception {
                    return new Tuple2<>(Bytes.toBytes((String) tuple2._2()), Bytes.toBytes((String) tuple2._2()));
                }
            }), StreamAuthApp.KVTABLE);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$SparkCopyProgramSpec.class */
    public static class SparkCopyProgramSpec extends AbstractSpark {
        protected void configure() {
            setName(StreamAuthApp.SPARK);
            setMainClass(SparkCopyProgram.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$StreamFetchFlowlet.class */
    public static class StreamFetchFlowlet extends AbstractFlowlet {

        @UseDataSet(StreamAuthApp.KVTABLE)
        KeyValueTable kvTable;

        @ProcessInput(maxRetries = 3)
        public void process(StreamEvent streamEvent) {
            String bytes = Bytes.toString((ByteBuffer) streamEvent.getBody());
            this.kvTable.write(bytes, bytes);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/StreamAuthApp$StreamWriter.class */
    public static class StreamWriter extends AbstractWorker {
        public static final Logger LOG = LoggerFactory.getLogger(StreamWriter.class);

        protected void configure() {
            super.configure();
            setName(StreamAuthApp.WORKER);
        }

        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    getContext().write(StreamAuthApp.STREAM, String.format("Hello%d", Integer.valueOf(i)));
                } catch (IOException e) {
                    LOG.debug("Error writing to Stream {}", StreamAuthApp.STREAM, e);
                }
            }
        }
    }

    public void configure() {
        setName(APP);
        setDescription("Copy Data from Stream to KVTable.");
        addStream(STREAM);
        addStream(STREAM2);
        createDataset(KVTABLE, KeyValueTable.class);
        addMapReduce(new CopyMapReduce());
        addWorker(new StreamWriter());
        addFlow(new FetchFlow());
        addSpark(new SparkCopyProgramSpec());
    }
}
