package co.cask.cdap.explore.executor;

import co.cask.cdap.api.data.batch.RecordScannable;
import co.cask.cdap.api.data.batch.RecordWritable;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.explore.schema.SchemaConverter;
import co.cask.cdap.explore.service.ExploreService;
import co.cask.cdap.explore.service.TableNotFoundException;
import co.cask.cdap.hive.objectinspector.ObjectInspectorFactory;
import co.cask.cdap.internal.io.ReflectionSchemaGenerator;
import co.cask.cdap.proto.QueryHandle;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Locale;
import java.util.Map;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.twill.filesystem.Location;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/v2/data/explore")
/* loaded from: input_file:co/cask/cdap/explore/executor/ExploreExecutorHttpHandler.class */
public class ExploreExecutorHttpHandler extends AbstractHttpHandler {
    /** Logger for this handler, named after this class so its output is attributed to the right component. */
    private static final Logger LOG = LoggerFactory.getLogger(ExploreExecutorHttpHandler.class);
    /** Gson instance used to parse JSON request bodies. */
    private static final Gson GSON = new Gson();
    /** Executes the generated Hive statements and looks up table information. */
    private final ExploreService exploreService;
    /** Used to load dataset instances and their specifications. */
    private final DatasetFramework datasetFramework;
    /** Used to look up stream configurations. */
    private final StreamAdmin streamAdmin;

    /**
     * Creates the handler with the services it delegates to.
     *
     * @param exploreService service that runs the generated statements
     * @param datasetFramework framework used to load datasets and their specifications
     * @param streamAdmin admin used to read stream configurations
     */
    @Inject
    public ExploreExecutorHttpHandler(ExploreService exploreService, DatasetFramework datasetFramework, StreamAdmin streamAdmin) {
        this.streamAdmin = streamAdmin;
        this.datasetFramework = datasetFramework;
        this.exploreService = exploreService;
    }

