package co.cask.cdap.data2.metadata.lineage.field;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.AbstractDataset;
import co.cask.cdap.api.dataset.table.Put;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.lineage.field.EndPoint;
import co.cask.cdap.api.lineage.field.Operation;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.lib.table.MDSKey;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.proto.codec.OperationTypeAdapter;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.metadata.lineage.ProgramRunOperations;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataset that stores field-level lineage information in a single {@link Table}.
 *
 * <p>Two kinds of rows are written:
 * <ul>
 *   <li><b>Checksum rows</b>, keyed by {@code CHECKSUM_MARKER + checksum} of a {@link FieldLineageInfo}. Each row
 *   holds the JSON-serialized operations (column {@code RAW_OPERATION_MARKER}), the JSON set of fields written to
 *   each destination (columns {@code FIELD_MARKER + endpoint}) and the incoming/outgoing field summaries (columns
 *   {@code direction + endpoint + field}). The payload for a given checksum is written only once.</li>
 *   <li><b>Reference rows</b>, keyed by {@code direction + endpoint + inverted program-run start time}. Each holds the
 *   checksum (column {@code CHECKSUM_MARKER}) and the JSON {@link ProgramRunId} (column {@code PROGRAM_MARKER}).
 *   Incoming references are written for the lineage's destinations, outgoing references for its sources.</li>
 * </ul>
 *
 * <p>Time-range queries scan the reference rows to collect the relevant checksums, then read the checksum rows.
 */
public class FieldLineageDataset extends AbstractDataset {
    public static final DatasetId FIELD_LINEAGE_DATASET_ID = NamespaceId.SYSTEM.dataset("fieldlineage");

    private static final Logger LOG = LoggerFactory.getLogger(FieldLineageDataset.class);
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(Operation.class, new OperationTypeAdapter())
        .create();

    // Single-byte markers used as row-key prefixes and column names.
    private static final byte[] CHECKSUM_MARKER = {'c'};
    private static final byte[] FIELD_MARKER = {'f'};
    private static final byte[] INCOMING_DIRECTION_MARKER = {'i'};
    private static final byte[] OUTGOING_DIRECTION_MARKER = {'o'};
    private static final byte[] RAW_OPERATION_MARKER = {'r'};
    private static final byte[] PROGRAM_MARKER = {'p'};

    private static final Type SET_FIELD_TYPE = new TypeToken<HashSet<String>>() { }.getType();
    private static final Type SET_ENDPOINT_FIELD_TYPE = new TypeToken<HashSet<EndPointField>>() { }.getType();
    private static final Type SET_OPERATION_TYPE = new TypeToken<HashSet<Operation>>() { }.getType();

    private final Table table;

    public FieldLineageDataset(String instanceName, Table table) {
        super(instanceName, table);
        this.table = table;
    }

