package co.cask.cdap.metadata;

import co.cask.cdap.AllProgramsApp;
import co.cask.cdap.AppWithWorkflow;
import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.lineage.field.EndPoint;
import co.cask.cdap.api.lineage.field.InputField;
import co.cask.cdap.api.lineage.field.ReadOperation;
import co.cask.cdap.api.lineage.field.TransformOperation;
import co.cask.cdap.api.lineage.field.WriteOperation;
import co.cask.cdap.api.metadata.Metadata;
import co.cask.cdap.api.metadata.MetadataEntity;
import co.cask.cdap.api.metadata.MetadataScope;
import co.cask.cdap.api.workflow.NodeStatus;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.metadata.MetadataRecordV2;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.config.PreferencesService;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.lineage.DefaultLineageStoreReader;
import co.cask.cdap.data2.metadata.lineage.field.DefaultFieldLineageReader;
import co.cask.cdap.data2.metadata.lineage.field.EndPointField;
import co.cask.cdap.data2.metadata.lineage.field.FieldLineageInfo;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.data2.metadata.writer.FieldLineageWriter;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.metadata.writer.MessagingLineageWriter;
import co.cask.cdap.data2.metadata.writer.MessagingMetadataPublisher;
import co.cask.cdap.data2.metadata.writer.MetadataOperation;
import co.cask.cdap.data2.metadata.writer.MetadataPublisher;
import co.cask.cdap.data2.registry.BasicUsageRegistry;
import co.cask.cdap.data2.registry.MessagingUsageWriter;
import co.cask.cdap.data2.registry.UsageWriter;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.deploy.Specifications;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingRuntimeDatasets;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.trigger.TimeTrigger;
import co.cask.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import co.cask.cdap.internal.app.runtime.workflow.MessagingWorkflowStateWriter;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowStateWriter;
import co.cask.cdap.internal.app.scheduler.LogPrintingJob;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.internal.app.store.DefaultStore;
import co.cask.cdap.internal.io.ReflectionSchemaGenerator;
import co.cask.cdap.internal.profile.AdminEventPublisher;
import co.cask.cdap.internal.profile.ProfileService;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.context.MultiThreadMessagingContext;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.WorkflowNodeStateDetail;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.FlowletId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.ScheduleId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.WorkflowId;
import co.cask.cdap.proto.metadata.lineage.ProgramRunOperations;
import co.cask.cdap.proto.profile.Profile;
import co.cask.cdap.scheduler.ProgramScheduleService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;

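/**
 * Tests for {@link MetadataSubscriberService}: each test publishes messages (lineage, field lineage,
 * usage, workflow state, metadata operations, profile/preference changes, app deletion) and then
 * waits for the running subscriber service to apply them to the corresponding store.
 *
 * <p>Recovered from the compiled class co/cask/cdap/metadata/MetadataSubscriberServiceTest.class.
 */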
public class MetadataSubscriberServiceTest extends AppFabricTestBase {
    // Shared entity ids, all in the default namespace, reused by the tests below.

    // Stream and datasets used as lineage/usage targets in testSubscriber.
    private final StreamId stream1 = NamespaceId.DEFAULT.stream("stream1");
    private final DatasetId dataset1 = NamespaceId.DEFAULT.dataset("dataset1");
    private final DatasetId dataset2 = NamespaceId.DEFAULT.dataset("dataset2");
    private final DatasetId dataset3 = NamespaceId.DEFAULT.dataset("dataset3");
    // Programs whose runs publish the lineage/usage records; flowlet1 belongs to flow1.
    private final ProgramId flow1 = NamespaceId.DEFAULT.app("app1").program(ProgramType.FLOW, "flow1");
    private final FlowletId flowlet1 = this.flow1.flowlet("flowlet1");
    private final ProgramId spark1 = NamespaceId.DEFAULT.app("app2").program(ProgramType.SPARK, "spark1");
    // Workflow whose runs are used by testWorkflow and testMetadata.
    private final WorkflowId workflow1 = NamespaceId.DEFAULT.app("app3").workflow("workflow1");

    /**
     * Verifies that lineage, field lineage and usage records published through the messaging
     * writers are absent from their stores before {@link MetadataSubscriberService} is started and
     * show up once it is running.
     *
     * @throws InterruptedException if interrupted while polling
     * @throws ExecutionException if a polled check throws
     * @throws TimeoutException if an expected state is not reached within 10 seconds
     */
    @Test
    public void testSubscriber() throws InterruptedException, ExecutionException, TimeoutException {
        // Test-specific dataset ids handed both to the readers below and to the subscriber service.
        DatasetId lineageDatasetId = NamespaceId.DEFAULT.dataset("testSubscriberLineage");
        DatasetId fieldLineageDatasetId = NamespaceId.DEFAULT.dataset("testSubscriberFieldLineage");
        DatasetId usageDatasetId = NamespaceId.DEFAULT.dataset("testSubscriberUsage");

        // Publish dataset lineage for one flow run; nothing is readable before the service starts.
        LineageWriter lineageWriter = getInjector().getInstance(MessagingLineageWriter.class);
        ProgramRunId flowRun = flow1.run(RunIds.generate());
        lineageWriter.addAccess(flowRun, dataset1, AccessType.READ);
        lineageWriter.addAccess(flowRun, dataset2, AccessType.WRITE);

        DefaultLineageStoreReader lineageReader =
            new DefaultLineageStoreReader(getDatasetFramework(), getTxClient(), lineageDatasetId);
        Assert.assertTrue(lineageReader.getEntitiesForRun(flowRun).isEmpty());

        // Publish field lineage: the same operations for two spark runs, a different set for a third.
        FieldLineageWriter fieldLineageWriter = getInjector().getInstance(MessagingLineageWriter.class);
        ReadOperation read = new ReadOperation(
            "read", "some read", EndPoint.of("ns", "endpoint1"), new String[]{"offset", "body"});
        TransformOperation parse = new TransformOperation(
            "parse", "parse body", Collections.singletonList(InputField.of("read", "body")),
            new String[]{"name", "address"});
        WriteOperation write = new WriteOperation(
            "write", "write data", EndPoint.of("ns", "endpoint2"),
            Arrays.asList(InputField.of("read", "offset"), InputField.of("parse", "name"),
                          InputField.of("parse", "address")));
        FieldLineageInfo firstInfo = new FieldLineageInfo(Arrays.asList(read, write, parse));

        ProgramRunId sparkRun1 = spark1.run(RunIds.generate(100L));
        fieldLineageWriter.write(sparkRun1, firstInfo);
        ProgramRunId sparkRun2 = spark1.run(RunIds.generate(200L));
        fieldLineageWriter.write(sparkRun2, firstInfo);

        TransformOperation normalize = new TransformOperation(
            "normalize", "normalize address", Collections.singletonList(InputField.of("parse", "address")),
            new String[]{"address"});
        WriteOperation anotherWrite = new WriteOperation(
            "anotherwrite", "write data", EndPoint.of("ns", "endpoint2"),
            Arrays.asList(InputField.of("read", "offset"), InputField.of("parse", "name"),
                          InputField.of("normalize", "address")));
        FieldLineageInfo secondInfo = new FieldLineageInfo(Arrays.asList(read, parse, normalize, anotherWrite));

        ProgramRunId sparkRun3 = spark1.run(RunIds.generate(300L));
        fieldLineageWriter.write(sparkRun3, secondInfo);

        // Publish usage; again nothing is readable before the service starts.
        UsageWriter usageWriter = getInjector().getInstance(MessagingUsageWriter.class);
        usageWriter.register(spark1, dataset1);
        usageWriter.registerAll(Collections.singleton(spark1), dataset3);

        BasicUsageRegistry usageRegistry =
            new BasicUsageRegistry(getDatasetFramework(), getTxClient(), usageDatasetId);
        Assert.assertTrue(usageRegistry.getDatasets(spark1).isEmpty());

        MetadataSubscriberService subscriberService = getInjector().getInstance(MetadataSubscriberService.class);
        subscriberService
            .setLineageDatasetId(lineageDatasetId)
            .setFieldLineageDatasetId(fieldLineageDatasetId)
            .setUsageDatasetId(usageDatasetId)
            .startAndWait();

        try {
            // Element type is Object because the set mixes program, dataset and stream ids and is
            // only ever compared with equals().
            Set<Object> expectedEntities = new HashSet<>(Arrays.asList(flowRun.getParent(), dataset1, dataset2));
            Tasks.waitFor(true, () -> expectedEntities.equals(lineageReader.getEntitiesForRun(flowRun)),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // Lineage published while the service is already running is picked up as well.
            lineageWriter.addAccess(flowRun, stream1, AccessType.UNKNOWN, flowlet1);
            expectedEntities.add(stream1);
            Tasks.waitFor(true, () -> expectedEntities.equals(lineageReader.getEntitiesForRun(flowRun)),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // Writing field lineage for spark1 must not have produced dataset lineage relations.
            Assert.assertTrue(lineageReader.getRelations(spark1, 0L, Long.MAX_VALUE, relation -> true).isEmpty());

            DefaultFieldLineageReader fieldLineageReader =
                new DefaultFieldLineageReader(getDatasetFramework(), getTxClient(), fieldLineageDatasetId);

            // The run generated at 300 is expected ahead of the (grouped) runs generated at 100 and 200.
            List<ProgramRunOperations> expectedOperations = new ArrayList<>();
            expectedOperations.add(new ProgramRunOperations(
                Collections.singleton(sparkRun3), new HashSet<>(Arrays.asList(read, anotherWrite))));
            expectedOperations.add(new ProgramRunOperations(
                new HashSet<>(Arrays.asList(sparkRun1, sparkRun2)), new HashSet<>(Arrays.asList(read, write))));

            EndPointField offsetField = new EndPointField(EndPoint.of("ns", "endpoint2"), "offset");
            Tasks.waitFor(expectedOperations,
                          () -> fieldLineageReader.getIncomingOperations(offsetField, 1L, Long.MAX_VALUE - 1),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            Set<DatasetId> expectedDatasets = new HashSet<>(Arrays.asList(dataset1, dataset3));
            Tasks.waitFor(true, () -> expectedDatasets.equals(usageRegistry.getDatasets(spark1)),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // Usage published while the service is already running is picked up as well.
            usageWriter.register(flow1, stream1);
            Set<StreamId> expectedStreams = Collections.singleton(stream1);
            Tasks.waitFor(true, () -> expectedStreams.equals(usageRegistry.getStreams(flow1)),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup path: stop the service exactly once, whether or not the checks passed.
            subscriberService.stopAndWait();
        }
    }

    /**
     * Verifies that a workflow token and workflow node states published through
     * {@link MessagingWorkflowStateWriter} reach the {@link Store} once
     * {@link MetadataSubscriberService} is running.
     *
     * @throws InterruptedException if interrupted while polling
     * @throws ExecutionException if a polled check throws
     * @throws TimeoutException if an expected state is not reached within 10 seconds
     */
    @Test
    public void testWorkflow() throws InterruptedException, ExecutionException, TimeoutException {
        ProgramRunId workflowRun = workflow1.run(RunIds.generate());

        // NOTE(review): the key/value constants come from LogPrintingJob, which looks unrelated to
        // this test; presumably only their string values matter - confirm before inlining literals.
        BasicWorkflowToken token = new BasicWorkflowToken(1024);
        token.setCurrentNode("node1");
        token.put(LogPrintingJob.KEY, LogPrintingJob.VALUE);

        // Publish the token and a RUNNING node state before the subscriber is started.
        WorkflowStateWriter stateWriter = getInjector().getInstance(MessagingWorkflowStateWriter.class);
        stateWriter.setWorkflowToken(workflowRun, token);
        stateWriter.addWorkflowNodeState(workflowRun, new WorkflowNodeStateDetail("action1", NodeStatus.RUNNING));

        // Nothing has been applied to the store yet.
        Store store = getInjector().getInstance(DefaultStore.class);
        Assert.assertNull(store.getWorkflowToken(workflow1, workflowRun.getRun()).get(LogPrintingJob.KEY));

        MetadataSubscriberService subscriberService = getInjector().getInstance(MetadataSubscriberService.class);
        subscriberService.startAndWait();

        try {
            // The token value becomes visible; a missing key maps to null so polling continues.
            Tasks.waitFor(LogPrintingJob.VALUE,
                          () -> Optional.ofNullable(
                                  store.getWorkflowToken(workflow1, workflowRun.getRun()).get(LogPrintingJob.KEY))
                              .map(Object::toString)
                              .orElse(null),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // The first stored node state reports RUNNING.
            Tasks.waitFor(NodeStatus.RUNNING,
                          () -> store.getWorkflowNodeStates(workflowRun).stream()
                              .findFirst()
                              .map(WorkflowNodeStateDetail::getNodeStatus)
                              .orElse(null),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // A later state for the same node, published while the service runs, replaces it.
            stateWriter.addWorkflowNodeState(workflowRun, new WorkflowNodeStateDetail("action1", NodeStatus.COMPLETED));
            Tasks.waitFor(NodeStatus.COMPLETED,
                          () -> store.getWorkflowNodeStates(workflowRun).stream()
                              .findFirst()
                              .map(WorkflowNodeStateDetail::getNodeStatus)
                              .orElse(null),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup path: stop the service exactly once, whether or not the checks passed.
            subscriberService.stopAndWait();
        }
    }

    /**
     * Verifies that PUT, DELETE, DELETE_ALL_PROPERTIES, DELETE_ALL_TAGS and DELETE_ALL metadata
     * operations published through {@link MessagingMetadataPublisher} are applied to the USER
     * scope of the {@link MetadataStore} by {@link MetadataSubscriberService}.
     *
     * @throws InterruptedException if interrupted while polling
     * @throws TimeoutException if an expected state is not reached within 10 seconds
     * @throws ExecutionException if a polled check throws
     */
    @Test
    public void testMetadata() throws InterruptedException, TimeoutException, ExecutionException {
        ProgramRunId workflowRun = workflow1.run(RunIds.generate());
        MetadataEntity entity = MetadataEntity.ofDataset("myns", "myds");

        // The entity starts without any user metadata.
        MetadataStore metadataStore = getInjector().getInstance(MetadataStore.class);
        MetadataRecordV2 initial = metadataStore.getMetadata(MetadataScope.USER, entity);
        Assert.assertTrue(initial.getProperties().isEmpty());
        Assert.assertTrue(initial.getTags().isEmpty());

        MetadataPublisher publisher = getInjector().getInstance(MessagingMetadataPublisher.class);
        MetadataSubscriberService subscriberService = getInjector().getInstance(MetadataSubscriberService.class);
        subscriberService.startAndWait();

        try {
            // NOTE(review): the value for "a" is a constant from an unrelated app fixture; presumably
            // only its string value matters - confirm before replacing it with a literal.
            ImmutableMap<String, String> properties =
                ImmutableMap.of("a", AppWithMapReduceUsingRuntimeDatasets.FileMapper.ONLY_KEY, "b", "z");
            ImmutableSet<String> tags = ImmutableSet.of("t1", "t2");

            // PUT two properties and two tags.
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.PUT, new Metadata(properties, tags)));
            waitForMetadata(entity, metadataStore, 2, 2);
            MetadataRecordV2 afterPut = metadataStore.getMetadata(MetadataScope.USER, entity);
            Assert.assertEquals(properties, afterPut.getProperties());
            Assert.assertEquals(tags, afterPut.getTags());

            // DELETE property "a" and tags "t1"/"t3" ("t3" was never added); "b" and "t2" remain.
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.DELETE,
                                                    new Metadata(ImmutableMap.of("a", ""), ImmutableSet.of("t1", "t3"))));
            waitForMetadata(entity, metadataStore, 1, 1);
            MetadataRecordV2 afterDelete = metadataStore.getMetadata(MetadataScope.USER, entity);
            Assert.assertEquals(ImmutableMap.of("b", "z"), afterDelete.getProperties());
            Assert.assertEquals(ImmutableSet.of("t2"), afterDelete.getTags());

            // PUT with only properties, then with only tags, restores both to size two.
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.PUT,
                                                    new Metadata(properties, Collections.emptySet())));
            waitForMetadata(entity, metadataStore, 2, 1);
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.PUT,
                                                    new Metadata(Collections.emptyMap(), tags)));
            waitForMetadata(entity, metadataStore, 2, 2);

            // Bulk deletes carry no Metadata payload.
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.DELETE_ALL_PROPERTIES, (Metadata) null));
            waitForMetadata(entity, metadataStore, 0, 2);
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.DELETE_ALL_TAGS, (Metadata) null));
            waitForMetadata(entity, metadataStore, 0, 0);

            // Re-add everything, then remove it all with a single DELETE_ALL.
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.PUT, new Metadata(properties, tags)));
            waitForMetadata(entity, metadataStore, 2, 2);
            publisher.publish(workflowRun,
                              new MetadataOperation(entity, MetadataOperation.Type.DELETE_ALL, (Metadata) null));
            waitForMetadata(entity, metadataStore, 0, 0);
        } finally {
            // Single cleanup path: stop the service exactly once, whether or not the checks passed.
            subscriberService.stopAndWait();
        }
    }

    /**
     * Verifies that the {@code "profile"} property stored for a workflow and its schedule follows
     * changes to the {@code "system.profile.name"} preference while
     * {@link MetadataSubscriberService} is running.
     *
     * <p>Sequence checked: namespace preference NATIVE, namespace preference MyProfile,
     * application preference MyProfile2 (with an unscoped preference set to NATIVE), then the
     * unscoped, application and namespace preferences are deleted one after another.
     *
     * @throws Exception if any service call fails or an expected state is not reached in time
     */
    @Test
    public void testProfileMetadata() throws Exception {
        Injector injector = getInjector();

        // Start with the native profile set on the default namespace.
        PreferencesService preferencesService = injector.getInstance(PreferencesService.class);
        preferencesService.setProperties(
            NamespaceId.DEFAULT, Collections.singletonMap("system.profile.name", ProfileId.NATIVE.getScopedName()));

        // Register the application and a schedule for its workflow.
        ApplicationSpecification appSpec = Specifications.from(new AppWithWorkflow());
        ApplicationId appId = NamespaceId.DEFAULT.app(appSpec.getName());
        WorkflowId workflowId = appId.workflow("SampleWorkflow");
        ScheduleId scheduleId = appId.schedule("tsched1");

        Store store = injector.getInstance(DefaultStore.class);
        store.addApplication(appId, appSpec);

        ProgramScheduleService scheduleService = injector.getInstance(ProgramScheduleService.class);
        scheduleService.add(new ProgramSchedule("tsched1", "one time schedule", workflowId, Collections.emptyMap(),
                                                new TimeTrigger("* * ? * 1"), ImmutableList.of()));

        // No properties exist for either entity before the subscriber runs.
        MetadataStore metadataStore = injector.getInstance(MetadataStore.class);
        MetadataEntity workflowEntity = workflowId.toMetadataEntity();
        MetadataEntity scheduleEntity = scheduleId.toMetadataEntity();
        Assert.assertEquals(Collections.emptyMap(), metadataStore.getProperties(workflowEntity));
        Assert.assertEquals(Collections.emptyMap(), metadataStore.getProperties(scheduleEntity));

        MetadataSubscriberService subscriberService = injector.getInstance(MetadataSubscriberService.class);
        subscriberService.startAndWait();

        // Two user profiles in the default namespace, both copied from the native profile.
        ProfileService profileService = injector.getInstance(ProfileService.class);
        ProfileId myProfile = new ProfileId(NamespaceId.DEFAULT.getNamespace(), "MyProfile");
        profileService.saveProfile(myProfile, new Profile("MyProfile", Profile.NATIVE.getLabel(),
                                                          Profile.NATIVE.getDescription(), Profile.NATIVE.getScope(),
                                                          Profile.NATIVE.getProvisioner()));
        ProfileId myProfile2 = new ProfileId(NamespaceId.DEFAULT.getNamespace(), "MyProfile2");
        profileService.saveProfile(myProfile2, new Profile("MyProfile2", Profile.NATIVE.getLabel(),
                                                           Profile.NATIVE.getDescription(), Profile.NATIVE.getScope(),
                                                           Profile.NATIVE.getProvisioner()));

        try {
            // Namespace preference set before start: both entities report the native profile.
            waitForProfileProperty(metadataStore, ProfileId.NATIVE, workflowEntity, scheduleEntity);

            // Namespace preference switched to MyProfile.
            preferencesService.setProperties(
                NamespaceId.DEFAULT, Collections.singletonMap("system.profile.name", "USER:MyProfile"));
            waitForProfileProperty(metadataStore, myProfile, workflowEntity, scheduleEntity);

            // Application preference MyProfile2 wins over the namespace preference and over the
            // preference set without an entity id (presumably instance level - confirm).
            preferencesService.setProperties(appId, Collections.singletonMap("system.profile.name", "USER:MyProfile2"));
            preferencesService.setProperties(
                Collections.singletonMap("system.profile.name", ProfileId.NATIVE.getScopedName()));
            waitForProfileProperty(metadataStore, myProfile2, workflowEntity, scheduleEntity);

            // Deleting the preferences set without an entity id changes nothing.
            preferencesService.deleteProperties();
            waitForProfileProperty(metadataStore, myProfile2, workflowEntity, scheduleEntity);

            // Deleting the application preference falls back to the namespace preference.
            preferencesService.deleteProperties(appId);
            waitForProfileProperty(metadataStore, myProfile, workflowEntity, scheduleEntity);

            // Deleting the namespace preference falls back to the native profile.
            preferencesService.deleteProperties(NamespaceId.DEFAULT);
            waitForProfileProperty(metadataStore, ProfileId.NATIVE, workflowEntity, scheduleEntity);
        } finally {
            // Single cleanup path, run exactly once whether or not the checks passed.
            subscriberService.stopAndWait();
            preferencesService.deleteProperties(NamespaceId.DEFAULT);
            preferencesService.deleteProperties();
            preferencesService.deleteProperties(appId);
            store.removeAllApplications(NamespaceId.DEFAULT);
            scheduleService.delete(scheduleId);
            profileService.disableProfile(myProfile);
            profileService.disableProfile(myProfile2);
            profileService.deleteAllProfiles(myProfile.getNamespaceId());
            metadataStore.removeMetadata(workflowEntity);
            metadataStore.removeMetadata(scheduleEntity);
        }
    }

    /**
     * Polls the metadata store until the {@code "profile"} property of each given entity, checked
     * in argument order, equals the scoped name of the expected profile.
     *
     * @param metadataStore store to read properties from
     * @param expectedProfile profile whose scoped name is expected
     * @param entities entities to check, one after another
     * @throws TimeoutException if an entity does not reach the expected value within 10 seconds
     * @throws InterruptedException if interrupted while polling
     * @throws ExecutionException if reading the properties throws
     */
    private void waitForProfileProperty(MetadataStore metadataStore, ProfileId expectedProfile,
                                        MetadataEntity... entities)
        throws TimeoutException, InterruptedException, ExecutionException {
        String expectedName = expectedProfile.getScopedName();
        for (MetadataEntity entity : entities) {
            Tasks.waitFor(expectedName, () -> metadataStore.getProperties(entity).get("profile"),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Verifies that the workflow's {@code "profile"} property is first set from the namespace
     * preference and then changes to the native profile when the namespace preferences are
     * replaced with an empty map.
     *
     * @throws Exception if any service call fails or an expected state is not reached in time
     */
    @Test
    public void testProfileMetadataWithNoProfilePreferences() throws Exception {
        Injector injector = getInjector();

        // A user profile in the default namespace, copied from the native profile.
        ProfileService profileService = injector.getInstance(ProfileService.class);
        ProfileId myProfile = new ProfileId(NamespaceId.DEFAULT.getNamespace(), "MyProfile");
        profileService.saveProfile(myProfile, new Profile("MyProfile", Profile.NATIVE.getLabel(),
                                                          Profile.NATIVE.getDescription(), Profile.NATIVE.getScope(),
                                                          Profile.NATIVE.getProvisioner()));

        // Point the namespace at that profile.
        PreferencesService preferencesService = injector.getInstance(PreferencesService.class);
        preferencesService.setProperties(
            NamespaceId.DEFAULT, Collections.singletonMap("system.profile.name", "USER:MyProfile"));

        ApplicationSpecification appSpec = Specifications.from(new AppWithWorkflow());
        ApplicationId appId = NamespaceId.DEFAULT.app(appSpec.getName());
        WorkflowId workflowId = appId.workflow("SampleWorkflow");

        Store store = injector.getInstance(DefaultStore.class);
        store.addApplication(appId, appSpec);

        // No properties exist for the workflow before the subscriber runs.
        MetadataStore metadataStore = injector.getInstance(MetadataStore.class);
        MetadataEntity workflowEntity = workflowId.toMetadataEntity();
        Assert.assertEquals(Collections.emptyMap(), metadataStore.getProperties(workflowEntity));

        MetadataSubscriberService subscriberService = injector.getInstance(MetadataSubscriberService.class);
        subscriberService.startAndWait();

        try {
            Tasks.waitFor(myProfile.getScopedName(), () -> metadataStore.getProperties(workflowEntity).get("profile"),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);

            // Replacing the preferences with an empty map drops the profile preference.
            preferencesService.setProperties(NamespaceId.DEFAULT, Collections.emptyMap());
            Tasks.waitFor(ProfileId.NATIVE.getScopedName(),
                          () -> metadataStore.getProperties(workflowEntity).get("profile"),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup path, run exactly once whether or not the checks passed.
            subscriberService.stopAndWait();
            preferencesService.deleteProperties(NamespaceId.DEFAULT);
            store.removeAllApplications(NamespaceId.DEFAULT);
            profileService.disableProfile(myProfile);
            profileService.deleteProfile(myProfile);
            metadataStore.removeMetadata(workflowEntity);
        }
    }

    /**
     * Polls the USER-scope metadata of an entity until it holds exactly the given number of
     * properties and tags.
     *
     * @param metadataEntity entity whose metadata is read
     * @param metadataStore store to read from
     * @param i expected number of properties
     * @param i2 expected number of tags
     * @throws TimeoutException if the counts are not reached within 10 seconds
     * @throws InterruptedException if interrupted while polling
     * @throws ExecutionException if reading the metadata throws
     */
    private void waitForMetadata(MetadataEntity metadataEntity, MetadataStore metadataStore, int i, int i2) throws TimeoutException, InterruptedException, ExecutionException {
        Tasks.waitFor(
            true,
            () -> {
                MetadataRecordV2 record = metadataStore.getMetadata(MetadataScope.USER, metadataEntity);
                boolean propertiesMatch = record.getProperties().size() == i;
                return propertiesMatch && record.getTags().size() == i2;
            },
            10L, TimeUnit.SECONDS,
            100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Verifies that publishing an application-deletion event through {@link AdminEventPublisher}
     * causes {@link MetadataSubscriberService} to remove the properties stored for a workflow of
     * that application.
     *
     * @throws Exception if any service call fails or the properties are not removed in time
     */
    @Test
    public void testAppDeletionMessage() throws Exception {
        Injector injector = getInjector();

        AdminEventPublisher adminEventPublisher = new AdminEventPublisher(
            injector.getInstance(CConfiguration.class),
            new MultiThreadMessagingContext(injector.getInstance(MessagingService.class)));

        MetadataStore metadataStore = injector.getInstance(MetadataStore.class);
        ApplicationId appId = NamespaceId.DEFAULT.app("App");
        WorkflowId workflowId = appId.workflow("NoOpWorkflow");
        MetadataEntity workflowEntity = workflowId.toMetadataEntity();

        // The specification sent with the event is the JSON round-tripped form of the app's spec.
        ApplicationSpecification appSpec = Specifications.from(new AllProgramsApp());
        ApplicationSpecificationAdapter specAdapter =
            ApplicationSpecificationAdapter.create(new ReflectionSchemaGenerator());
        ApplicationSpecification roundTrippedSpec = specAdapter.fromJson(specAdapter.toJson(appSpec));

        // Seed a SYSTEM-scope "profile" property directly so there is something to delete.
        metadataStore.setProperties(MetadataScope.SYSTEM, workflowEntity,
                                    Collections.singletonMap("profile", ProfileId.NATIVE.getScopedName()));
        Assert.assertEquals(ProfileId.NATIVE.getScopedName(), metadataStore.getProperties(workflowEntity).get("profile"));

        MetadataSubscriberService subscriberService = injector.getInstance(MetadataSubscriberService.class);
        subscriberService.startAndWait();

        try {
            adminEventPublisher.publishAppDeletion(appId, roundTrippedSpec);
            Tasks.waitFor(Collections.emptyMap(), () -> metadataStore.getProperties(workflowEntity),
                          10L, TimeUnit.SECONDS, 100L, TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup path: stop the service exactly once, whether or not the check passed.
            subscriberService.stopAndWait();
        }
    }

    /**
     * Looks up the {@link DatasetFramework} instance from the test injector.
     *
     * @return the injector-provided dataset framework
     */
    private DatasetFramework getDatasetFramework() {
        DatasetFramework framework = getInjector().getInstance(DatasetFramework.class);
        return framework;
    }
}
