package co.cask.cdap.internal.app.services;

import co.cask.cdap.api.artifact.ArtifactId;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.DefaultApplicationSpecification;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.store.AppMetadataStore;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.internal.profile.ProfileService;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.WorkflowId;
import co.cask.cdap.proto.profile.Profile;
import co.cask.cdap.reporting.ProgramHeartbeatDataset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tephra.TransactionExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.class */
public class ProgramNotificationSubscriberServiceTest {
    private static final String SYSTEM_METRIC_PREFIX = "system.";
    private static Injector injector;
    private static Table appMetaTable;
    private static AppMetadataStore metadataStoreDataset;
    private static ProgramHeartbeatDataset programHeartbeatDataset;
    private static TransactionExecutor txnl;
    private static TransactionExecutor heartBeatTxnl;
    private static ProgramStateWriter programStateWriter;

    @BeforeClass
    public static void setupClass() throws Exception {
        injector = AppFabricTestHelper.getInjector();
        CConfiguration cConfiguration = (CConfiguration) injector.getInstance(CConfiguration.class);
        cConfiguration.set("program.heartbeat.interval.seconds", String.valueOf(TimeUnit.HOURS.toSeconds(1L)));
        DatasetFramework datasetFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        TransactionExecutorFactory transactionExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
        appMetaTable = DatasetsUtil.getOrCreateDataset(datasetFramework, NamespaceId.SYSTEM.dataset("app.meta"), Table.class.getName(), DatasetProperties.EMPTY, Collections.emptyMap());
        metadataStoreDataset = new AppMetadataStore(appMetaTable, cConfiguration);
        Table orCreateDataset = DatasetsUtil.getOrCreateDataset(datasetFramework, NamespaceId.SYSTEM.dataset("program.heartbeat"), Table.class.getName(), DatasetProperties.EMPTY, Collections.emptyMap());
        txnl = transactionExecutorFactory.createExecutor(Collections.singleton(metadataStoreDataset));
        programHeartbeatDataset = new ProgramHeartbeatDataset(orCreateDataset);
        heartBeatTxnl = transactionExecutorFactory.createExecutor(Collections.singleton(programHeartbeatDataset));
        programStateWriter = (ProgramStateWriter) injector.getInstance(ProgramStateWriter.class);
    }

    @After
    public void cleanupTest() throws Exception {
        txnl.execute(() -> {
            Scanner scan = appMetaTable.scan((byte[]) null, (byte[]) null);
            while (true) {
                Row next = scan.next();
                if (next == null) {
                    return;
                } else {
                    appMetaTable.delete(next.getRow());
                }
            }
        });
    }

    @Test
    public void testAppSpecNotRequiredToWriteState() throws Exception {
        ProgramId program = NamespaceId.DEFAULT.app("someapp").program(ProgramType.SERVICE, "s");
        HashMap hashMap = new HashMap();
        hashMap.put("skipProvisioning", Boolean.TRUE.toString());
        hashMap.put("system.profile.name", ProfileId.NATIVE.getScopedName());
        SimpleProgramOptions simpleProgramOptions = new SimpleProgramOptions(program, new BasicArguments(hashMap), new BasicArguments());
        ProgramRunId run = program.run(RunIds.generate());
        ArtifactId apiArtifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
        programStateWriter.start(run, simpleProgramOptions, (String) null, new ProgramDescriptor(program, new DefaultApplicationSpecification("name", "1.0.0", "desc", (String) null, apiArtifactId, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())));
        Tasks.waitFor(ProgramRunStatus.STARTING, () -> {
            return (ProgramRunStatus) txnl.execute(() -> {
                RunRecordMeta run2 = metadataStoreDataset.getRun(run);
                if (run2 == null) {
                    return null;
                }
                Assert.assertEquals(apiArtifactId, run2.getArtifactId());
                return run2.getStatus();
            });
        }, 10L, TimeUnit.SECONDS);
        programStateWriter.completed(run);
    }

    @Test
    public void testMetricsEmit() throws Exception {
        ProfileService profileService = (ProfileService) injector.getInstance(ProfileService.class);
        ProfileId profile = NamespaceId.DEFAULT.profile("MyProfile");
        profileService.saveProfile(profile, Profile.NATIVE);
        WorkflowId workflow = NamespaceId.DEFAULT.app("myApp").workflow("myProgram");
        ArtifactId apiArtifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
        ProgramRunId run = workflow.run(RunIds.generate(System.currentTimeMillis()).getId());
        Map singletonMap = Collections.singletonMap("system.profile.name", profile.getScopedName());
        long currentTimeMillis = System.currentTimeMillis();
        txnl.execute(() -> {
            int i = 0 + 1;
            metadataStoreDataset.recordProgramProvisioning(run, Collections.emptyMap(), singletonMap, AppFabricTestHelper.createSourceId(i), apiArtifactId);
            int i2 = i + 1;
            metadataStoreDataset.recordProgramProvisioned(run, 3, AppFabricTestHelper.createSourceId(i2));
            metadataStoreDataset.recordProgramStart(run, (String) null, singletonMap, AppFabricTestHelper.createSourceId(i2 + 1));
            metadataStoreDataset.recordProgramRunning(run, currentTimeMillis + 60000, (String) null, AppFabricTestHelper.createSourceId(r13 + 1));
        });
        programStateWriter.completed(run);
        MetricStore metricStore = (MetricStore) injector.getInstance(MetricStore.class);
        Tasks.waitFor(1L, () -> {
            return Long.valueOf(getMetric(metricStore, run, profile, "system.program.completed.runs"));
        }, 10L, TimeUnit.SECONDS);
        Assert.assertEquals(1L, getMetric(metricStore, run, profile, "system.program.completed.runs"));
        Assert.assertEquals(0L, getMetric(metricStore, run, profile, "system.program.killed.runs"));
        Assert.assertEquals(0L, getMetric(metricStore, run, profile, "system.program.failed.runs"));
        metricStore.deleteAll();
    }

