package co.cask.cdap.examples.sparkstreaming;

import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.ObjectStores;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import com.google.common.base.Charsets;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

/* loaded from: input_file:co/cask/cdap/examples/sparkstreaming/SpamClassifier.class */
/**
 * CDAP application for the Spark Streaming message classification example.
 *
 * <p>{@link #configure()} registers a stream, a Spark program, an HTTP service and an
 * {@code ObjectStore<Double>} dataset. The HTTP service reads classification values
 * from that dataset and reports them as {@code "Spam"} or {@code "Ham"}.
 */
public class SpamClassifier extends AbstractApplication {
    /** Name under which {@link SpamClassifierServiceHandler} is registered as a service. */
    static final String SERVICE_HANDLER = "MessageClassification";
    /** Name of the stream added by {@link #configure()}. */
    public static final String STREAM = "trainingDataStream";
    /** Name of the dataset created by {@link #configure()} and injected into the service handler. */
    public static final String DATASET = "messageClassificationStore";

    /**
     * HTTP handler that looks up the stored classification value for a message id.
     */
    public static final class SpamClassifierServiceHandler extends AbstractHttpServiceHandler {
        /** First segment of the lookup endpoint path. */
        static final String CLASSIFICATION_PATH = "classification";
        /** Response body sent when the stored value is anything other than zero. */
        static final String SPAM = "Spam";
        /** Response body sent when the stored value equals zero. */
        static final String HAM = "Ham";

        // Populated via @UseDataSet; presumably written by the Spark program - verify there.
        @UseDataSet(SpamClassifier.DATASET)
        private ObjectStore<Double> messageClassificationStore;

        /**
         * Handles {@code GET classification/{message-id}}.
         *
         * <p>Reads the value stored under the byte encoding of the message id and responds with:
         * <ul>
         *   <li>status 204 and an explanatory message when no value is stored;</li>
         *   <li>status 200 and {@code "Ham"} when the stored value equals {@code 0.0};</li>
         *   <li>status 200 and {@code "Spam"} for any other stored value.</li>
         * </ul>
         *
         * @param request   the incoming request (not inspected by this method)
         * @param responder used to send the response
         * @param messageId the {@code message-id} path parameter, used as the dataset key
         */
        @Path(CLASSIFICATION_PATH + "/{message-id}")
        @GET
        public void centers(HttpServiceRequest request, HttpServiceResponder responder, @PathParam("message-id") String messageId) {
            Double classification = messageClassificationStore.read(Bytes.toBytes(messageId));

            if (classification == null) {
                // NOTE(review): 204 means "No Content", so clients may never see this text;
                // the status is kept as-is to preserve the existing HTTP contract - confirm intent.
                String notFound = String.format("No message was found with message id: %s", messageId);
                responder.sendString(204, notFound, Charsets.UTF_8);
                return;
            }

            // Primitive comparison on purpose: both 0.0 and -0.0 are reported as ham.
            String label = classification.doubleValue() == 0.0d ? HAM : SPAM;
            responder.sendString(200, label, Charsets.UTF_8);
        }
    }

    /**
     * Declares the application: its name and description, the {@link #STREAM} stream,
     * the Spark program, the classification HTTP service and the {@link #DATASET} dataset.
     *
     * @throws RuntimeException wrapping the {@code UnsupportedTypeException} raised if the
     *         object store cannot be created for {@code Double}
     */
    public void configure() {
        setName("SpamClassifier");
        setDescription("A Spark Streaming Example for Kafka Message Classification");

        addStream(new Stream(STREAM));
        addSpark(new SpamClassifierProgram());
        addService(SERVICE_HANDLER, new SpamClassifierServiceHandler(), new HttpServiceHandler[0]);

        DatasetProperties storeProperties = DatasetProperties.builder()
                .setDescription("Kafka Message Spam Classification")
                .build();
        try {
            ObjectStores.createObjectStore(getConfigurer(), DATASET, Double.class, storeProperties);
        } catch (UnsupportedTypeException unsupported) {
            // Surface the checked exception as unchecked, keeping the original as the cause.
            throw new RuntimeException(unsupported);
        }
    }
}
