package co.cask.cdap.metadata;

import co.cask.cdap.api.lineage.field.EndPoint;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.utils.TimeMathParser;
import co.cask.cdap.data2.metadata.lineage.LineageSerializer;
import co.cask.cdap.data2.metadata.lineage.field.EndPointField;
import co.cask.cdap.proto.codec.NamespacedEntityIdCodec;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespacedEntityId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.metadata.lineage.CollapseType;
import co.cask.cdap.proto.metadata.lineage.Field;
import co.cask.cdap.proto.metadata.lineage.LineageRecord;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

@Path("/v3")
/* loaded from: input_file:co/cask/cdap/metadata/LineageHTTPHandler.class */
public class LineageHTTPHandler extends AbstractHttpHandler {
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(NamespacedEntityId.class, new NamespacedEntityIdCodec()).create();
    private final LineageAdmin lineageAdmin;
    private final FieldLineageAdmin fieldLineageAdmin;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/metadata/LineageHTTPHandler$TimeRange.class */
    public static class TimeRange {
        private final long start;
        private final long end;

        private TimeRange(long j, long j2) {
            this.start = j;
            this.end = j2;
        }

        public long getStart() {
            return this.start;
        }

        public long getEnd() {
            return this.end;
        }
    }

    @Inject
    LineageHTTPHandler(LineageAdmin lineageAdmin, FieldLineageAdmin fieldLineageAdmin) {
        this.lineageAdmin = lineageAdmin;
        this.fieldLineageAdmin = fieldLineageAdmin;
    }

    @GET
    @Path("/namespaces/{namespace-id}/datasets/{dataset-id}/lineage")
    public void datasetLineage(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("dataset-id") String str2, @QueryParam("start") String str3, @QueryParam("end") String str4, @QueryParam("levels") @DefaultValue("10") int i, @QueryParam("collapse") List<String> list, @QueryParam("rollup") String str5) throws Exception {
        checkLevels(i);
        TimeRange parseRange = parseRange(str3, str4);
        httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(LineageSerializer.toLineageRecord(TimeUnit.MILLISECONDS.toSeconds(parseRange.getStart()), TimeUnit.MILLISECONDS.toSeconds(parseRange.getEnd()), this.lineageAdmin.computeLineage(new DatasetId(str, str2), parseRange.getStart(), parseRange.getEnd(), i, str5), getCollapseTypes(list)), LineageRecord.class));
    }

    @GET
    @Path("/namespaces/{namespace-id}/datasets/{dataset-id}/lineage/fields")
    public void datasetFields(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("dataset-id") String str2, @QueryParam("start") String str3, @QueryParam("end") String str4, @QueryParam("prefix") String str5, @QueryParam("includeCurrent") boolean z) throws BadRequestException, IOException {
        TimeRange parseRange = parseRange(str3, str4);
        Set<Field> fields = this.fieldLineageAdmin.getFields(EndPoint.of(str, str2), parseRange.getStart(), parseRange.getEnd(), str5, z);
        if (z) {
            httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(fields));
        } else {
            httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(fields.stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toSet())));
        }
    }

    @GET
    @Path("/namespaces/{namespace-id}/datasets/{dataset-id}/lineage/fields/{field-name}")
    public void datasetFieldLineageSummary(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("dataset-id") String str2, @PathParam("field-name") String str3, @QueryParam("direction") String str4, @QueryParam("start") String str5, @QueryParam("end") String str6) throws BadRequestException {
        TimeRange parseRange = parseRange(str5, str6);
        httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(this.fieldLineageAdmin.getSummary(parseDirection(str4), new EndPointField(EndPoint.of(str, str2), str3), parseRange.getStart(), parseRange.getEnd())));
    }

    @GET
    @Path("/namespaces/{namespace-id}/datasets/{dataset-id}/lineage/fields/{field-name}/operations")
    public void datasetFieldLineageDetails(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("dataset-id") String str2, @PathParam("field-name") String str3, @QueryParam("direction") @DefaultValue("both") String str4, @QueryParam("start") String str5, @QueryParam("end") String str6) throws BadRequestException {
        TimeRange parseRange = parseRange(str5, str6);
        httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(this.fieldLineageAdmin.getOperationDetails(parseDirection(str4), new EndPointField(EndPoint.of(str, str2), str3), parseRange.getStart(), parseRange.getEnd())));
    }

    @GET
    @Path("/namespaces/{namespace-id}/streams/{stream-id}/lineage")
    public void streamLineage(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream-id") String str2, @QueryParam("start") String str3, @QueryParam("end") String str4, @QueryParam("levels") @DefaultValue("10") int i, @QueryParam("collapse") List<String> list, @QueryParam("rollup") String str5) throws Exception {
        checkLevels(i);
        TimeRange parseRange = parseRange(str3, str4);
        httpResponder.sendJson(HttpResponseStatus.OK, GSON.toJson(LineageSerializer.toLineageRecord(TimeUnit.MILLISECONDS.toSeconds(parseRange.getStart()), TimeUnit.MILLISECONDS.toSeconds(parseRange.getEnd()), this.lineageAdmin.computeLineage(new StreamId(str, str2), parseRange.getStart(), parseRange.getEnd(), i, str5), getCollapseTypes(list)), LineageRecord.class));
    }

    private void checkLevels(int i) throws BadRequestException {
        if (i < 1) {
            throw new BadRequestException(String.format("Invalid levels (%d), should be greater than 0.", Integer.valueOf(i)));
        }
    }

    private static Set<CollapseType> getCollapseTypes(@Nullable List<String> list) throws BadRequestException {
        if (list == null) {
            return Collections.emptySet();
        }
        HashSet hashSet = new HashSet();
        for (String str : list) {
            try {
                hashSet.add(CollapseType.valueOf(str.toUpperCase()));
            } catch (IllegalArgumentException e) {
                throw new BadRequestException(String.format("Invalid collapse type %s", str));
            }
        }
        return hashSet;
    }

    private TimeRange parseRange(String str, String str2) throws BadRequestException {
        if (str == null) {
            throw new BadRequestException("Start time is required.");
        }
        if (str2 == null) {
            throw new BadRequestException("End time is required.");
        }
        long nowInSeconds = TimeMathParser.nowInSeconds();
        try {
            long millis = TimeUnit.SECONDS.toMillis(TimeMathParser.parseTimeInSeconds(nowInSeconds, str));
            long millis2 = TimeUnit.SECONDS.toMillis(TimeMathParser.parseTimeInSeconds(nowInSeconds, str2));
            if (millis < 0) {
                throw new BadRequestException(String.format("Invalid start time (%s -> %d), should be >= 0.", str, Long.valueOf(millis)));
            }
            if (millis2 < 0) {
                throw new BadRequestException(String.format("Invalid end time (%s -> %d), should be >= 0.", str2, Long.valueOf(millis2)));
            }
            if (millis > millis2) {
                throw new BadRequestException(String.format("Start time (%s -> %d) should be lesser than end time (%s -> %d).", str, Long.valueOf(millis), str2, Long.valueOf(millis2)));
            }
            return new TimeRange(millis, millis2);
        } catch (IllegalArgumentException e) {
            throw new BadRequestException(e);
        }
    }

    private Constants.FieldLineage.Direction parseDirection(String str) throws BadRequestException {
        try {
            return Constants.FieldLineage.Direction.valueOf(str.toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new BadRequestException(String.format("Direction must be specified to get the field lineage summary and should be one of the following: [%s].", Joiner.on(", ").join(Constants.FieldLineage.Direction.values()).toLowerCase()));
        }
    }
}