    @Test
    public void testHeartBeatStoreForProgramStatusMessages() throws Exception {
        ProgramId program = NamespaceId.DEFAULT.app("someapp", "1.0-SNAPSHOT").program(ProgramType.SERVICE, "s");
        HashMap hashMap = new HashMap();
        hashMap.put("skipProvisioning", Boolean.TRUE.toString());
        hashMap.put("system.profile.name", ProfileId.NATIVE.getScopedName());
        SimpleProgramOptions simpleProgramOptions = new SimpleProgramOptions(program, new BasicArguments(hashMap), new BasicArguments());
        ProgramRunId run = program.run(RunIds.generate());
        ArtifactId apiArtifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
        ProgramDescriptor programDescriptor = new ProgramDescriptor(program, new DefaultApplicationSpecification("name", "1.0.0", "desc", (String) null, apiArtifactId, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()));
        heartBeatTxnl.execute(() -> {
            programStateWriter.start(run, simpleProgramOptions, (String) null, programDescriptor);
        });
        checkProgramStatus(apiArtifactId, run, ProgramRunStatus.STARTING);
        long currentTimeMillis = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.running(run, (String) null);
        });
        checkProgramStatus(apiArtifactId, run, ProgramRunStatus.RUNNING);
        heartbeatDatasetStatusCheck(currentTimeMillis, ProgramRunStatus.RUNNING);
        long currentTimeMillis2 = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.suspend(run);
        });
        checkProgramStatus(apiArtifactId, run, ProgramRunStatus.SUSPENDED);
        heartbeatDatasetStatusCheck(currentTimeMillis2, ProgramRunStatus.SUSPENDED);
        long currentTimeMillis3 = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.resume(run);
        });
        checkProgramStatus(apiArtifactId, run, ProgramRunStatus.RUNNING);
        heartbeatDatasetStatusCheck(currentTimeMillis3, ProgramRunStatus.RUNNING);
        long currentTimeMillis4 = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.error(run, new Throwable("Testing"));
        });
        checkProgramStatus(apiArtifactId, run, ProgramRunStatus.FAILED);
        heartbeatDatasetStatusCheck(currentTimeMillis4, ProgramRunStatus.FAILED);
        ProgramRunId run2 = program.run(RunIds.generate());
        heartBeatTxnl.execute(() -> {
            programStateWriter.start(run2, simpleProgramOptions, (String) null, programDescriptor);
        });
        checkProgramStatus(apiArtifactId, run2, ProgramRunStatus.STARTING);
        long currentTimeMillis5 = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.running(run2, (String) null);
        });
        checkProgramStatus(apiArtifactId, run2, ProgramRunStatus.RUNNING);
        heartbeatDatasetStatusCheck(currentTimeMillis5, ProgramRunStatus.RUNNING);
        long currentTimeMillis6 = System.currentTimeMillis();
        heartBeatTxnl.execute(() -> {
            programStateWriter.completed(run2);
        });
        checkProgramStatus(apiArtifactId, run2, ProgramRunStatus.COMPLETED);
        heartbeatDatasetStatusCheck(currentTimeMillis6, ProgramRunStatus.COMPLETED);
    }

    private void checkProgramStatus(ArtifactId artifactId, ProgramRunId programRunId, ProgramRunStatus programRunStatus) throws InterruptedException, ExecutionException, TimeoutException {
        Tasks.waitFor(programRunStatus, () -> {
            return (ProgramRunStatus) txnl.execute(() -> {
                RunRecordMeta run = metadataStoreDataset.getRun(programRunId);
                if (run == null) {
                    return null;
                }
                Assert.assertEquals(artifactId, run.getArtifactId());
                return run.getStatus();
            });
        }, 10L, TimeUnit.SECONDS);
    }

    private void heartbeatDatasetStatusCheck(long j, ProgramRunStatus programRunStatus) throws InterruptedException, ExecutionException, TimeoutException {
        Tasks.waitFor(programRunStatus, () -> {
            return (ProgramRunStatus) heartBeatTxnl.execute(() -> {
                Collection scan = programHeartbeatDataset.scan(TimeUnit.MILLISECONDS.toSeconds(j), TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
                if (scan.size() == 0) {
                    return null;
                }
                Assert.assertEquals(1L, scan.size());
                return ((RunRecordMeta) scan.iterator().next()).getStatus();
            });
        }, 10L, TimeUnit.SECONDS);
    }

    private long getMetric(MetricStore metricStore, ProgramRunId programRunId, ProfileId profileId, String str) {
        Collection query = metricStore.query(new MetricDataQuery(0L, 0L, Integer.MAX_VALUE, str, AggregationFunction.SUM, ImmutableMap.builder().put("psc", profileId.getScope().name()).put("pro", profileId.getProfile()).put("ns", programRunId.getNamespace()).put("prt", programRunId.getType().getPrettyName()).put("app", programRunId.getApplication()).put("prg", programRunId.getProgram()).build(), new ArrayList()));
        if (query.isEmpty()) {
            return 0L;
        }
        List timeValues = ((MetricTimeSeries) query.iterator().next()).getTimeValues();
        if (timeValues.isEmpty()) {
            return 0L;
        }
        return ((TimeValue) timeValues.get(0)).getValue();
    }
}