    /**
     * Enables explore on a stream by running a CREATE TABLE statement for it.
     *
     * <p>Responds with 200 and a JSON object holding the query {@code handle} on success,
     * 404 when the stream configuration has no location or reading it throws an
     * {@link IOException}, 400 when the create statement cannot be generated because of an
     * {@link UnsupportedTypeException}, and 500 for any other failure.
     *
     * @param httpRequest the incoming request (not inspected)
     * @param httpResponder responder used to send the result
     * @param str name of the stream
     */
    @POST
    @Path("streams/{stream}/enable")
    public void enableStream(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) {
        try {
            try {
                StreamConfig streamConfig = this.streamAdmin.getConfig(str);
                Location streamLocation = streamConfig.getLocation();
                if (streamLocation != null) {
                    String locationUri = streamLocation.toURI().toString();
                    LOG.debug("Enabling explore for stream {} at location {}", str, locationUri);
                    try {
                        String createStatement = generateStreamCreateStatement(str, locationUri, streamConfig.getFormat().getSchema());
                        LOG.debug("Running create statement for stream {}", str);
                        QueryHandle handle = this.exploreService.execute(createStatement);
                        JsonObject response = new JsonObject();
                        response.addProperty("handle", handle.getHandle());
                        httpResponder.sendJson(HttpResponseStatus.OK, response);
                    } catch (UnsupportedTypeException unsupported) {
                        LOG.error("Exception while generating create statement for stream {}", str, unsupported);
                        httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, unsupported.getMessage());
                    }
                } else {
                    httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Could not find location of stream " + str);
                }
            } catch (IOException ioe) {
                LOG.info("Could not find stream {} to enable explore on.", str, ioe);
                httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Could not find stream " + str);
            }
        } catch (Throwable failure) {
            // Last-resort handler: anything not handled above becomes a 500.
            LOG.error("Got exception:", failure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Disables explore on a stream by running a DROP TABLE statement for its Hive table.
     *
     * <p>Responds with 200 and a JSON object holding the query {@code handle} on success,
     * 404 when reading the stream configuration throws an {@link IOException}, and 500 for
     * any other failure.
     *
     * @param httpRequest the incoming request (not inspected)
     * @param httpResponder responder used to send the result
     * @param str name of the stream
     */
    @POST
    @Path("streams/{stream}/disable")
    public void disableStream(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) {
        try {
            LOG.debug("Disabling explore for stream {}", str);
            try {
                // The config itself is not used; the lookup is made for its IOException on failure.
                this.streamAdmin.getConfig(str);
                String streamTable = getStreamTableName(str);
                String dropStatement = generateDeleteStatement(streamTable);
                LOG.debug("Running delete statement for stream {} - {}", str, dropStatement);
                QueryHandle handle = this.exploreService.execute(dropStatement);
                JsonObject response = new JsonObject();
                response.addProperty("handle", handle.getHandle());
                httpResponder.sendJson(HttpResponseStatus.OK, response);
            } catch (IOException ioe) {
                LOG.debug("Could not find stream {} to disable explore on.", str, ioe);
                httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Could not find stream " + str);
            }
        } catch (Throwable failure) {
            LOG.error("Got exception:", failure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Enables explore on a dataset by running a CREATE TABLE statement for it.
     *
     * <p>A statement is generated for datasets that are {@link RecordScannable} or
     * {@link RecordWritable}, and for {@link FileSet} / {@link TimePartitionedFileSet}
     * datasets whose specification exists and has explore enabled in its properties.
     * Any other dataset yields a 200 response carrying the no-op handle.
     *
     * <p>Responds with 404 when the dataset cannot be loaded (null), 400 when generating or
     * running the statement throws, 200 with the no-op handle when loading fails because of
     * a {@link ClassNotFoundException} somewhere in the cause chain, and 500 otherwise.
     *
     * @param httpRequest the incoming request (not inspected)
     * @param httpResponder responder used to send the result
     * @param str name of the dataset instance
     */
    @POST
    @Path("/datasets/{dataset}/enable")
    public void enableDataset(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("dataset") String str) {
        try {
            try {
                Dataset instance = this.datasetFramework.getDataset(str, DatasetDefinition.NO_ARGUMENTS, (ClassLoader) null);
                if (instance == null) {
                    httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Cannot load dataset " + str);
                    return;
                }
                try {
                    String createStatement = null;
                    if (instance instanceof RecordScannable || instance instanceof RecordWritable) {
                        LOG.debug("Enabling explore for dataset instance {}", str);
                        createStatement = generateCreateStatement(str, instance);
                    } else if (instance instanceof FileSet || instance instanceof TimePartitionedFileSet) {
                        // The spec is only looked up for file-set style datasets.
                        DatasetSpecification spec = this.datasetFramework.getDatasetSpec(str);
                        if (spec != null) {
                            Map<String, String> specProperties = spec.getProperties();
                            if (FileSetProperties.isExploreEnabled(specProperties)) {
                                LOG.debug("Enabling explore for dataset instance {}", str);
                                createStatement = generateFileSetCreateStatement(str, instance, specProperties);
                            }
                        }
                    }

                    JsonObject response = new JsonObject();
                    if (createStatement != null) {
                        LOG.debug("Running create statement for dataset {} with class {} - {}", new Object[]{str, instance.getClass().getName(), createStatement});
                        QueryHandle handle = this.exploreService.execute(createStatement);
                        response.addProperty("handle", handle.getHandle());
                    } else {
                        LOG.debug("Dataset {} does not fulfill the criteria to enable explore.", str);
                        response.addProperty("handle", QueryHandle.NO_OP.getHandle());
                    }
                    httpResponder.sendJson(HttpResponseStatus.OK, response);
                } catch (Exception statementFailure) {
                    LOG.error("Exception while generating create statement for dataset {}", str, statementFailure);
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, statementFailure.getMessage());
                }
            } catch (Exception loadFailure) {
                String missingClass = isClassNotFoundException(loadFailure);
                if (missingClass == null) {
                    // Not a class-loading problem: let the outer handler turn it into a 500.
                    throw loadFailure;
                }
                LOG.info("Cannot load dataset {} because class {} cannot be found. This is probably because class {} is a type parameter of dataset {} that is not present in the dataset's jar file. See the developer guide for more information.", new Object[]{str, missingClass, missingClass, str});
                JsonObject noOp = new JsonObject();
                noOp.addProperty("handle", QueryHandle.NO_OP.getHandle());
                httpResponder.sendJson(HttpResponseStatus.OK, noOp);
            }
        } catch (Throwable failure) {
            LOG.error("Got exception:", failure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Walks the cause chain of {@code th}, starting with {@code th} itself, looking for a
     * {@link ClassNotFoundException}.
     *
     * @param th the throwable to inspect
     * @return the message of the first {@link ClassNotFoundException} found in the chain,
     *         or {@code null} if the chain contains none
     */
    private String isClassNotFoundException(Throwable th) {
        Throwable current = th;
        do {
            if (current instanceof ClassNotFoundException) {
                return current.getMessage();
            }
            current = current.getCause();
        } while (current != null);
        return null;
    }

    /**
     * Disables explore on a dataset by running a DROP TABLE statement for its Hive table.
     *
     * <p>A statement is generated for datasets that are {@link RecordScannable} or
     * {@link RecordWritable}, and for {@link FileSet} / {@link TimePartitionedFileSet}
     * datasets whose specification exists and has explore enabled in its properties.
     * Any other dataset, and any dataset whose Hive table is reported missing via
     * {@link TableNotFoundException}, yields a 200 response carrying the no-op handle.
     *
     * <p>Responds with 404 when the dataset cannot be loaded (null), 200 with the no-op
     * handle when a {@link ClassNotFoundException} is found in the cause chain of a failure,
     * and 500 otherwise.
     *
     * @param httpRequest the incoming request (not inspected)
     * @param httpResponder responder used to send the result
     * @param str name of the dataset instance
     */
    @POST
    @Path("/datasets/{dataset}/disable")
    public void disableDataset(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("dataset") String str) {
        try {
            LOG.debug("Disabling explore for dataset instance {}", str);
            try {
                Dataset instance = this.datasetFramework.getDataset(str, DatasetDefinition.NO_ARGUMENTS, (ClassLoader) null);
                if (instance == null) {
                    httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Cannot load dataset " + str);
                    return;
                }

                String dropStatement = null;
                if (instance instanceof RecordScannable || instance instanceof RecordWritable) {
                    dropStatement = generateDeleteStatement(str);
                } else if (instance instanceof FileSet || instance instanceof TimePartitionedFileSet) {
                    // The spec is only looked up for file-set style datasets.
                    DatasetSpecification spec = this.datasetFramework.getDatasetSpec(str);
                    if (spec != null && FileSetProperties.isExploreEnabled(spec.getProperties())) {
                        dropStatement = generateDeleteStatement(str);
                    }
                }

                if (dropStatement == null) {
                    LOG.debug("Dataset {} does not fulfill the criteria to enable explore.", str);
                    JsonObject noOp = new JsonObject();
                    noOp.addProperty("handle", QueryHandle.NO_OP.getHandle());
                    httpResponder.sendJson(HttpResponseStatus.OK, noOp);
                    return;
                }

                try {
                    // Throws TableNotFoundException when there is no table to drop.
                    this.exploreService.getTableInfo(null, getHiveTableName(str));
                    LOG.debug("Running delete statement for dataset {} - {}", str, dropStatement);
                    QueryHandle handle = this.exploreService.execute(dropStatement);
                    JsonObject response = new JsonObject();
                    response.addProperty("handle", handle.getHandle());
                    httpResponder.sendJson(HttpResponseStatus.OK, response);
                } catch (TableNotFoundException tableMissing) {
                    JsonObject noOp = new JsonObject();
                    noOp.addProperty("handle", QueryHandle.NO_OP.getHandle());
                    httpResponder.sendJson(HttpResponseStatus.OK, noOp);
                }
            } catch (Exception loadFailure) {
                String missingClass = isClassNotFoundException(loadFailure);
                if (missingClass == null) {
                    // Not a class-loading problem: let the outer handler turn it into a 500.
                    throw loadFailure;
                }
                LOG.info("Cannot load dataset {} because class {} cannot be found. This is probably because class {} is a type parameter of dataset {} that is not present in the dataset's jar file. See the developer guide for more information.", new Object[]{str, missingClass, missingClass, str});
                JsonObject noOp = new JsonObject();
                noOp.addProperty("handle", QueryHandle.NO_OP.getHandle());
                httpResponder.sendJson(HttpResponseStatus.OK, noOp);
            }
        } catch (Throwable failure) {
            LOG.error("Got exception:", failure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Adds a time partition to a dataset's Hive table.
     *
     * <p>The request body must be a JSON object with a {@code path} entry giving the
     * partition location. Responds with 400 when the body is empty or has no {@code path},
     * 200 with a JSON object holding the query {@code handle} once the ALTER TABLE statement
     * has been submitted, and 500 for any failure (including a malformed body).
     *
     * @param httpRequest the incoming request whose content carries the JSON body
     * @param httpResponder responder used to send the result
     * @param str name of the dataset
     * @param j partition time in milliseconds
     */
    @Path("/datasets/{dataset}/partitions/{time}")
    @PUT
    public void addPartition(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("dataset") String str, @PathParam("time") long j) {
        try {
            // Decode the body explicitly as UTF-8 instead of relying on the platform charset.
            InputStreamReader bodyReader = new InputStreamReader(new ChannelBufferInputStream(httpRequest.getContent()), Charset.forName("UTF-8"));
            Map<String, String> body = GSON.fromJson(bodyReader, new TypeToken<Map<String, String>>() {
            }.getType());
            // Gson returns null for an empty body; treat that the same as a missing path.
            String path = body == null ? null : body.get("path");
            if (path == null) {
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "path was not specified.");
                // Stop here: otherwise a statement with location 'null' would be executed
                // and a second response would be sent for the same request.
                return;
            }
            String addPartitionStatement = generateAddPartitionStatement(str, j, path);
            LOG.debug("Add partition for time {} dataset {} - {}", new Object[]{Long.valueOf(j), str, addPartitionStatement});
            QueryHandle handle = this.exploreService.execute(addPartitionStatement);
            JsonObject response = new JsonObject();
            response.addProperty("handle", handle.getHandle());
            httpResponder.sendJson(HttpResponseStatus.OK, response);
        } catch (Throwable th) {
            LOG.error("Got exception:", th);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, th.getMessage());
        }
    }

    /**
     * Drops a time partition from a dataset's Hive table.
     *
     * <p>Responds with 200 and a JSON object holding the query {@code handle} once the
     * ALTER TABLE statement has been submitted, and 500 for any failure.
     *
     * @param httpRequest the incoming request (not inspected)
     * @param httpResponder responder used to send the result
     * @param str name of the dataset
     * @param j partition time in milliseconds
     */
    @Path("/datasets/{dataset}/partitions/{time}")
    @DELETE
    public void dropPartition(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("dataset") String str, @PathParam("time") long j) {
        try {
            String dropPartitionStatement = generateDropPartitionStatement(str, j);
            Object[] logArguments = new Object[]{Long.valueOf(j), str, dropPartitionStatement};
            LOG.debug("Drop partition for time {} dataset {} - {}", logArguments);
            QueryHandle handle = this.exploreService.execute(dropPartitionStatement);
            JsonObject response = new JsonObject();
            response.addProperty("handle", handle.getHandle());
            httpResponder.sendJson(HttpResponseStatus.OK, response);
        } catch (Throwable failure) {
            LOG.error("Got exception:", failure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Builds the Hive table name for a stream: the stream name prefixed with
     * {@code cdap_stream_}, then normalized by {@link #getHiveTableName(String)}.
     *
     * @param str name of the stream
     * @return the Hive table name for the stream
     */
    private static String getStreamTableName(String str) {
        String prefixedName = "cdap_stream_" + str;
        return getHiveTableName(prefixedName);
    }

    /**
     * Converts a dataset or stream name into a Hive table name by replacing every
     * {@code '.'} with {@code '_'} and lower-casing the result.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's
     * default locale (under a Turkish default locale, for example, {@code "I"} would
     * otherwise become a dotless {@code "ı"}).
     *
     * @param str the name to convert; must not be null
     * @return the normalized table name
     */
    public static String getHiveTableName(String str) {
        return str.replace('.', '_').toLowerCase(Locale.ROOT);
    }

    /**
     * Generates the CREATE EXTERNAL TABLE statement for a stream.
     *
     * <p>The table schema is a record made of a {@code ts} long field and a {@code headers}
     * string-to-string map field, followed by all fields of the given body schema.
     *
     * @param str name of the stream
     * @param str2 location of the stream, used in the LOCATION clause
     * @param schema schema of the stream body
     * @return the Hive create statement
     * @throws UnsupportedTypeException if the schema cannot be converted to a Hive schema
     */
    public static String generateStreamCreateStatement(String str, String str2, Schema schema) throws UnsupportedTypeException {
        Schema stringSchema = Schema.of(Schema.Type.STRING);
        ArrayList<Schema.Field> eventFields = new ArrayList<Schema.Field>();
        eventFields.add(Schema.Field.of("ts", Schema.of(Schema.Type.LONG)));
        eventFields.add(Schema.Field.of("headers", Schema.mapOf(stringSchema, Schema.of(Schema.Type.STRING))));
        eventFields.addAll(schema.getFields());

        String tableName = getStreamTableName(str);
        String hiveSchema = SchemaConverter.toHiveSchema(Schema.recordOf("streamEvent", eventFields));
        return String.format("CREATE EXTERNAL TABLE IF NOT EXISTS %s %s COMMENT \"CDAP Stream\" STORED BY \"%s\" WITH SERDEPROPERTIES(\"%s\" = \"%s\") LOCATION \"%s\"TBLPROPERTIES ('%s'='%s')",
            tableName,
            hiveSchema,
            "co.cask.cdap.hive.stream.StreamStorageHandler",
            "explore.stream.name",
            str,
            str2,
            "cdap.name",
            str);
    }

    /**
     * Generates the CREATE EXTERNAL TABLE statement for a record-based dataset.
     *
     * @param str name of the dataset
     * @param dataset the dataset instance whose record type determines the Hive schema
     * @return the Hive create statement
     * @throws UnsupportedTypeException if no Hive schema can be derived for the dataset
     */
    public static String generateCreateStatement(String str, Dataset dataset) throws UnsupportedTypeException {
        String tableName = getHiveTableName(str);
        String hiveSchema = hiveSchemaFor(dataset);
        return String.format("CREATE EXTERNAL TABLE IF NOT EXISTS %s %s COMMENT \"CDAP Dataset\" STORED BY \"%s\" WITH SERDEPROPERTIES(\"%s\" = \"%s\")TBLPROPERTIES ('%s'='%s')",
            tableName,
            hiveSchema,
            "co.cask.cdap.hive.datasets.DatasetStorageHandler",
            "explore.dataset.name",
            str,
            "cdap.name",
            str);
    }

    /**
     * Generates the CREATE EXTERNAL TABLE statement for a {@link FileSet} or
     * {@link TimePartitionedFileSet} dataset.
     *
     * <p>The SerDe, input format and output format are read from the dataset properties and
     * must all be present. A time-partitioned file set gets a PARTITIONED BY clause and uses
     * the base location of its underlying file set. The table properties from the dataset
     * properties, plus a {@code cdap.name} entry, are emitted as TBLPROPERTIES.
     *
     * @param str name of the dataset
     * @param dataset the dataset instance; cast to {@link FileSet} unless it is a
     *                {@link TimePartitionedFileSet}
     * @param map the dataset properties
     * @return the Hive create statement
     * @throws IllegalArgumentException if any of SerDe, input format or output format is
     *                                  missing from the properties
     */
    public static String generateFileSetCreateStatement(String str, Dataset dataset, Map<String, String> map) throws IllegalArgumentException {
        String hiveTableName = getHiveTableName(str);
        String serDe = FileSetProperties.getSerDe(map);
        String exploreInputFormat = FileSetProperties.getExploreInputFormat(map);
        String exploreOutputFormat = FileSetProperties.getExploreOutputFormat(map);
        Preconditions.checkArgument(serDe != null && exploreInputFormat != null && exploreOutputFormat != null,
            "All of SerDe, InputFormat and OutputFormat must be given in dataset properties");

        String partitionClause;
        Location baseLocation;
        if (dataset instanceof TimePartitionedFileSet) {
            partitionClause = "PARTITIONED BY (year INT, month INT, day INT, hour INT, minute INT)";
            baseLocation = ((TimePartitionedFileSet) dataset).getUnderlyingFileSet().getBaseLocation();
        } else {
            partitionClause = "";
            baseLocation = ((FileSet) dataset).getBaseLocation();
        }

        // cdap.name is always added, so the TBLPROPERTIES clause is never empty.
        Map<String, String> tableProperties = FileSetProperties.getTableProperties(map);
        tableProperties.put("cdap.name", str);
        StringBuilder tblProperties = new StringBuilder("TBLPROPERTIES (");
        Joiner.on(", ").appendTo(tblProperties, Iterables.transform(tableProperties.entrySet(), new Function<Map.Entry<String, String>, String>() {
            @Override
            public String apply(Map.Entry<String, String> entry) {
                // Literal replace: with replaceAll the backslash in the replacement is itself
                // an escape character, so quotes would be left unescaped.
                return String.format("'%s'='%s'", entry.getKey(), entry.getValue().replace("'", "\\'"));
            }
        }));
        tblProperties.append(")");

        return String.format("CREATE EXTERNAL TABLE IF NOT EXISTS %s %s ROW FORMAT SERDE '%s' STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s' LOCATION '%s' %s",
            hiveTableName, partitionClause, serDe, exploreInputFormat, exploreOutputFormat, baseLocation.toURI().toString(), tblProperties.toString());
    }

    /**
     * Generates the DROP TABLE statement for the Hive table of the given name.
     *
     * @param str dataset or table name; normalized with {@link #getHiveTableName(String)}
     * @return the Hive drop statement
     */
    public static String generateDeleteStatement(String str) {
        String tableName = getHiveTableName(str);
        return "DROP TABLE IF EXISTS " + tableName;
    }

    /**
     * Generates the ALTER TABLE ... ADD PARTITION statement for a time partition.
     *
     * <p>The partition keys (year, month, day, hour, minute) are derived from the given time
     * using {@link Calendar#getInstance()}; the month is 1-based in the statement. Numbers are
     * formatted with {@link Locale#ROOT} so the statement always contains ASCII digits,
     * whatever the JVM's default locale is.
     *
     * <p>NOTE(review): the calendar uses the JVM's default time zone and locale; confirm this
     * matches how partition keys are computed elsewhere.
     *
     * @param str name of the dataset
     * @param j partition time in milliseconds
     * @param str2 location of the partition, inserted verbatim between single quotes
     * @return the Hive statement
     */
    public static String generateAddPartitionStatement(String str, long j, String str2) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(j);
        return String.format(Locale.ROOT,
            "ALTER TABLE %s ADD PARTITION(year=%d,month=%d,day=%d,hour=%d,minute=%d) LOCATION '%s'",
            getHiveTableName(str),
            calendar.get(Calendar.YEAR),
            calendar.get(Calendar.MONTH) + 1, // Calendar months are 0-based
            calendar.get(Calendar.DAY_OF_MONTH),
            calendar.get(Calendar.HOUR_OF_DAY),
            calendar.get(Calendar.MINUTE),
            str2);
    }

    /**
     * Generates the ALTER TABLE ... DROP PARTITION statement for a time partition.
     *
     * <p>The partition keys (year, month, day, hour, minute) are derived from the given time
     * using {@link Calendar#getInstance()}; the month is 1-based in the statement. Numbers are
     * formatted with {@link Locale#ROOT} so the statement always contains ASCII digits,
     * whatever the JVM's default locale is.
     *
     * <p>NOTE(review): the calendar uses the JVM's default time zone and locale; confirm this
     * matches how partition keys are computed elsewhere.
     *
     * @param str name of the dataset
     * @param j partition time in milliseconds
     * @return the Hive statement
     */
    public static String generateDropPartitionStatement(String str, long j) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(j);
        return String.format(Locale.ROOT,
            "ALTER TABLE %s DROP PARTITION(year=%d,month=%d,day=%d,hour=%d,minute=%d)",
            getHiveTableName(str),
            calendar.get(Calendar.YEAR),
            calendar.get(Calendar.MONTH) + 1, // Calendar months are 0-based
            calendar.get(Calendar.DAY_OF_MONTH),
            calendar.get(Calendar.HOUR_OF_DAY),
            calendar.get(Calendar.MINUTE));
    }

    /**
     * Derives the Hive schema for a dataset from its record type.
     *
     * @param dataset a dataset implementing {@link RecordScannable} or {@link RecordWritable};
     *                {@link RecordScannable} takes precedence when both are implemented
     * @return the Hive schema of the dataset's record type
     * @throws UnsupportedTypeException if the dataset implements neither interface, or the
     *                                  record type cannot be converted
     */
    static String hiveSchemaFor(Dataset dataset) throws UnsupportedTypeException {
        Type recordType;
        if (dataset instanceof RecordScannable) {
            recordType = ((RecordScannable) dataset).getRecordType();
        } else if (dataset instanceof RecordWritable) {
            recordType = ((RecordWritable) dataset).getRecordType();
        } else {
            throw new UnsupportedTypeException("Dataset neither implements RecordScannable not RecordWritable.");
        }
        return hiveSchemaFor(recordType);
    }

    /**
     * Derives the Hive schema for a Java type, in the form
     * {@code (field1 type1, field2 type2, ...)}.
     *
     * @param type the record type to convert
     * @return the Hive column list, enclosed in parentheses
     * @throws UnsupportedTypeException if schema generation rejects the type, or its object
     *                                  inspector is not a {@link StructObjectInspector}
     */
    static String hiveSchemaFor(Type type) throws UnsupportedTypeException {
        // The generated schema is discarded; the call is made for the exception it throws
        // on types it does not accept.
        new ReflectionSchemaGenerator().generate(type, false);

        // Held as Object so that the instanceof check below is a real runtime check.
        Object inspector = ObjectInspectorFactory.getReflectionObjectInspector(type);
        if (!(inspector instanceof StructObjectInspector)) {
            // Report the offending type itself, not the class of the reflection object.
            throw new UnsupportedTypeException(String.format("Type must be a RECORD, but is %s", type));
        }
        StructObjectInspector structInspector = (StructObjectInspector) inspector;

        StringBuilder columns = new StringBuilder("(");
        String separator = "";
        for (StructField field : structInspector.getAllStructFieldRefs()) {
            columns.append(separator)
                .append(field.getFieldName())
                .append(" ")
                .append(field.getFieldObjectInspector().getTypeName());
            separator = ", ";
        }
        columns.append(")");
        return columns.toString();
    }
}
