package co.cask.cdap.examples.sparkkmeans;

import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.ObjectStores;
import co.cask.cdap.api.flow.AbstractFlow;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.service.AbstractService;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import co.cask.cdap.api.spark.AbstractSpark;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.UUID;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp.class */
public class SparkKMeansApp extends AbstractApplication {

    /* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp$CentersService.class */
    /**
     * Service definition that registers a single {@link CentersServiceHandler}
     * under the name {@link #SERVICE_NAME}.
     */
    public static final class CentersService extends AbstractService {
        /** Name this service is configured with. */
        public static final String SERVICE_NAME = "CentersService";

        /**
         * Sets the service name and description, then attaches the HTTP handler.
         */
        protected void configure() {
            setName(SERVICE_NAME);
            setDescription("A service that responds with calculated center based on index parameter.");

            final CentersServiceHandler centersHandler = new CentersServiceHandler();
            addHandler(centersHandler);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp$CentersServiceHandler.class */
    /**
     * HTTP handler that looks up a value in the "centers" object store by the
     * {@code index} path parameter and returns it as the response body.
     */
    public static final class CentersServiceHandler extends AbstractHttpServiceHandler {
        // Log under this handler's own class so its messages are attributed correctly.
        private static final Logger LOG = LoggerFactory.getLogger(CentersServiceHandler.class);

        /** Store read by {@link #centers}; bound to the dataset named "centers". */
        @UseDataSet("centers")
        private ObjectStore<String> store;

        /**
         * Handles {@code GET centers/{index}}.
         *
         * <p>Responds with the stored string for the given index, or with status 204 and an
         * explanatory message when the store has no entry for it.
         *
         * @param httpServiceRequest the incoming request (not inspected)
         * @param httpServiceResponder used to send the response
         * @param str the {@code index} path parameter, used as the store key
         */
        @GET
        @Path("centers/{index}")
        public void centers(HttpServiceRequest httpServiceRequest, HttpServiceResponder httpServiceResponder, @PathParam("index") String str) {
            LOG.debug("Try to get centers for index: {}", str);

            // Encode the key with an explicit charset: the no-arg getBytes() uses the platform
            // default, which makes the key bytes depend on the JVM's configuration.
            // NOTE(review): assumes the writer of the "centers" store also encodes keys as UTF-8 - confirm.
            final byte[] key = str.getBytes(Charsets.UTF_8);
            final String centers = this.store.read(key);

            if (centers == null) {
                LOG.debug("No centers found");
                httpServiceResponder.sendString(204, String.format("No centers found for index: %s", str), Charsets.UTF_8);
                return;
            }

            LOG.debug("Retrieved centers: {}", centers);
            httpServiceResponder.sendString(centers);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp$PointsFlow.class */
    /**
     * Flow definition that wires the "pointsStream" stream into a single
     * {@link PointsReader} flowlet named "reader".
     */
    public static final class PointsFlow extends AbstractFlow {
        private static final String FLOW_NAME = "PointsFlow";
        private static final String READER_FLOWLET = "reader";
        private static final String SOURCE_STREAM = "pointsStream";

        /**
         * Sets the flow name and description, adds the reader flowlet and
         * connects the source stream to it.
         */
        protected void configure() {
            setName(FLOW_NAME);
            setDescription("Reads points information and stores in dataset");

            final PointsReader reader = new PointsReader();
            addFlowlet(READER_FLOWLET, reader);
            connectStream(SOURCE_STREAM, READER_FLOWLET);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp$PointsReader.class */
    /**
     * Flowlet that parses each stream event into a {@link Point} and writes it
     * to the "points" object store under a freshly generated UUID key.
     */
    public static final class PointsReader extends AbstractFlowlet {
        private static final Logger LOG = LoggerFactory.getLogger(PointsReader.class);

        /** Number of space-separated coordinates expected in each event body. */
        private static final int COORDINATE_COUNT = 3;

        /** Destination store; bound to the dataset named "points". */
        @UseDataSet("points")
        private ObjectStore<Point> pointsStore;

        /**
         * Processes one stream event: decodes its body, parses it as a point
         * and stores the point under a random UUID key.
         *
         * @param streamEvent the event whose body holds three space-separated numbers
         * @throws IllegalArgumentException if the body does not split into exactly three
         *         fields, or a field is not a parsable double ({@link NumberFormatException})
         */
        @ProcessInput
        public void process(StreamEvent streamEvent) {
            // Decode the body once and reuse it for both logging and parsing.
            final String body = Bytes.toString((ByteBuffer) streamEvent.getBody());
            LOG.trace("Points info: {}", body);
            this.pointsStore.write(getIdAsByte(UUID.randomUUID()), parsePoint(body));
        }

        /**
         * Serializes a UUID into 16 bytes: most significant bits first, then
         * least significant bits.
         */
        private byte[] getIdAsByte(UUID uuid) {
            final ByteBuffer buffer = ByteBuffer.allocate(16);
            buffer.putLong(uuid.getMostSignificantBits());
            buffer.putLong(uuid.getLeastSignificantBits());
            return buffer.array();
        }

        /**
         * Parses a line of three single-space-separated doubles into a {@link Point}.
         */
        private Point parsePoint(String body) {
            final String[] fields = body.split(" ");
            // Include the offending input so malformed events can be diagnosed from the failure.
            Preconditions.checkArgument(fields.length == COORDINATE_COUNT,
                "Expected %s space-separated values but got %s in: '%s'",
                COORDINATE_COUNT, fields.length, body);
            return new Point(
                Double.parseDouble(fields[0]),
                Double.parseDouble(fields[1]),
                Double.parseDouble(fields[2]));
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/sparkkmeans/SparkKMeansApp$SparkKMeansSpecification.class */
    /**
     * Spark program definition whose main class is {@link SparkKMeansProgram}.
     */
    public static final class SparkKMeansSpecification extends AbstractSpark {
        private static final String PROGRAM_NAME = "SparkKMeansProgram";
        private static final String PROGRAM_DESCRIPTION = "Spark KMeans Program";

        /**
         * Sets the program name, description and main class.
         */
        public void configure() {
            setName(PROGRAM_NAME);
            setDescription(PROGRAM_DESCRIPTION);
            setMainClass(SparkKMeansProgram.class);
        }
    }

    /**
     * Configures the application: sets its name and description, adds the
     * "pointsStream" stream, the points flow, the Spark program and the centers
     * service, then sets up the "points" and "centers" object stores.
     *
     * @throws RuntimeException wrapping an {@link UnsupportedTypeException} raised
     *         while creating an object store
     */
    public void configure() {
        setName("SparkKMeans");
        setDescription("Spark KMeans app");

        final Stream pointsStream = new Stream("pointsStream");
        addStream(pointsStream);

        final PointsFlow pointsFlow = new PointsFlow();
        addFlow(pointsFlow);

        final SparkKMeansSpecification sparkProgram = new SparkKMeansSpecification();
        addSpark(sparkProgram);

        final CentersService centersService = new CentersService();
        addService(centersService);

        registerObjectStores();
    }

    /**
     * Creates the "points" store (of {@link Point}) and the "centers" store (of
     * {@link String}), rethrowing an unsupported-type failure as unchecked.
     */
    private void registerObjectStores() {
        try {
            ObjectStores.createObjectStore(getConfigurer(), "points", Point.class);
            ObjectStores.createObjectStore(getConfigurer(), "centers", String.class);
        } catch (UnsupportedTypeException unsupportedType) {
            throw new RuntimeException(unsupportedType);
        }
    }
}
