package co.cask.cdap.reporting;

import co.cask.cdap.api.artifact.ArtifactId;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionFailureException;
import org.apache.twill.api.RunId;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/reporting/ProgramHeartBeatDatasetTest.class */
public class ProgramHeartBeatDatasetTest {
    private static DatasetFramework datasetFramework;
    private static TransactionExecutorFactory txExecutorFactory;
    private static final ArtifactId ARTIFACT_ID = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
    private static final byte[] SOURCE_ID = Bytes.toBytes("sourceId");

    @BeforeClass
    public static void beforeClass() throws Exception {
        Injector injector = AppFabricTestHelper.getInjector();
        AppFabricTestHelper.ensureNamespaceExists(NamespaceId.DEFAULT);
        datasetFramework = (DatasetFramework) injector.getInstance(DatasetFramework.class);
        txExecutorFactory = (TransactionExecutorFactory) injector.getInstance(TransactionExecutorFactory.class);
    }

    @Test
    public void testWritingScanningHeartBeats() throws Exception {
        ProgramHeartbeatDataset heartBeatStore = getHeartBeatStore("testBeats");
        TransactionExecutor createExecutor = txExecutorFactory.createExecutor(Collections.singleton(heartBeatStore));
        long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        RunRecordMeta.Builder mockRunRecordMeta = getMockRunRecordMeta(NamespaceId.DEFAULT, RunIds.generate());
        mockRunRecordMeta.setRunTime(Long.valueOf(seconds));
        mockRunRecordMeta.setStatus(ProgramRunStatus.RUNNING);
        RunRecordMeta build = mockRunRecordMeta.build();
        createExecutor.execute(() -> {
            heartBeatStore.writeRunRecordMeta(build, seconds);
        });
        long seconds2 = seconds + TimeUnit.MINUTES.toSeconds(10L);
        long seconds3 = TimeUnit.MINUTES.toSeconds(1L);
        setUpProgramHeartBeats(build, seconds, seconds2, seconds3, createExecutor, heartBeatStore);
        long seconds4 = seconds + TimeUnit.MINUTES.toSeconds(5L);
        RunRecordMeta.Builder mockRunRecordMeta2 = getMockRunRecordMeta(NamespaceId.DEFAULT, RunIds.generate());
        mockRunRecordMeta2.setRunTime(Long.valueOf(seconds4));
        mockRunRecordMeta2.setStatus(ProgramRunStatus.RUNNING);
        RunRecordMeta build2 = mockRunRecordMeta2.build();
        createExecutor.execute(() -> {
            heartBeatStore.writeRunRecordMeta(build2, seconds4);
        });
        setUpProgramHeartBeats(build2, seconds4, seconds2, seconds3, createExecutor, heartBeatStore);
        createExecutor.execute(() -> {
            Collection scan = heartBeatStore.scan(seconds, seconds4, ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
            Assert.assertEquals(1L, scan.size());
            Assert.assertEquals(build, scan.iterator().next());
            ImmutableSet of = ImmutableSet.of(build, build2);
            Collection scan2 = heartBeatStore.scan(seconds, seconds2, ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
            Assert.assertEquals(of.size(), scan2.size());
            Assert.assertTrue(scan2.containsAll(of));
            Collection scan3 = heartBeatStore.scan(seconds4, seconds2, ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
            Assert.assertEquals(of.size(), scan3.size());
            Assert.assertTrue(scan3.containsAll(of));
        });
    }

    private RunRecordMeta.Builder getMockRunRecordMeta(NamespaceId namespaceId, RunId runId) {
        ProgramId program = namespaceId.app("someapp").program(ProgramType.SERVICE, "s");
        RunRecordMeta.Builder builder = RunRecordMeta.builder();
        builder.setArtifactId(ARTIFACT_ID);
        builder.setPrincipal("userA");
        builder.setProgramRunId(program.run(runId));
        builder.setSourceId(SOURCE_ID);
        builder.setStartTime(RunIds.getTime(runId, TimeUnit.SECONDS));
        return builder;
    }

    private void setUpProgramHeartBeats(RunRecordMeta runRecordMeta, long j, long j2, long j3, TransactionExecutor transactionExecutor, ProgramHeartbeatDataset programHeartbeatDataset) throws InterruptedException, TransactionFailureException {
        transactionExecutor.execute(() -> {
            long j4 = j;
            while (true) {
                long j5 = j4 + j3;
                if (j5 >= j2) {
                    return;
                }
                programHeartbeatDataset.writeRunRecordMeta(runRecordMeta, j5);
                j4 = j5;
            }
        });
    }

    @Test
    public void testScanningProgramStatus() throws Exception {
        ProgramHeartbeatDataset heartBeatStore = getHeartBeatStore("testStatusChange");
        TransactionExecutor createExecutor = txExecutorFactory.createExecutor(Collections.singleton(heartBeatStore));
        long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        RunId generate = RunIds.generate();
        RunRecordMeta.Builder mockRunRecordMeta = getMockRunRecordMeta(NamespaceId.DEFAULT, generate);
        mockRunRecordMeta.setStatus(ProgramRunStatus.RUNNING);
        mockRunRecordMeta.setRunTime(Long.valueOf(seconds));
        RunRecordMeta build = mockRunRecordMeta.build();
        createExecutor.execute(() -> {
            heartBeatStore.writeRunRecordMeta(build, seconds);
        });
        long seconds2 = seconds + TimeUnit.MINUTES.toSeconds(4L);
        setUpProgramHeartBeats(build, seconds, seconds2, TimeUnit.MINUTES.toSeconds(1L), createExecutor, heartBeatStore);
        long seconds3 = seconds + TimeUnit.MINUTES.toSeconds(5L);
        RunRecordMeta.Builder mockRunRecordMeta2 = getMockRunRecordMeta(NamespaceId.DEFAULT, generate);
        mockRunRecordMeta2.setStatus(ProgramRunStatus.KILLED);
        mockRunRecordMeta2.setStopTime(Long.valueOf(seconds3));
        RunRecordMeta build2 = mockRunRecordMeta2.build();
        createExecutor.execute(() -> {
            heartBeatStore.writeRunRecordMeta(build2, seconds3);
        });
        createExecutor.execute(() -> {
            Collection scan = heartBeatStore.scan(seconds, seconds2, ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
            Assert.assertEquals(1L, scan.size());
            Assert.assertEquals(ProgramRunStatus.RUNNING, ((RunRecordMeta) scan.iterator().next()).getStatus());
            Assert.assertEquals(build, scan.iterator().next());
            Collection scan2 = heartBeatStore.scan(seconds, seconds3 + 1, ImmutableSet.of(NamespaceId.DEFAULT.getNamespace()));
            Assert.assertEquals(1L, scan2.size());
            Assert.assertEquals(build2, scan2.iterator().next());
        });
    }

    private ProgramHeartbeatDataset getHeartBeatStore(String str) throws Exception {
        DatasetId dataset = NamespaceId.SYSTEM.dataset(str);
        datasetFramework.addInstance(Table.class.getName(), dataset, DatasetProperties.EMPTY);
        Table dataset2 = datasetFramework.getDataset(dataset, ImmutableMap.of(), (ClassLoader) null);
        Assert.assertNotNull(dataset2);
        return new ProgramHeartbeatDataset(dataset2);
    }
}
