package co.cask.cdap.examples.purchase;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.Resources;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import com.google.gson.Gson;
import java.io.IOException;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/examples/purchase/PurchaseHistoryBuilder.class */
public class PurchaseHistoryBuilder extends AbstractMapReduce {
    public static final String MAPPER_MEMORY_MB = "mapper.memory.mb";
    public static final String REDUCER_MEMORY_MB = "reducer.memory.mb";

    /* loaded from: input_file:co/cask/cdap/examples/purchase/PurchaseHistoryBuilder$PerUserReducer.class */
    public static class PerUserReducer extends Reducer<Text, Purchase, String, PurchaseHistory> implements ProgramLifecycle<MapReduceContext> {

        @UseDataSet("frequentCustomers")
        private KeyValueTable frequentCustomers;
        private Metrics reduceMetrics;
        private URL userProfileServiceURL;
        private static final int RARE_PURCHASE_COUNT = 1;
        private static final int FREQUENT_PURCHASE_COUNT = 10;
        private static final Logger LOG = LoggerFactory.getLogger(PerUserReducer.class);

        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            this.userProfileServiceURL = mapReduceContext.getServiceURL(UserProfileServiceHandler.SERVICE_NAME);
        }

        public void reduce(Text text, Iterable<Purchase> iterable, Reducer<Text, Purchase, String, PurchaseHistory>.Context context) throws IOException, InterruptedException {
            UserProfile userProfile = null;
            try {
                HttpResponse execute = HttpRequests.execute(HttpRequest.get(new URL(this.userProfileServiceURL, "user/" + text.toString())).build());
                if (execute.getResponseCode() != 204) {
                    userProfile = (UserProfile) new Gson().fromJson(execute.getResponseBodyAsString(), UserProfile.class);
                }
            } catch (Exception e) {
                LOG.warn("Error accessing user profile.", e);
            }
            PurchaseHistory purchaseHistory = new PurchaseHistory(text.toString(), userProfile);
            int i = 0;
            Iterator<Purchase> it = iterable.iterator();
            while (it.hasNext()) {
                purchaseHistory.add(new Purchase(it.next()));
                i++;
            }
            if (i == 1) {
                this.reduceMetrics.count("customers.rare", 1);
            } else if (i > 10) {
                this.reduceMetrics.count("customers.frequent", 1);
                this.frequentCustomers.write(text.toString(), String.valueOf(i));
            }
            context.write(text.toString(), purchaseHistory);
        }

        public void destroy() {
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<Purchase>) iterable, (Reducer<Text, Purchase, String, PurchaseHistory>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/purchase/PurchaseHistoryBuilder$PurchaseMapper.class */
    public static class PurchaseMapper extends Mapper<byte[], Purchase, Text, Purchase> {
        private Metrics mapMetrics;

        public void map(byte[] bArr, Purchase purchase, Mapper<byte[], Purchase, Text, Purchase>.Context context) throws IOException, InterruptedException {
            String customer = purchase.getCustomer();
            if (purchase.getPrice() > 100000) {
                this.mapMetrics.count("purchases.large", 1);
            }
            context.write(new Text(customer), purchase);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((byte[]) obj, (Purchase) obj2, (Mapper<byte[], Purchase, Text, Purchase>.Context) context);
        }
    }

    public void configure() {
        setDescription("Purchase History Builder");
        setDriverResources(new Resources(1024));
        setMapperResources(new Resources(1024));
        setReducerResources(new Resources(1024));
    }

    public void initialize() throws Exception {
        MapReduceContext context = getContext();
        ((Job) context.getHadoopJob()).setReducerClass(PerUserReducer.class);
        context.addInput(Input.ofDataset("purchases"), PurchaseMapper.class);
        context.addOutput(Output.ofDataset("history"));
        Map runtimeArguments = context.getRuntimeArguments();
        String str = (String) runtimeArguments.get(MAPPER_MEMORY_MB);
        if (str != null) {
            context.setMapperResources(new Resources(Integer.parseInt(str)));
        }
        String str2 = (String) runtimeArguments.get(REDUCER_MEMORY_MB);
        if (str2 != null) {
            context.setReducerResources(new Resources(Integer.parseInt(str2)));
        }
    }
}
