package co.cask.cdap.data2.metadata.lineage.field;

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.lineage.field.EndPoint;
import co.cask.cdap.api.lineage.field.InputField;
import co.cask.cdap.api.lineage.field.Operation;
import co.cask.cdap.api.lineage.field.ReadOperation;
import co.cask.cdap.api.lineage.field.TransformOperation;
import co.cask.cdap.api.lineage.field.WriteOperation;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFrameworkTestUtil;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.metadata.lineage.ProgramRunOperations;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionExecutor;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/data2/metadata/lineage/field/FieldLineageDatasetTest.class */
/**
 * Tests for {@link FieldLineageDataset}.
 *
 * <p>Each test stores {@link FieldLineageInfo} for one or more program runs and then queries the
 * dataset for fields, incoming/outgoing summaries and operations over different
 * {@code (start, end)} windows.
 */
public class FieldLineageDatasetTest {

    @ClassRule
    public static DatasetFrameworkTestUtil dsFrameworkUtil = new DatasetFrameworkTestUtil();

    /**
     * Stores lineage for three runs (two of {@code workflow1}, one of {@code workflow3}) and checks
     * the fields, summaries and operations returned for several query windows.
     */
    @Test
    public void testSimpleOperations() throws Exception {
        // Declared with the concrete dataset type: the lineage methods used below are not
        // available through the TransactionAware interface.
        FieldLineageDataset fieldLineageDataset = getFieldLineageDataset("testSimpleOperations");
        Assert.assertNotNull(fieldLineageDataset);
        TransactionExecutor txnl = dsFrameworkUtil.newInMemoryTransactionExecutor(fieldLineageDataset);

        ProgramRunId run1 = workflowRun("workflow1", 10000L);
        ProgramRunId run2 = workflowRun("workflow1", 11000L);
        ProgramRunId run3 = workflowRun("workflow3", 12000L);

        // info1 has no "file_name" field; info2 does.
        FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false));
        FieldLineageInfo info2 = new FieldLineageInfo(generateOperations(true));

        txnl.execute(() -> {
            fieldLineageDataset.addFieldLineageInfo(run1, info1);
        });
        txnl.execute(() -> {
            fieldLineageDataset.addFieldLineageInfo(run2, info2);
        });
        // run3 is recorded with the same lineage info as run2.
        txnl.execute(() -> {
            fieldLineageDataset.addFieldLineageInfo(run3, info2);
        });

        txnl.execute(() -> {
            EndPoint source = EndPoint.of("ns1", "endpoint1");
            EndPoint destination = EndPoint.of("myns", "another_file");

            // Window [0, 10000) is expected to contain none of the recorded runs.
            Assert.assertEquals(Collections.emptySet(), fieldLineageDataset.getFields(source, 0L, 10000L));
            Assert.assertEquals(Collections.emptySet(), fieldLineageDataset.getFields(destination, 0L, 10000L));

            Set<String> expectedDestinationFields = new HashSet<>(Arrays.asList("offset", "name"));
            Set<String> expectedSourceFields = new HashSet<>(Arrays.asList("offset", "body"));

            // Windows expected to see only the first run.
            Assert.assertEquals(expectedDestinationFields, fieldLineageDataset.getFields(destination, 0L, 10001L));
            Assert.assertEquals(expectedSourceFields, fieldLineageDataset.getFields(source, 0L, 10001L));
            Assert.assertEquals(expectedDestinationFields, fieldLineageDataset.getFields(destination, 10000L, 11000L));
            Assert.assertEquals(expectedSourceFields, fieldLineageDataset.getFields(source, 10000L, 10001L));

            // Once the window reaches the second run, "file_name" is expected on both endpoints.
            expectedDestinationFields.add("file_name");
            expectedSourceFields.add("file_name");
            Assert.assertEquals(expectedDestinationFields, fieldLineageDataset.getFields(destination, 10000L, 11001L));
            Assert.assertEquals(expectedSourceFields, fieldLineageDataset.getFields(source, 10000L, 11001L));

            // Incoming summaries for fields of the destination endpoint.
            Assert.assertEquals(Collections.emptySet(),
                    fieldLineageDataset.getIncomingSummary(new EndPointField(destination, "offset"), 0L, 10000L));
            Assert.assertEquals(new EndPointField(source, "offset"),
                    fieldLineageDataset.getIncomingSummary(new EndPointField(destination, "offset"), 0L, 10001L)
                            .iterator().next());
            Assert.assertEquals(new EndPointField(source, "body"),
                    fieldLineageDataset.getIncomingSummary(new EndPointField(destination, "name"), 0L, 10001L)
                            .iterator().next());
            Assert.assertEquals(Collections.emptySet(),
                    fieldLineageDataset.getIncomingSummary(new EndPointField(destination, "file_name"), 0L, 10001L));

            // Outgoing summaries for fields of the source endpoint.
            Assert.assertEquals(Collections.emptySet(),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(destination, "offset"), 0L, 10000L));
            Assert.assertEquals(new EndPointField(destination, "offset"),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(source, "offset"), 0L, 10001L)
                            .iterator().next());
            Assert.assertEquals(new EndPointField(destination, "name"),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(source, "body"), 0L, 10001L)
                            .iterator().next());
            Assert.assertEquals(Collections.emptySet(),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(source, "file_name"), 0L, 10001L));
            Assert.assertEquals(Collections.emptySet(),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(source, "file_name"), 0L, 11000L));
            Assert.assertEquals(new EndPointField(destination, "file_name"),
                    fieldLineageDataset.getOutgoingSummary(new EndPointField(source, "file_name"), 0L, 11001L)
                            .iterator().next());

            // Operations for a window expected to contain only the first run.
            Set<ProgramRunOperations> incomingOperations = fieldLineageDataset.getIncomingOperations(destination, 0L, 10001L);
            Set<ProgramRunOperations> outgoingOperations = fieldLineageDataset.getOutgoingOperations(source, 0L, 10001L);
            Assert.assertEquals(1, incomingOperations.size());
            Assert.assertEquals(incomingOperations, outgoingOperations);
            Assert.assertEquals(Collections.singleton(run1), incomingOperations.iterator().next().getProgramRunIds());

            // Operations for a window spanning all three runs: run2 and run3 are expected in a
            // single entry because they were recorded with the same lineage info.
            incomingOperations = fieldLineageDataset.getIncomingOperations(destination, 10000L, 12001L);
            outgoingOperations = fieldLineageDataset.getOutgoingOperations(source, 10000L, 12001L);
            Assert.assertEquals(2, incomingOperations.size());
            Assert.assertEquals(incomingOperations, outgoingOperations);

            Set<ProgramRunOperations> expectedOperations = new HashSet<>();
            expectedOperations.add(new ProgramRunOperations(Collections.singleton(run1), info1.getOperations()));
            expectedOperations.add(new ProgramRunOperations(new HashSet<>(Arrays.asList(run2, run3)), info2.getOperations()));
            Assert.assertEquals(expectedOperations, incomingOperations);
            Assert.assertEquals(expectedOperations, outgoingOperations);
        });
    }

    /**
     * Stores lineage for two runs that write the {@code body} field of the same destination from
     * two different sources, and checks that the incoming summary over a window covering both runs
     * contains both source fields.
     */
    @Test
    public void testMergeSummaries() throws Exception {
        FieldLineageDataset fieldLineageDataset = getFieldLineageDataset("testMergeDataset");
        Assert.assertNotNull(fieldLineageDataset);
        TransactionExecutor txnl = dsFrameworkUtil.newInMemoryTransactionExecutor(fieldLineageDataset);

        ProgramRunId run1 = workflowRun("workflow1", 10000L);
        ProgramRunId run2 = workflowRun("workflow1", 11000L);

        List<Operation> operations = new ArrayList<>();
        operations.add(new ReadOperation("read", "some read", EndPoint.of("ns1", "endpoint1"),
                new String[]{"offset", "body"}));
        operations.add(new WriteOperation("write", "some write", EndPoint.of("ns", "endpoint3"),
                new InputField[]{InputField.of("read", "body")}));
        FieldLineageInfo info1 = new FieldLineageInfo(operations);

        // NOTE(review): the same list instance is extended and reused for info2; this relies on
        // FieldLineageInfo not retaining a reference to the list it is given - confirm.
        operations.add(new ReadOperation("anotherRead", "another read", EndPoint.of("ns1", "endpoint2"),
                new String[]{"offset", "body"}));
        operations.add(new WriteOperation("anotherWrite", "another write", EndPoint.of("ns", "endpoint3"),
                new InputField[]{InputField.of("anotherRead", "body")}));
        FieldLineageInfo info2 = new FieldLineageInfo(operations);

        txnl.execute(() -> {
            fieldLineageDataset.addFieldLineageInfo(run1, info1);
        });
        txnl.execute(() -> {
            fieldLineageDataset.addFieldLineageInfo(run2, info2);
        });

        txnl.execute(() -> {
            EndPoint source1 = EndPoint.of("ns1", "endpoint1");
            EndPoint source2 = EndPoint.of("ns1", "endpoint2");
            EndPoint destination = EndPoint.of("ns", "endpoint3");

            Set<EndPointField> expected = new HashSet<>();
            expected.add(new EndPointField(source1, "body"));
            expected.add(new EndPointField(source2, "body"));
            Assert.assertEquals(expected,
                    fieldLineageDataset.getIncomingSummary(new EndPointField(destination, "body"), 0L, 11001L));
        });
    }

    /**
     * Gets or creates a {@link FieldLineageDataset} instance in the test namespace.
     *
     * @param str name of the dataset instance
     * @return the dataset obtained from the test dataset framework
     */
    private static FieldLineageDataset getFieldLineageDataset(String str) throws Exception {
        // A parameterized cast is used instead of a raw (Map) cast: an unchecked conversion in the
        // call would erase the inferred return type of the generic method.
        // NOTE(review): assumes the last parameter is declared as Map<String, String> - confirm.
        return DatasetsUtil.getOrCreateDataset(dsFrameworkUtil.getFramework(),
                DatasetFrameworkTestUtil.NAMESPACE_ID.dataset(str),
                FieldLineageDataset.class.getName(), DatasetProperties.EMPTY, (Map<String, String>) null);
    }

    /**
     * Builds a run id for a workflow of application {@code app1} in the {@code default} namespace.
     *
     * @param workflowName name of the workflow program
     * @param time value passed to {@code RunIds.generate} to create the run id
     * @return the run id of the given workflow
     */
    private static ProgramRunId workflowRun(String workflowName, long time) {
        ProgramId workflow = new ProgramId("default", "app1", ProgramType.WORKFLOW, workflowName);
        return workflow.run(RunIds.generate(time).getId());
    }

    /**
     * Builds a read, parse, concat and write chain of operations from {@code ns1.endpoint1} to
     * {@code myns.another_file}.
     *
     * @param z when {@code true}, a {@code file_name} field is additionally read and passed
     *          directly from the read operation to the write operation
     * @return the operations, in the order parse, concat, read, write
     */
    private static List<Operation> generateOperations(boolean z) {
        List<String> readFields = new ArrayList<>();
        readFields.add("offset");
        readFields.add("body");
        if (z) {
            readFields.add("file_name");
        }
        ReadOperation read = new ReadOperation("read", "some read", EndPoint.of("ns1", "endpoint1"), readFields);

        TransformOperation parse = new TransformOperation("parse", "parsing body",
                Collections.singletonList(InputField.of("read", "body")),
                new String[]{"first_name", "last_name"});
        TransformOperation concat = new TransformOperation("concat", "concatinating the fields",
                Arrays.asList(InputField.of("parse", "first_name"), InputField.of("parse", "last_name")),
                new String[]{"name"});

        List<InputField> writeInputs = new ArrayList<>();
        writeInputs.add(InputField.of("read", "offset"));
        writeInputs.add(InputField.of("concat", "name"));
        if (z) {
            writeInputs.add(InputField.of("read", "file_name"));
        }
        WriteOperation write = new WriteOperation("write_op", "writing data to file",
                EndPoint.of("myns", "another_file"), writeInputs);

        // The list is deliberately not in execution order (transforms first, then read, then write).
        List<Operation> operations = new ArrayList<>();
        operations.add(parse);
        operations.add(concat);
        operations.add(read);
        operations.add(write);
        return operations;
    }
}
