package co.cask.cdap.internal.app.store;

import co.cask.cdap.api.artifact.ArtifactId;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.ProgramRunClusterStatus;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.api.RunId;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStoreTest.class */
public class AppMetadataStoreTest {
    private static DatasetFramework datasetFramework;
    private static CConfiguration cConf;
    private static TransactionExecutorFactory txExecutorFactory;
    private static final List<ProgramRunStatus> STOP_STATUSES = ImmutableList.of(ProgramRunStatus.COMPLETED, ProgramRunStatus.FAILED, ProgramRunStatus.KILLED);
    private static final ArtifactId ARTIFACT_ID = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
    private static final Map<String, String> SINGLETON_PROFILE_MAP = Collections.singletonMap("system.profile.name", ProfileId.NATIVE.getScopedName());
    private final AtomicInteger sourceId = new AtomicInteger();
    private final AtomicLong runIdTime = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/store/AppMetadataStoreTest$CountingTicker.class */
    public static class CountingTicker extends Ticker {
        private final long elementsPerMillis;
        private int numProcessed = 0;

        CountingTicker(long j) {
            this.elementsPerMillis = j;
        }

        int getNumProcessed() {
            return this.numProcessed;
        }

        public long read() {
            this.numProcessed++;
            return TimeUnit.MILLISECONDS.toNanos(this.numProcessed / this.elementsPerMillis);
        }
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        Injector injector = AppFabricTestHelper.getInjector();
        AppFabricTestHelper.ensureNamespaceExists(NamespaceId.DEFAULT);
        datasetFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        txExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
        cConf = (CConfiguration) injector.getInstance(CConfiguration.class);
    }

