package co.cask.cdap.mapreduce.service;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.test.app.MyKeyValueTableDefinition;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Iterator;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/* loaded from: input_file:co/cask/cdap/mapreduce/service/TestMapReduceServiceIntegrationApp.class */
public class TestMapReduceServiceIntegrationApp extends AbstractApplication {
    /** Path of the handler endpoint that counts the words passed in its {@code words} query parameter. */
    public static final String COUNT_METHOD_NAME = "count";
    /** Name of the dataset the MapReduce job is configured to read from. */
    public static final String INPUT_DATASET = "words";
    /** Name under which the {@code CountTotal} MapReduce job is registered. */
    public static final String MR_NAME = "WordCountMR";
    /** Name of the dataset the MapReduce job is configured to write to. */
    public static final String OUTPUT_DATASET = "totals";
    /** Name under which the word-count HTTP service is registered and looked up. */
    public static final String SERVICE_NAME = "WordsCount";
    /** Hadoop configuration key under which the job stores the service base URL for its tasks. */
    public static final String SERVICE_URL = "WordsCountServiceURL";
    /** Path of the handler endpoint that squares its {@code num} query parameter. */
    public static final String SQUARE_METHOD_NAME = "square";
    /** Key under which the reducer writes the squared total word count. */
    public static final String SQUARED_TOTAL_WORDS_COUNT = "squared_total_words_count";

    /* loaded from: input_file:co/cask/cdap/mapreduce/service/TestMapReduceServiceIntegrationApp$CountTotal.class */
    public static class CountTotal extends AbstractMapReduce {

        /* loaded from: input_file:co/cask/cdap/mapreduce/service/TestMapReduceServiceIntegrationApp$CountTotal$MyMapper.class */
        public static class MyMapper extends Mapper<String, String, BytesWritable, LongWritable> {
            private URL serviceUrl;

            protected void setup(Mapper<String, String, BytesWritable, LongWritable>.Context context) throws IOException, InterruptedException {
                this.serviceUrl = CountTotal.getServiceUrl(context);
            }

            protected void map(String str, String str2, Mapper<String, String, BytesWritable, LongWritable>.Context context) throws IOException, InterruptedException {
                context.write(new BytesWritable(Bytes.toBytes("total")), new LongWritable(Long.valueOf(CountTotal.doRequest(new URL(this.serviceUrl.toString() + TestMapReduceServiceIntegrationApp.COUNT_METHOD_NAME + "?words=" + URLEncoder.encode(str2, Charsets.UTF_8.name())))).longValue()));
            }

            protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
                map((String) obj, (String) obj2, (Mapper<String, String, BytesWritable, LongWritable>.Context) context);
            }
        }

        /* loaded from: input_file:co/cask/cdap/mapreduce/service/TestMapReduceServiceIntegrationApp$CountTotal$MyReducer.class */
        public static class MyReducer extends Reducer<BytesWritable, LongWritable, String, String> {
            private URL serviceUrl;

            protected void setup(Reducer.Context context) throws IOException, InterruptedException {
                this.serviceUrl = CountTotal.getServiceUrl(context);
            }

            protected void reduce(BytesWritable bytesWritable, Iterable<LongWritable> iterable, Reducer<BytesWritable, LongWritable, String, String>.Context context) throws IOException, InterruptedException {
                long j = 0;
                Iterator<LongWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    j += it.next().get();
                }
                context.write(TestMapReduceServiceIntegrationApp.SQUARED_TOTAL_WORDS_COUNT, CountTotal.doRequest(new URL(this.serviceUrl.toString() + TestMapReduceServiceIntegrationApp.SQUARE_METHOD_NAME + "?num=" + j)));
            }

            protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
                reduce((BytesWritable) obj, (Iterable<LongWritable>) iterable, (Reducer<BytesWritable, LongWritable, String, String>.Context) context);
            }
        }

        /**
         * Declares the job's name and the datasets it reads from and writes to,
         * using the constants defined on the enclosing application class.
         */
        public void configure() {
            setName(MR_NAME);
            setInputDataset(INPUT_DATASET);
            setOutputDataset(OUTPUT_DATASET);
        }

        /**
         * Wires the mapper and reducer into the Hadoop job and publishes the word-count
         * service URL through the job configuration so tasks can read it back.
         *
         * @param mapReduceContext context providing the Hadoop job and the service URL lookup
         * @throws Exception declared by the signature; nothing is thrown explicitly here
         */
        public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
            Job hadoopJob = (Job) mapReduceContext.getHadoopJob();

            // Map side: MyMapper emits (BytesWritable, LongWritable) pairs.
            hadoopJob.setMapperClass(MyMapper.class);
            hadoopJob.setMapOutputKeyClass(BytesWritable.class);
            hadoopJob.setMapOutputValueClass(LongWritable.class);

            // Reduce side.
            hadoopJob.setReducerClass(MyReducer.class);

            // Stored as a string under SERVICE_URL; getServiceUrl reads the same key in the tasks.
            hadoopJob.getConfiguration().set(
                    SERVICE_URL,
                    mapReduceContext.getServiceURL(SERVICE_NAME).toString());
        }

        /**
         * Reads the service base URL stored in the task's configuration under
         * {@code SERVICE_URL} and parses it.
         *
         * <p>NOTE: the decompiler reported that this method was originally {@code private}.
         *
         * @param taskInputOutputContext mapper or reducer context giving access to the job configuration
         * @return the parsed service URL
         * @throws MalformedURLException if the stored value is not a valid URL
         */
        public static URL getServiceUrl(TaskInputOutputContext taskInputOutputContext) throws MalformedURLException {
            String configuredUrl = taskInputOutputContext.getConfiguration().get(SERVICE_URL);
            return new URL(configuredUrl);
        }

        /**
         * Opens an HTTP connection to {@code url} and returns the whole response body
         * decoded as UTF-8.
         *
         * <p>The response stream is closed and the connection disconnected on every path,
         * including when reading or closing fails.
         *
         * <p>NOTE: the decompiler reported that this method was originally {@code private}.
         *
         * @param url address to request; must use a protocol backed by {@link HttpURLConnection}
         * @return the response body as a string
         * @throws IOException if connecting, reading or closing the stream fails
         */
        public static String doRequest(URL url) throws IOException {
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            // try-with-resources closes the stream first; the finally block then always
            // disconnects, even if close() itself throws.
            try (InputStream responseBody = connection.getInputStream()) {
                return new String(ByteStreams.toByteArray(responseBody), Charsets.UTF_8);
            } finally {
                connection.disconnect();
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/mapreduce/service/TestMapReduceServiceIntegrationApp$WordsCountHandler.class */
    /**
     * HTTP handler exposing two GET endpoints: one that counts the space-separated tokens
     * of a string and one that squares a number.
     */
    public class WordsCountHandler extends AbstractHttpServiceHandler {
        public WordsCountHandler() {
        }

        /**
         * Responds with the number of tokens obtained by splitting {@code str} on single spaces.
         * Responds with status 400 when {@code str} is null or empty.
         *
         * @param httpServiceRequest   incoming request (unused)
         * @param httpServiceResponder responder used to send the result
         * @param str                  value of the {@code words} query parameter
         */
        @GET
        @Path(TestMapReduceServiceIntegrationApp.COUNT_METHOD_NAME)
        public void count(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @QueryParam("words") String str) {
            if (StringUtils.isEmpty(str)) {
                httpServiceResponder.sendStatus(400);
                return;
            }
            // Splits on a single space character, so consecutive spaces yield empty tokens.
            int tokenCount = str.split(" ").length;
            httpServiceResponder.sendString(200, String.valueOf(tokenCount), Charsets.UTF_8);
        }

        /**
         * Responds with {@code l * l} as decimal text.
         * Responds with status 400 when the parameter is missing.
         *
         * @param httpServiceRequest   incoming request (unused)
         * @param httpServiceResponder responder used to send the result
         * @param l                    value of the {@code num} query parameter
         */
        @GET
        @Path(TestMapReduceServiceIntegrationApp.SQUARE_METHOD_NAME)
        public void square(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @QueryParam("num") Long l) {
            if (l == null) {
                httpServiceResponder.sendStatus(400);
                return;
            }
            long number = l.longValue();
            // Plain long multiplication: an overflowing product wraps silently.
            httpServiceResponder.sendString(200, String.valueOf(number * number), Charsets.UTF_8);
        }
    }

    /**
     * Assembles the application: registers the key-value dataset module, creates the input
     * and output datasets, and adds the MapReduce job and the word-count service.
     */
    public void configure() {
        final String keyValueTableType = "myKeyValueTable";

        setName("MRServiceIntegration");

        // Both datasets are created with the same type name and empty properties.
        addDatasetModule("my-kv", MyKeyValueTableDefinition.Module.class);
        createDataset(INPUT_DATASET, keyValueTableType, DatasetProperties.EMPTY);
        createDataset(OUTPUT_DATASET, keyValueTableType, DatasetProperties.EMPTY);

        CountTotal wordCountJob = new CountTotal();
        addMapReduce(wordCountJob);

        // The service has a single handler; no additional handlers are passed.
        WordsCountHandler countHandler = new WordsCountHandler();
        addService(SERVICE_NAME, countHandler, new HttpServiceHandler[0]);
    }
}