    /**
     * Registers the field lineage dataset instance ({@link #FIELD_LINEAGE_DATASET_ID}) with the given framework.
     */
    public static void setupDatasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        datasetFramework.addInstance(FieldLineageDataset.class.getName(), FIELD_LINEAGE_DATASET_ID, DatasetProperties.EMPTY);
    }

    /**
     * Returns the dataset with the given id, creating it with empty properties if it does not exist yet.
     * Any failure is rethrown unchecked.
     */
    @VisibleForTesting
    public static FieldLineageDataset getFieldLineageDataset(DatasetContext datasetContext,
                                                             DatasetFramework datasetFramework,
                                                             DatasetId datasetId) {
        try {
            return DatasetsUtil.getOrCreateDataset(datasetContext, datasetFramework, datasetId,
                                                   FieldLineageDataset.class.getName(), DatasetProperties.EMPTY);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Records the field lineage produced by a program run.
     *
     * <p>The lineage payload is stored under its checksum only if no row exists for that checksum yet. The reference
     * rows linking {@code programRunId} to the checksum are written on every call.
     *
     * @param programRunId the program run that produced the lineage
     * @param fieldLineageInfo the lineage to record
     */
    public void addFieldLineageInfo(ProgramRunId programRunId, FieldLineageInfo fieldLineageInfo) {
        byte[] checksumRowKey = getChecksumRowKey(fieldLineageInfo.getChecksum());
        // Runs producing identical lineage share one checksum row; only write its payload the first time.
        if (table.get(checksumRowKey).isEmpty()) {
            Put put = new Put(checksumRowKey);
            put.add(RAW_OPERATION_MARKER, GSON.toJson(fieldLineageInfo.getOperations()));
            for (Map.Entry<EndPoint, Set<String>> entry : fieldLineageInfo.getDestinationFields().entrySet()) {
                put.add(getFieldColumnKey(entry.getKey()), GSON.toJson(entry.getValue()));
            }
            addSummary(put, INCOMING_DIRECTION_MARKER, fieldLineageInfo.getIncomingSummary());
            addSummary(put, OUTGOING_DIRECTION_MARKER, fieldLineageInfo.getOutgoingSummary());
            table.put(put);
        }
        addFieldLineageInfoReferenceRecords(programRunId, fieldLineageInfo);
    }

    /** Adds one summary column per key of {@code summary}, holding the JSON set of related fields. */
    private void addSummary(Put put, byte[] direction, Map<EndPointField, Set<EndPointField>> summary) {
        for (Map.Entry<EndPointField, Set<EndPointField>> entry : summary.entrySet()) {
            put.add(getSummaryColumnKey(direction, entry.getKey()), GSON.toJson(entry.getValue()));
        }
    }

    /** Writes incoming reference rows for every destination and outgoing reference rows for every source. */
    private void addFieldLineageInfoReferenceRecords(ProgramRunId programRunId, FieldLineageInfo fieldLineageInfo) {
        long checksum = fieldLineageInfo.getChecksum();
        for (EndPoint destination : fieldLineageInfo.getDestinations()) {
            addOperationReferenceRecord(INCOMING_DIRECTION_MARKER, destination, programRunId, checksum);
        }
        for (EndPoint source : fieldLineageInfo.getSources()) {
            addOperationReferenceRecord(OUTGOING_DIRECTION_MARKER, source, programRunId, checksum);
        }
    }

    /** Writes a single reference row pointing {@code programRunId} at the checksum row {@code checksum}. */
    private void addOperationReferenceRecord(byte[] direction, EndPoint endPoint, ProgramRunId programRunId,
                                             long checksum) {
        Put put = new Put(getOperationReferenceRowKey(direction, endPoint, programRunId));
        put.add(CHECKSUM_MARKER, checksum);
        put.add(PROGRAM_MARKER, GSON.toJson(programRunId));
        table.put(put);
    }

    /**
     * Returns the union of the fields written to {@code endPoint} and the fields read from it by program runs whose
     * start time lies in the queried range (see {@link #getScanKey} for the exact bounds).
     *
     * @param endPoint the endpoint to query
     * @param start range start, in milliseconds
     * @param end range end, in milliseconds
     * @return a new mutable set of field names; empty if nothing matched
     */
    public Set<String> getFields(EndPoint endPoint, long start, long end) {
        Set<String> fields = getDestinationFields(endPoint, start, end);
        fields.addAll(getSourceFields(endPoint, start, end));
        return fields;
    }

    /** Collects the fields recorded as written to {@code endPoint} (destination field columns). */
    private Set<String> getDestinationFields(EndPoint endPoint, long start, long end) {
        Set<Long> checksums = getChecksumsWithProgramRunsInRange(INCOMING_DIRECTION_MARKER, endPoint, start, end).keySet();
        Set<String> fields = new HashSet<>();
        byte[] fieldColumnKey = getFieldColumnKey(endPoint);
        for (long checksum : checksums) {
            byte[] checksumRowKey = getChecksumRowKey(checksum);
            try {
                Set<String> checksumFields =
                    GSON.fromJson(Bytes.toString(table.get(checksumRowKey, fieldColumnKey)), SET_FIELD_TYPE);
                if (checksumFields != null) {
                    fields.addAll(checksumFields);
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from row '{}' and column '{}'.",
                         Bytes.toString(checksumRowKey), Bytes.toString(fieldColumnKey), e);
            }
        }
        return fields;
    }

    /** Collects the fields read from {@code endPoint}, taken from the keys of its outgoing summary columns. */
    private Set<String> getSourceFields(EndPoint endPoint, long start, long end) {
        Set<Long> checksums = getChecksumsWithProgramRunsInRange(OUTGOING_DIRECTION_MARKER, endPoint, start, end).keySet();
        Set<String> fields = new HashSet<>();
        for (long checksum : checksums) {
            Row row = table.get(getChecksumRowKey(checksum));
            for (byte[] column : row.getColumns().keySet()) {
                if (matchesEndpoint(column, endPoint)) {
                    fields.add(extractFieldName(column));
                }
            }
        }
        return fields;
    }

    /** Returns the fields that {@code endPointField} was computed from, over runs in the given time range. */
    public Set<EndPointField> getIncomingSummary(EndPointField endPointField, long start, long end) {
        return getSummary(INCOMING_DIRECTION_MARKER, endPointField, start, end);
    }

    /** Returns the fields that were computed from {@code endPointField}, over runs in the given time range. */
    public Set<EndPointField> getOutgoingSummary(EndPointField endPointField, long start, long end) {
        return getSummary(OUTGOING_DIRECTION_MARKER, endPointField, start, end);
    }

    /** Unions the summary column for {@code endPointField} across all checksums referenced in the time range. */
    private Set<EndPointField> getSummary(byte[] direction, EndPointField endPointField, long start, long end) {
        Set<Long> checksums =
            getChecksumsWithProgramRunsInRange(direction, endPointField.getEndPoint(), start, end).keySet();
        Set<EndPointField> summary = new HashSet<>();
        byte[] summaryColumnKey = getSummaryColumnKey(direction, endPointField);
        for (long checksum : checksums) {
            byte[] checksumRowKey = getChecksumRowKey(checksum);
            try {
                Set<EndPointField> related =
                    GSON.fromJson(Bytes.toString(table.get(checksumRowKey, summaryColumnKey)), SET_ENDPOINT_FIELD_TYPE);
                if (related != null) {
                    summary.addAll(related);
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from row '{}' and column '{}'. Ignoring EndPoint.",
                         Bytes.toString(checksumRowKey), Bytes.toString(summaryColumnKey), e);
            }
        }
        return summary;
    }

    /** Returns the operations of runs that wrote to {@code endPoint}, grouped by distinct lineage. */
    public Set<ProgramRunOperations> getIncomingOperations(EndPoint endPoint, long start, long end) {
        return getOperations(INCOMING_DIRECTION_MARKER, endPoint, start, end);
    }

    /** Returns the operations of runs that read from {@code endPoint}, grouped by distinct lineage. */
    public Set<ProgramRunOperations> getOutgoingOperations(EndPoint endPoint, long start, long end) {
        return getOperations(OUTGOING_DIRECTION_MARKER, endPoint, start, end);
    }

    /**
     * Builds one {@link ProgramRunOperations} per checksum referenced in the time range. Each pairs the runs that
     * produced the checksum with its deserialized operations. Iteration order follows the reference-row scan order.
     */
    private Set<ProgramRunOperations> getOperations(byte[] direction, EndPoint endPoint, long start, long end) {
        Map<Long, Set<ProgramRunId>> runsByChecksum = getChecksumsWithProgramRunsInRange(direction, endPoint, start, end);
        Set<ProgramRunOperations> result = new LinkedHashSet<>();
        for (Map.Entry<Long, Set<ProgramRunId>> entry : runsByChecksum.entrySet()) {
            byte[] checksumRowKey = getChecksumRowKey(entry.getKey());
            try {
                Set<Operation> operations =
                    GSON.fromJson(Bytes.toString(table.get(checksumRowKey, RAW_OPERATION_MARKER)), SET_OPERATION_TYPE);
                if (operations != null) {
                    result.add(new ProgramRunOperations(entry.getValue(), operations));
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from row '{}' and column '{}'. Ignoring operations.",
                         Bytes.toString(checksumRowKey), Bytes.toString(RAW_OPERATION_MARKER), e);
            }
        }
        return result;
    }

    /**
     * Scans the reference rows for {@code endPoint} in the given direction and time range.
     *
     * @return checksum to the set of program runs that referenced it. Iteration order follows scan order, which is
     *         newest start time first because row keys hold the inverted start time.
     */
    private Map<Long, Set<ProgramRunId>> getChecksumsWithProgramRunsInRange(byte[] direction, EndPoint endPoint,
                                                                            long start, long end) {
        // Inverted times: the key derived from 'end' sorts before the key derived from 'start'.
        byte[] scanStartKey = getScanKey(direction, endPoint, end);
        byte[] scanStopKey = getScanKey(direction, endPoint, start);
        Map<Long, Set<ProgramRunId>> result = new LinkedHashMap<>();
        try (Scanner scanner = table.scan(scanStartKey, scanStopKey)) {
            Row row;
            while ((row = scanner.next()) != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Got row key = '{}' for direction '{}', EndPoint '{}', and time range ('{}'-'{}').",
                              Bytes.toString(row.getRow()), Bytes.toString(direction), endPoint, start, end);
                }
                // Register the checksum before parsing the run, so an unparseable run entry only drops that run and
                // not the checksum's lineage.
                Set<ProgramRunId> runs =
                    result.computeIfAbsent(Bytes.toLong(row.get(CHECKSUM_MARKER)), key -> new HashSet<>());
                try {
                    ProgramRunId programRunId =
                        GSON.fromJson(Bytes.toString(row.get(PROGRAM_MARKER)), ProgramRunId.class);
                    if (programRunId != null) {
                        runs.add(programRunId);
                    }
                } catch (JsonSyntaxException e) {
                    LOG.warn("Failed to parse json from row '{}' and column '{}'. Ignoring the program run.",
                             Bytes.toString(row.getRow()), Bytes.toString(PROGRAM_MARKER), e);
                }
            }
        }
        return result;
    }

    /**
     * Builds a scan bound {@code direction + endpoint + (invertTime(time) + 1)}.
     *
     * <p>A run started at {@code t} has reference key component {@code invertTime(t)}. Scanning from
     * {@code getScanKey(end)} to {@code getScanKey(start)} therefore covers start times in {@code [start, end)}.
     * This assumes {@link Table#scan} treats the start row as inclusive and the stop row as exclusive
     * (NOTE(review): confirm against the Table contract).
     */
    private byte[] getScanKey(byte[] direction, EndPoint endPoint, long time) {
        MDSKey.Builder builder = new MDSKey.Builder();
        builder.add(direction);
        addEndPoint(builder, endPoint);
        builder.add(invertTime(time) + 1);
        return builder.build().getKey();
    }

    /** Row key of the checksum row: {@code CHECKSUM_MARKER + checksum}. */
    private byte[] getChecksumRowKey(long checksum) {
        MDSKey.Builder builder = new MDSKey.Builder();
        builder.add(CHECKSUM_MARKER);
        builder.add(checksum);
        return builder.build().getKey();
    }

    /**
     * Row key of a reference row: {@code direction + endpoint + inverted run start time}.
     *
     * <p>NOTE(review): the key does not include the run itself. Two runs starting in the same millisecond that touch
     * the same endpoint in the same direction will overwrite each other's reference row. Appending the run id would
     * avoid this, but it changes the persisted key format, so it needs confirming against any other readers.
     */
    private byte[] getOperationReferenceRowKey(byte[] direction, EndPoint endPoint, ProgramRunId programRunId) {
        long invertedStartTime = getInvertedStartTime(programRunId);
        MDSKey.Builder builder = new MDSKey.Builder();
        builder.add(direction);
        addEndPoint(builder, endPoint);
        builder.add(invertedStartTime);
        return builder.build().getKey();
    }

    /** Column key holding the destination fields of {@code endPoint}: {@code FIELD_MARKER + endpoint}. */
    private byte[] getFieldColumnKey(EndPoint endPoint) {
        MDSKey.Builder builder = new MDSKey.Builder();
        builder.add(FIELD_MARKER);
        addEndPoint(builder, endPoint);
        return builder.build().getKey();
    }

    /** Inverts a timestamp so that larger (newer) times produce smaller keys. */
    private long invertTime(long time) {
        return KeyValue.LATEST_TIMESTAMP - time;
    }

    /** Inverted start time, in milliseconds, of the run, decoded from its run id. */
    private long getInvertedStartTime(ProgramRunId programRunId) {
        return invertTime(RunIds.getTime(RunIds.fromString(programRunId.getEntityName()), TimeUnit.MILLISECONDS));
    }

    /** Appends the endpoint's namespace and name to the key. */
    private void addEndPoint(MDSKey.Builder builder, EndPoint endPoint) {
        builder.add(endPoint.getNamespace()).add(endPoint.getName());
    }

    /** Summary column key: {@code direction + endpoint + field}. */
    private byte[] getSummaryColumnKey(byte[] direction, EndPointField endPointField) {
        MDSKey.Builder builder = new MDSKey.Builder();
        builder.add(direction);
        addEndPoint(builder, endPointField.getEndPoint());
        builder.add(endPointField.getField());
        return builder.build().getKey();
    }

    /**
     * Returns whether a checksum-row column is an outgoing summary column for {@code endPoint}. The raw-operations
     * column is a bare marker rather than an MDSKey, so it is rejected before decoding.
     */
    private boolean matchesEndpoint(byte[] column, EndPoint endPoint) {
        if (Arrays.equals(column, RAW_OPERATION_MARKER)) {
            return false;
        }
        MDSKey.Splitter splitter = new MDSKey(column).split();
        return Arrays.equals(OUTGOING_DIRECTION_MARKER, splitter.getBytes())
            && splitter.getString().equals(endPoint.getNamespace())
            && splitter.getString().equals(endPoint.getName());
    }

    /** Extracts the field name from a summary column key ({@code direction + namespace + name + field}). */
    private String extractFieldName(byte[] column) {
        MDSKey.Splitter splitter = new MDSKey(column).split();
        splitter.skipBytes();
        splitter.skipString();
        splitter.skipString();
        return splitter.getString();
    }
}
