package co.cask.cdap.test.app;

import co.cask.cdap.api.annotation.Output;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.Callback;
import co.cask.cdap.api.flow.flowlet.FailurePolicy;
import co.cask.cdap.api.flow.flowlet.FailureReason;
import co.cask.cdap.api.flow.flowlet.InputContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.service.BasicService;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.mapreduce.service.TestMapReduceServiceIntegrationApp;
import co.cask.cdap.test.app.MyKeyValueTableDefinition;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/* loaded from: input_file:co/cask/cdap/test/app/WordCountApp.class */
public class WordCountApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountByField.class */
    public static class CountByField extends AbstractFlowlet implements Callback {

        @UseDataSet("mydataset")
        private MyKeyValueTableDefinition.KeyValueTable counters;

        @ProcessInput({"field"})
        public void process(Map<String, String> map) {
            String str = map.get("word");
            if (str == null) {
                return;
            }
            String str2 = map.get("field");
            if (str2 != null) {
                str = str2 + ":" + str;
            }
            this.counters.put(str, String.valueOf(Long.valueOf(this.counters.get(str, "0")).longValue() + 1));
        }

        public void onSuccess(@Nullable Object obj, @Nullable InputContext inputContext) {
        }

        public FailurePolicy onFailure(@Nullable Object obj, @Nullable InputContext inputContext, FailureReason failureReason) {
            return FailurePolicy.RETRY;
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountFromStream.class */
    public static final class CountFromStream extends AbstractMapReduce {

        /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountFromStream$StreamMapper.class */
        public static final class StreamMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            private static final Text TOTAL = new Text("total");

            protected void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                StringTokenizer stringTokenizer = new StringTokenizer(text.toString());
                long j = 0;
                while (stringTokenizer.hasMoreTokens()) {
                    j++;
                    stringTokenizer.nextToken();
                }
                context.write(TOTAL, new LongWritable(j));
            }

            protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
                map((LongWritable) obj, (Text) obj2, (Mapper<LongWritable, Text, Text, LongWritable>.Context) context);
            }
        }

        /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountFromStream$StreamReducer.class */
        public static final class StreamReducer extends Reducer<Text, LongWritable, String, String> {
            protected void reduce(Text text, Iterable<LongWritable> iterable, Reducer<Text, LongWritable, String, String>.Context context) throws IOException, InterruptedException {
                long j = 0;
                Iterator<LongWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    j += it.next().get();
                }
                context.write("stream_total_words_count", String.valueOf(j));
            }

            protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
                reduce((Text) obj, (Iterable<LongWritable>) iterable, (Reducer<Text, LongWritable, String, String>.Context) context);
            }
        }

        public void configure() {
            setName("countFromStream");
            useStreamInput("text");
            setOutputDataset(TestMapReduceServiceIntegrationApp.OUTPUT_DATASET);
        }

        public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
            Job job = (Job) mapReduceContext.getHadoopJob();
            job.setMapperClass(StreamMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setReducerClass(StreamReducer.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountTotal.class */
    public static class CountTotal extends AbstractMapReduce {

        /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountTotal$MyMapper.class */
        public static class MyMapper extends Mapper<String, String, BytesWritable, LongWritable> {
            protected void map(String str, String str2, Mapper<String, String, BytesWritable, LongWritable>.Context context) throws IOException, InterruptedException {
                context.write(new BytesWritable(Bytes.toBytes("total")), new LongWritable(Long.valueOf(str2).longValue()));
            }

            protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
                map((String) obj, (String) obj2, (Mapper<String, String, BytesWritable, LongWritable>.Context) context);
            }
        }

        /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$CountTotal$MyReducer.class */
        public static class MyReducer extends Reducer<BytesWritable, LongWritable, String, String> {
            protected void reduce(BytesWritable bytesWritable, Iterable<LongWritable> iterable, Reducer<BytesWritable, LongWritable, String, String>.Context context) throws IOException, InterruptedException {
                long j = 0;
                Iterator<LongWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    j += it.next().get();
                }
                context.write("total_words_count", String.valueOf(j));
            }

            protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
                reduce((BytesWritable) obj, (Iterable<LongWritable>) iterable, (Reducer<BytesWritable, LongWritable, String, String>.Context) context);
            }
        }

        public void configure() {
            setName("countTotal");
            setInputDataset("mydataset");
            setOutputDataset(TestMapReduceServiceIntegrationApp.OUTPUT_DATASET);
        }

        public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
            Job job = (Job) mapReduceContext.getHadoopJob();
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$MyRecord.class */
    public static final class MyRecord {
        private final String title;
        private final String text;
        private final boolean expired;

        public MyRecord(String str, String str2, boolean z) {
            this.title = str;
            this.text = str2;
            this.expired = z;
        }

        public String getTitle() {
            return this.title;
        }

        public String getText() {
            return this.text;
        }

        public boolean isExpired() {
            return this.expired;
        }

        public String toString() {
            return "MyRecord{title='" + this.title + "', text='" + this.text + "', expired=" + this.expired + '}';
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$StreamSource.class */
    public static final class StreamSource extends AbstractFlowlet {
        private OutputEmitter<MyRecord> output;
        private Metrics metrics;

        @ProcessInput
        public void process(StreamEvent streamEvent, InputContext inputContext) throws CharacterCodingException {
            if ("text".equals(inputContext.getOrigin())) {
                this.metrics.count("stream.event", 1);
                ByteBuffer byteBuffer = (ByteBuffer) streamEvent.getBody();
                this.output.emit(new MyRecord((String) streamEvent.getHeaders().get("title"), byteBuffer == null ? null : Charsets.UTF_8.newDecoder().decode(byteBuffer).toString(), false));
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$Tokenizer.class */
    public static class Tokenizer extends AbstractFlowlet {

        @Output("field")
        private OutputEmitter<Map<String, String>> outputMap;
        private boolean error = true;

        @ProcessInput
        public void foo(MyRecord myRecord) {
            tokenize(myRecord.getTitle(), "title");
            tokenize(myRecord.getText(), "text");
            if (this.error) {
                this.error = false;
                throw new IllegalStateException(myRecord.toString());
            }
        }

        private void tokenize(String str, String str2) {
            if (str == null) {
                return;
            }
            for (String str3 : str.split("[ .-]")) {
                this.outputMap.emit(ImmutableMap.of("field", str2, "word", str3));
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$WordCountFlow.class */
    public static class WordCountFlow extends AbstractFlow {
        protected void configureFlow() {
            setName("WordCountFlow");
            setDescription("Flow for counting words");
            addFlowlet(new StreamSource());
            addFlowlet(new Tokenizer());
            addFlowlet(new CountByField());
            connectStream("text", "StreamSource");
            connect("StreamSource", "Tokenizer");
            connect("Tokenizer", "CountByField");
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/app/WordCountApp$WordFrequencyHandler.class */
    public static class WordFrequencyHandler extends AbstractHttpServiceHandler {

        @UseDataSet("mydataset")
        private MyKeyValueTableDefinition.KeyValueTable counters;

        @UseDataSet(TestMapReduceServiceIntegrationApp.OUTPUT_DATASET)
        private MyKeyValueTableDefinition.KeyValueTable totals;

        @GET
        @Path("wordfreq/{word}")
        public void wordfreq(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("word") String str) throws IOException {
            httpServiceResponder.sendJson(ImmutableMap.of(str, Long.valueOf(this.counters.get(str, "0"))));
        }

        @GET
        @Path("total")
        public void total(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder) throws IOException {
            httpServiceResponder.sendJson(Long.valueOf(Long.valueOf(this.totals.get("total_words_count")).longValue()));
        }

        @GET
        @Path("stream_total")
        public void streamTotal(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder) throws IOException {
            httpServiceResponder.sendJson(Long.valueOf(Long.valueOf(this.totals.get("stream_total_words_count")).longValue()));
        }
    }

    public void configure() {
        setName("WordCountApp");
        addStream(new Stream("text"));
        addDatasetModule("my-kv", MyKeyValueTableDefinition.Module.class);
        createDataset("mydataset", "myKeyValueTable", DatasetProperties.EMPTY);
        createDataset(TestMapReduceServiceIntegrationApp.OUTPUT_DATASET, "myKeyValueTable", DatasetProperties.EMPTY);
        addFlow(new WordCountFlow());
        addService(new BasicService("WordFrequency", new WordFrequencyHandler(), new HttpServiceHandler[0]));
        addMapReduce(new CountTotal());
        addMapReduce(new CountFromStream());
    }
}