    private void recordProvisionAndStart(ProgramRunId programRunId, AppMetadataStore appMetadataStore) {
        appMetadataStore.recordProgramProvisioning(programRunId, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
        appMetadataStore.recordProgramProvisioned(programRunId, 0, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
        appMetadataStore.recordProgramStart(programRunId, (String) null, ImmutableMap.of(), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
    }

    @Test
    @Ignore
    public void testOldRunRecordFormat() throws Exception {
        DatasetId dataset = NamespaceId.DEFAULT.dataset("testOldRunRecordFormat");
        datasetFramework.addInstance(Table.class.getName(), dataset, DatasetProperties.EMPTY);
        Table dataset2 = datasetFramework.getDataset(dataset, Collections.emptyMap(), (ClassLoader) null);
        Assert.assertNotNull(dataset2);
        AppMetadataStore appMetadataStore = new AppMetadataStore(dataset2, cConf);
        TransactionExecutor createExecutor = txExecutorFactory.createExecutor(Collections.singleton(appMetadataStore));
        ProgramId program = NamespaceId.DEFAULT.app("app").program(ProgramType.values()[ProgramType.values().length - 1], "program");
        RunId generate = RunIds.generate();
        ProgramRunId run = program.run(generate);
        createExecutor.execute(() -> {
            recordProvisionAndStart(run, appMetadataStore);
            appMetadataStore.recordProgramRunningOldFormat(run, RunIds.getTime(generate, TimeUnit.SECONDS), (String) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
        });
        createExecutor.execute(() -> {
            Set runningInRange = appMetadataStore.getRunningInRange(0L, Long.MAX_VALUE);
            Assert.assertEquals(1L, runningInRange.size());
            RunRecordMeta run2 = appMetadataStore.getRun(program.run(((RunId) runningInRange.iterator().next()).getId()));
            Assert.assertNotNull(run2);
            Assert.assertEquals(generate.getId(), run2.getPid());
        });
        createExecutor.execute(() -> {
            appMetadataStore.recordProgramStopOldFormat(run, TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            Map runs = appMetadataStore.getRuns(program, ProgramRunStatus.COMPLETED, 0L, Long.MAX_VALUE, Integer.MAX_VALUE, (Predicate) null);
            Assert.assertEquals(1L, runs.size());
            Assert.assertEquals(run, (ProgramRunId) runs.keySet().iterator().next());
        });
    }

    private AppMetadataStore getMetadataStore(String str) throws Exception {
        DatasetId dataset = NamespaceId.DEFAULT.dataset(str);
        datasetFramework.addInstance(Table.class.getName(), dataset, DatasetProperties.EMPTY);
        Table dataset2 = datasetFramework.getDataset(dataset, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(dataset2);
        return new AppMetadataStore(dataset2, cConf);
    }

    private TransactionExecutor getTxExecutor(AppMetadataStore appMetadataStore) {
        return txExecutorFactory.createExecutor(Collections.singleton(appMetadataStore));
    }

    @Test
    public void testSmallerSourceIdRecords() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testSmallerSourceIdRecords");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        assertPersistedStatus(metadataStore, txExecutor, 100L, 10L, 1L, ProgramRunStatus.STARTING);
        assertPersistedStatus(metadataStore, txExecutor, 100L, 1L, 10L, ProgramRunStatus.STARTING);
        assertPersistedStatus(metadataStore, txExecutor, 1L, 100L, 10L, ProgramRunStatus.RUNNING);
        assertPersistedStatus(metadataStore, txExecutor, 1L, 10L, 100L, ProgramRunStatus.KILLED);
    }

    @Test
    public void testPendingToCompletedIsIgnored() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testPendingToCompletedIgnored");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        ProgramRunId run = NamespaceId.DEFAULT.app("app").program(ProgramType.WORKFLOW, "program").run(RunIds.generate());
        txExecutor.execute(() -> {
            metadataStore.recordProgramProvisioning(run, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
            metadataStore.recordProgramStop(run, 0L, ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            Assert.assertEquals(ProgramRunStatus.PENDING, metadataStore.getRun(run).getStatus());
        });
    }

    @Test
    public void testProvisioningFailure() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testProvisioningFailure");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        ProgramId program = NamespaceId.DEFAULT.app("app").program(ProgramType.WORKFLOW, "program");
        ProgramRunId run = program.run(RunIds.generate());
        txExecutor.execute(() -> {
            metadataStore.recordProgramProvisioning(run, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
            metadataStore.recordProgramDeprovisioning(run, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            RunRecordMeta run2 = metadataStore.getRun(run);
            Assert.assertEquals(ProgramRunStatus.FAILED, run2.getStatus());
            Assert.assertEquals(ProgramRunClusterStatus.DEPROVISIONING, run2.getCluster().getStatus());
        });
        ProgramRunId run2 = program.run(RunIds.generate());
        txExecutor.execute(() -> {
            metadataStore.recordProgramProvisioning(run2, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
            metadataStore.recordProgramDeprovisioned(run2, Long.valueOf(System.currentTimeMillis()), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            RunRecordMeta run3 = metadataStore.getRun(run2);
            Assert.assertEquals(ProgramRunStatus.FAILED, run3.getStatus());
            Assert.assertEquals(ProgramRunClusterStatus.DEPROVISIONED, run3.getCluster().getStatus());
        });
    }

    @Test
    public void testInvalidStatusPersistence() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testInvalidStatusPersistence");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        ProgramId program = NamespaceId.DEFAULT.app("app").program(ProgramType.WORKFLOW, "program");
        RunId generate = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run = program.run(generate);
        AtomicLong atomicLong = new AtomicLong();
        txExecutor.execute(() -> {
            metadataStore.recordProgramRunning(run, RunIds.getTime(generate, TimeUnit.SECONDS), (String) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStop(run, RunIds.getTime(generate, TimeUnit.SECONDS), ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            Assert.assertNull(metadataStore.getRun(run));
        });
        ProgramRunId run2 = program.run(RunIds.generate(this.runIdTime.incrementAndGet()));
        txExecutor.execute(() -> {
            metadataStore.recordProgramSuspend(run2, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()), -1L);
            metadataStore.recordProgramResumed(run2, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()), -1L);
            Assert.assertNull(metadataStore.getRun(run2));
        });
        RunId generate2 = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run3 = program.run(generate2);
        txExecutor.execute(() -> {
            metadataStore.recordProgramStop(run3, RunIds.getTime(generate2, TimeUnit.SECONDS), ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStop(run3, RunIds.getTime(generate2, TimeUnit.SECONDS), ProgramRunStatus.KILLED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStop(run3, RunIds.getTime(generate2, TimeUnit.SECONDS), ProgramRunStatus.FAILED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            Assert.assertNull(metadataStore.getRun(run3));
        });
        RunId generate3 = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run4 = program.run(generate3);
        txExecutor.execute(() -> {
            recordProvisionAndStart(run4, metadataStore);
            metadataStore.recordProgramStop(run4, RunIds.getTime(generate3, TimeUnit.SECONDS), ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStop(run4, RunIds.getTime(generate3, TimeUnit.SECONDS), ProgramRunStatus.KILLED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            Assert.assertEquals(ProgramRunStatus.COMPLETED, metadataStore.getRun(run4).getStatus());
        });
        RunId generate4 = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run5 = program.run(generate4);
        txExecutor.execute(() -> {
            recordProvisionAndStart(run5, metadataStore);
            metadataStore.recordProgramStop(run5, RunIds.getTime(generate4, TimeUnit.SECONDS), ProgramRunStatus.FAILED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStop(run5, RunIds.getTime(generate4, TimeUnit.SECONDS), ProgramRunStatus.COMPLETED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            Assert.assertEquals(ProgramRunStatus.FAILED, metadataStore.getRun(run5).getStatus());
        });
        ProgramRunId run6 = program.run(RunIds.generate(this.runIdTime.incrementAndGet()));
        Long valueOf = Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
        txExecutor.execute(() -> {
            recordProvisionAndStart(run6, metadataStore);
            metadataStore.recordProgramSuspend(run6, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()), valueOf.longValue());
            metadataStore.recordProgramStart(run6, (String) null, Collections.emptyMap(), AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            RunRecordMeta run7 = metadataStore.getRun(run6);
            Assert.assertEquals(ProgramRunStatus.SUSPENDED, run7.getStatus());
            Assert.assertEquals(valueOf, run7.getSuspendTs());
        });
        RunId generate5 = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run7 = program.run(generate5);
        txExecutor.execute(() -> {
            long time = RunIds.getTime(generate5, TimeUnit.SECONDS);
            recordProvisionAndStart(run7, metadataStore);
            metadataStore.recordProgramRunning(run7, time, (String) null, AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            metadataStore.recordProgramStart(run7, (String) null, Collections.emptyMap(), AppFabricTestHelper.createSourceId(atomicLong.incrementAndGet()));
            Assert.assertEquals(ProgramRunStatus.RUNNING, metadataStore.getRun(run7).getStatus());
        });
    }

    private void assertPersistedStatus(AppMetadataStore appMetadataStore, TransactionExecutor transactionExecutor, long j, long j2, long j3, ProgramRunStatus programRunStatus) throws Exception {
        ProgramId program = NamespaceId.DEFAULT.app("app").program(ProgramType.WORKFLOW, "program");
        AtomicReference atomicReference = new AtomicReference();
        RunId generate = RunIds.generate(this.runIdTime.incrementAndGet());
        ProgramRunId run = program.run(generate);
        transactionExecutor.execute(() -> {
            appMetadataStore.recordProgramProvisioning(run, (Map) null, SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(j), ARTIFACT_ID);
            appMetadataStore.recordProgramProvisioned(run, 0, AppFabricTestHelper.createSourceId(j + 1));
            appMetadataStore.recordProgramStart(run, (String) null, ImmutableMap.of(), AppFabricTestHelper.createSourceId(j + 2));
            appMetadataStore.recordProgramRunning(run, RunIds.getTime(generate, TimeUnit.SECONDS), (String) null, AppFabricTestHelper.createSourceId(j2));
            appMetadataStore.recordProgramStop(run, RunIds.getTime(generate, TimeUnit.SECONDS), ProgramRunStatus.KILLED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(j3));
            atomicReference.set(appMetadataStore.getRun(run));
        });
        Assert.assertEquals(programRunStatus, ((RunRecordMeta) atomicReference.get()).getStatus());
    }

    @Test
    public void testScanRunningInRangeWithBatch() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testScanRunningInRange");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        TreeSet treeSet = new TreeSet();
        for (int i = 0; i < 100; i++) {
            ProgramId program = NamespaceId.DEFAULT.app("app" + i).program(ProgramType.values()[i % ProgramType.values().length], "program" + i);
            RunId generate = RunIds.generate(this.runIdTime.incrementAndGet());
            ProgramRunId run = program.run(generate);
            treeSet.add(Long.valueOf(RunIds.getTime(generate, TimeUnit.MILLISECONDS)));
            int i2 = i;
            txExecutor.execute(() -> {
                recordProvisionAndStart(run, metadataStore);
                metadataStore.recordProgramRunning(run, RunIds.getTime(generate, TimeUnit.SECONDS) + 1, (String) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramStop(run, RunIds.getTime(generate, TimeUnit.SECONDS), STOP_STATUSES.get(i2 % STOP_STATUSES.size()), (BasicThrowable) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            });
        }
        runScan(txExecutor, metadataStore, treeSet, 0L, Long.MAX_VALUE);
        runScan(txExecutor, metadataStore, treeSet.subSet(300000L, 900000L), TimeUnit.MILLISECONDS.toSeconds(300000L), TimeUnit.MILLISECONDS.toSeconds(900000L));
        runScan(txExecutor, metadataStore, treeSet.subSet(900000L, 1010000L), TimeUnit.MILLISECONDS.toSeconds(900000L), TimeUnit.MILLISECONDS.toSeconds(1010000L));
        runScan(txExecutor, metadataStore, treeSet.subSet(1010000L, 2000000L), TimeUnit.MILLISECONDS.toSeconds(1010000L), TimeUnit.MILLISECONDS.toSeconds(2000000L));
        runScan(txExecutor, metadataStore, treeSet.subSet(310000L, 310000L), TimeUnit.MILLISECONDS.toSeconds(310000L), TimeUnit.MILLISECONDS.toSeconds(310000L));
        runScan(txExecutor, metadataStore, treeSet.subSet(300000L, 310000L), TimeUnit.MILLISECONDS.toSeconds(300000L), TimeUnit.MILLISECONDS.toSeconds(310000L));
        runScan(txExecutor, metadataStore, treeSet.subSet(1000L, 10000L), TimeUnit.MILLISECONDS.toSeconds(1000L), TimeUnit.MILLISECONDS.toSeconds(10000L));
    }

    private void runScan(TransactionExecutor transactionExecutor, AppMetadataStore appMetadataStore, Set<Long> set, long j, long j2) throws InterruptedException, TransactionFailureException {
        transactionExecutor.execute(() -> {
            TreeSet treeSet = new TreeSet();
            List runningInRangeForStatus = appMetadataStore.getRunningInRangeForStatus("runRecordCompleted", j, j2, 25, new CountingTicker(1L));
            Iterables.addAll(treeSet, Iterables.transform(Iterables.concat(runningInRangeForStatus), runId -> {
                return Long.valueOf(RunIds.getTime(runId, TimeUnit.MILLISECONDS));
            }));
            Assert.assertEquals(set, treeSet);
            Assert.assertEquals((r0.getNumProcessed() - (2 * r0)) / 25, Iterables.size(runningInRangeForStatus));
        });
    }

    @Test
    public void testgetRuns() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testgetRuns");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        TreeSet treeSet = new TreeSet();
        TreeSet treeSet2 = new TreeSet();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (int i = 0; i < 100; i++) {
            ProgramId program = NamespaceId.DEFAULT.app("app").program(ProgramType.FLOW, "program");
            RunId generate = RunIds.generate(this.runIdTime.incrementAndGet());
            treeSet.add(generate.toString());
            int i2 = i;
            if (i % 2 == 0) {
                treeSet2.add(generate.toString());
            }
            ProgramRunId run = program.run(generate);
            hashSet.add(run);
            if (i % 2 == 0) {
                hashSet2.add(run);
            }
            txExecutor.execute(() -> {
                recordProvisionAndStart(run, metadataStore);
                metadataStore.recordProgramRunning(run, RunIds.getTime(generate, TimeUnit.SECONDS), (String) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramStop(run, RunIds.getTime(generate, TimeUnit.SECONDS), ProgramRunStatus.values()[i2 % ProgramRunStatus.values().length], (BasicThrowable) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
            });
        }
        txExecutor.execute(() -> {
            Map runs = metadataStore.getRuns(hashSet);
            TreeSet treeSet3 = new TreeSet();
            Iterator it = runs.entrySet().iterator();
            while (it.hasNext()) {
                treeSet3.add(((RunRecordMeta) ((Map.Entry) it.next()).getValue()).getPid());
            }
            Assert.assertEquals(treeSet, treeSet3);
            Map runs2 = metadataStore.getRuns(hashSet2);
            TreeSet treeSet4 = new TreeSet();
            Iterator it2 = runs2.entrySet().iterator();
            while (it2.hasNext()) {
                treeSet4.add(((RunRecordMeta) ((Map.Entry) it2.next()).getValue()).getPid());
            }
            Assert.assertEquals(treeSet2, treeSet4);
        });
    }

    @Test
    public void testGetActiveRuns() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testGetActiveRuns");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        String str = "app1";
        String str2 = "app2";
        String str3 = "prog1";
        String str4 = "prog2";
        List asList = Arrays.asList(new NamespaceId("ns1"), new NamespaceId("ns2"));
        Collection<ApplicationId> collection = (Collection) asList.stream().flatMap(namespaceId -> {
            return Stream.of((Object[]) new ApplicationId[]{namespaceId.app(str), namespaceId.app(str2)});
        }).collect(Collectors.toList());
        Collection<ProgramId> collection2 = (Collection) collection.stream().flatMap(applicationId -> {
            return Stream.of((Object[]) new ProgramId[]{applicationId.mr(str3), applicationId.mr(str4)});
        }).collect(Collectors.toList());
        for (ProgramId programId : collection2) {
            txExecutor.execute(() -> {
                metadataStore.recordProgramProvisioning(programId.run(RunIds.generate()), Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
                ProgramRunId run = programId.run(RunIds.generate());
                metadataStore.recordProgramProvisioning(run, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
                metadataStore.recordProgramProvisioned(run, 3, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramStart(run, UUID.randomUUID().toString(), Collections.emptyMap(), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                ProgramRunId run2 = programId.run(RunIds.generate());
                metadataStore.recordProgramProvisioning(run2, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
                metadataStore.recordProgramProvisioned(run2, 3, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                String uuid = UUID.randomUUID().toString();
                metadataStore.recordProgramStart(run2, uuid, Collections.emptyMap(), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramRunning(run2, System.currentTimeMillis(), uuid, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                ProgramRunId run3 = programId.run(RunIds.generate());
                metadataStore.recordProgramProvisioning(run3, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
                metadataStore.recordProgramProvisioned(run3, 3, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                String uuid2 = UUID.randomUUID().toString();
                metadataStore.recordProgramStart(run3, uuid2, Collections.emptyMap(), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramRunning(run3, System.currentTimeMillis(), uuid2, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                metadataStore.recordProgramSuspend(run3, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), System.currentTimeMillis());
                for (ProgramRunStatus programRunStatus : ProgramRunStatus.values()) {
                    if (programRunStatus.isEndState()) {
                        ProgramRunId run4 = programId.run(RunIds.generate());
                        metadataStore.recordProgramProvisioning(run4, Collections.emptyMap(), SINGLETON_PROFILE_MAP, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()), ARTIFACT_ID);
                        metadataStore.recordProgramProvisioned(run4, 3, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                        metadataStore.recordProgramStart(run4, UUID.randomUUID().toString(), Collections.emptyMap(), AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                        metadataStore.recordProgramStop(run4, System.currentTimeMillis(), programRunStatus, (BasicThrowable) null, AppFabricTestHelper.createSourceId(this.sourceId.incrementAndGet()));
                    }
                }
            });
        }
        HashSet hashSet = new HashSet();
        hashSet.add(ProgramRunStatus.PENDING);
        hashSet.add(ProgramRunStatus.STARTING);
        hashSet.add(ProgramRunStatus.RUNNING);
        hashSet.add(ProgramRunStatus.SUSPENDED);
        txExecutor.execute(() -> {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            Iterator it = asList.iterator();
            while (it.hasNext()) {
                NamespaceId namespaceId2 = (NamespaceId) it.next();
                Map activeRuns = metadataStore.getActiveRuns(namespaceId2);
                HashMap hashMap3 = new HashMap();
                hashMap3.put(namespaceId2.app(str).mr(str3), hashSet);
                hashMap3.put(namespaceId2.app(str).mr(str4), hashSet);
                hashMap3.put(namespaceId2.app(str2).mr(str3), hashSet);
                hashMap3.put(namespaceId2.app(str2).mr(str4), hashSet);
                HashMap hashMap4 = new HashMap();
                hashMap4.put(namespaceId2.app(str).mr(str3), new HashSet());
                hashMap4.put(namespaceId2.app(str).mr(str4), new HashSet());
                hashMap4.put(namespaceId2.app(str2).mr(str3), new HashSet());
                hashMap4.put(namespaceId2.app(str2).mr(str4), new HashSet());
                hashMap2.putAll(hashMap4);
                for (Map.Entry entry : activeRuns.entrySet()) {
                    ProgramId parent = ((ProgramRunId) entry.getKey()).getParent();
                    Assert.assertTrue("Unexpected program returned: " + parent, hashMap4.containsKey(((ProgramRunId) entry.getKey()).getParent()));
                    ((Set) hashMap4.get(parent)).add(((RunRecordMeta) entry.getValue()).getStatus());
                }
                Assert.assertEquals(hashMap3, hashMap4);
                hashMap.putAll(hashMap3);
            }
            for (Map.Entry entry2 : metadataStore.getActiveRuns(runRecordMeta -> {
                return true;
            }).entrySet()) {
                ProgramId parent2 = ((ProgramRunId) entry2.getKey()).getParent();
                Assert.assertTrue("Unexpected program returned: " + parent2, hashMap2.containsKey(((ProgramRunId) entry2.getKey()).getParent()));
                ((Set) hashMap2.get(parent2)).add(((RunRecordMeta) entry2.getValue()).getStatus());
            }
            Assert.assertEquals(hashMap, hashMap2);
        });
        for (ApplicationId applicationId2 : collection) {
            txExecutor.execute(() -> {
                Map activeRuns = metadataStore.getActiveRuns(applicationId2);
                HashMap hashMap = new HashMap();
                hashMap.put(applicationId2.mr(str3), hashSet);
                hashMap.put(applicationId2.mr(str4), hashSet);
                HashMap hashMap2 = new HashMap();
                hashMap2.put(applicationId2.mr(str3), new HashSet());
                hashMap2.put(applicationId2.mr(str4), new HashSet());
                for (Map.Entry entry : activeRuns.entrySet()) {
                    ProgramId parent = ((ProgramRunId) entry.getKey()).getParent();
                    Assert.assertTrue("Unexpected program returned: " + parent, hashMap2.containsKey(((ProgramRunId) entry.getKey()).getParent()));
                    ((Set) hashMap2.get(parent)).add(((RunRecordMeta) entry.getValue()).getStatus());
                }
                Assert.assertEquals(hashMap, hashMap2);
            });
        }
        for (ProgramId programId2 : collection2) {
            txExecutor.execute(() -> {
                Map activeRuns = metadataStore.getActiveRuns(programId2);
                HashSet hashSet2 = new HashSet();
                for (Map.Entry entry : activeRuns.entrySet()) {
                    Assert.assertEquals(programId2, ((ProgramRunId) entry.getKey()).getParent());
                    hashSet2.add(((RunRecordMeta) entry.getValue()).getStatus());
                }
                Assert.assertEquals(hashSet, hashSet2);
            });
        }
    }

    @Test
    public void testDuplicateWritesIgnored() throws Exception {
        DatasetId dataset = NamespaceId.DEFAULT.dataset("duplicateWrites");
        datasetFramework.addInstance(Table.class.getName(), dataset, DatasetProperties.EMPTY);
        Table dataset2 = datasetFramework.getDataset(dataset, Collections.emptyMap(), (ClassLoader) null);
        Assert.assertNotNull(dataset2);
        AppMetadataStore appMetadataStore = new AppMetadataStore(dataset2, cConf);
        TransactionExecutor createExecutor = txExecutorFactory.createExecutor(Collections.singleton(appMetadataStore));
        ProgramRunId run = NamespaceId.DEFAULT.app("app").program(ProgramType.values()[ProgramType.values().length - 1], "program").run(RunIds.generate());
        byte[] bArr = {0};
        createExecutor.execute(() -> {
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramProvisioning(run, (Map) null, SINGLETON_PROFILE_MAP, bArr, ARTIFACT_ID);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramProvisioned(run, 0, bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramStart(run, (String) null, Collections.emptyMap(), bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramRunning(run, System.currentTimeMillis(), (String) null, bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramSuspend(run, bArr, System.currentTimeMillis());
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramRunning(run, System.currentTimeMillis(), (String) null, bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramStop(run, System.currentTimeMillis(), ProgramRunStatus.KILLED, (BasicThrowable) null, bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramDeprovisioning(run, bArr);
            });
            assertSecondCallIsNull(() -> {
                return appMetadataStore.recordProgramDeprovisioned(run, Long.valueOf(System.currentTimeMillis()), bArr);
            });
        });
    }

    private <T> void assertSecondCallIsNull(Callable<T> callable) throws Exception {
        Assert.assertNotNull(callable.call());
        Assert.assertNull(callable.call());
    }

    @Test
    public void testProfileInRunRecord() throws Exception {
        AppMetadataStore metadataStore = getMetadataStore("testProfileInRunRecord");
        TransactionExecutor txExecutor = getTxExecutor(metadataStore);
        ProgramRunId run = NamespaceId.DEFAULT.app("myApp").workflow("myProgram").run(RunIds.generate());
        ProfileId profile = NamespaceId.DEFAULT.profile("MyProfile");
        txExecutor.execute(() -> {
            metadataStore.recordProgramProvisioning(run, (Map) null, Collections.singletonMap("system.profile.name", profile.getScopedName()), AppFabricTestHelper.createSourceId(1L), ARTIFACT_ID);
            Assert.assertEquals(profile, metadataStore.getRun(run).getProfileId());
            metadataStore.recordProgramProvisioned(run, 0, AppFabricTestHelper.createSourceId(1 + 1));
            metadataStore.recordProgramStart(run, (String) null, ImmutableMap.of(), AppFabricTestHelper.createSourceId(1 + 2));
            metadataStore.recordProgramRunning(run, RunIds.getTime(run.getRun(), TimeUnit.SECONDS), (String) null, AppFabricTestHelper.createSourceId(1 + 3));
            metadataStore.recordProgramStop(run, RunIds.getTime(run.getRun(), TimeUnit.SECONDS), ProgramRunStatus.KILLED, (BasicThrowable) null, AppFabricTestHelper.createSourceId(1 + 4));
            Assert.assertEquals(profile, metadataStore.getRun(run).getProfileId());
        });
    }
}
