package co.cask.cdap.internal.app.program;

import co.cask.cdap.api.artifact.ArtifactId;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.NoOpProgramStateWriter;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.internal.app.DefaultApplicationSpecification;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests the heartbeat behaviour of {@link ProgramStateWriterWithHeartBeat}: heartbeat
 * notifications must be published while the program is running, and the heartbeat thread must
 * stop once the program is suspended or killed.
 */
public class ProgramStateWriterWithHeartBeatTest {

    /**
     * {@link ProgramStatePublisher} that counts the {@code PROGRAM_HEART_BEAT} notifications it
     * receives and ignores every other notification type.
     *
     * <p>The test thread polls the count while the writer under test runs its own heartbeat
     * thread, so the counter is an {@link AtomicLong} to guarantee that increments are visible
     * across threads.
     */
    public static class MockProgramStatePublisher implements ProgramStatePublisher {
        private final AtomicLong heartBeatCount = new AtomicLong();

        private MockProgramStatePublisher() {
        }

        /**
         * Records a heartbeat notification.
         *
         * @param type the notification type; only {@code PROGRAM_HEART_BEAT} is counted
         * @param map the notification properties; a heartbeat must carry {@code heartbeat.time}
         */
        public void publish(Notification.Type type, Map<String, String> map) {
            if (Notification.Type.PROGRAM_HEART_BEAT.equals(type)) {
                // A heartbeat without its timestamp is not counted: the assertion throws first.
                Assert.assertTrue("Heartbeat notification is missing the 'heartbeat.time' property",
                        map.containsKey("heartbeat.time"));
                this.heartBeatCount.incrementAndGet();
            }
        }

        /**
         * @return the number of heartbeat notifications published so far
         */
        long getHeartBeatCount() {
            return this.heartBeatCount.get();
        }
    }

    /**
     * Drives a program run through start, running, suspend, resume and killed, checking that
     * heartbeats are published while running (before and after the suspension) and that the
     * heartbeat thread is no longer alive after suspend and after killed.
     */
    @Test
    public void testHeartBeatThread() throws InterruptedException, ExecutionException, TimeoutException {
        MockProgramStatePublisher publisher = new MockProgramStatePublisher();
        NoOpProgramStateWriter stateWriter = new NoOpProgramStateWriter();
        ProgramId programId = NamespaceId.DEFAULT.app("someapp").program(ProgramType.SERVICE, "s");

        Map<String, String> systemArguments = new HashMap<>();
        systemArguments.put("skipProvisioning", Boolean.TRUE.toString());
        SimpleProgramOptions programOptions =
                new SimpleProgramOptions(programId, new BasicArguments(systemArguments), new BasicArguments());

        ProgramRunId runId = programId.run(RunIds.generate());
        ArtifactId artifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
        ProgramStateWriterWithHeartBeat heartBeatWriter =
                new ProgramStateWriterWithHeartBeat(runId, stateWriter, 1L, publisher);

        DefaultApplicationSpecification appSpec = new DefaultApplicationSpecification(
                "name", "1.0.0", "desc", (String) null, artifactId,
                Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
                Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
                Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
                Collections.emptyMap(), Collections.emptyMap());
        heartBeatWriter.start(programOptions, (String) null, new ProgramDescriptor(programId, appSpec));

        // Starting alone must not produce any heartbeat.
        Assert.assertEquals(0L, publisher.getHeartBeatCount());

        heartBeatWriter.running((String) null);
        try {
            Tasks.waitFor(true, () -> publisher.getHeartBeatCount() > 1,
                    10L, TimeUnit.SECONDS, "Didn't receive expected heartbeat after 10 seconds");

            heartBeatWriter.suspend();
            waitForHeartBeatThreadToStop(heartBeatWriter);

            // Snapshot taken only after the thread has stopped, so the count is stable here.
            long heartBeatsBeforeResume = publisher.getHeartBeatCount();
            heartBeatWriter.resume();
            long expectedAfterResume = heartBeatsBeforeResume + 1;
            Tasks.waitFor(true, () -> publisher.getHeartBeatCount() > expectedAfterResume,
                    10L, TimeUnit.SECONDS,
                    "Didn't receive expected heartbeat after 10 seconds after resuming program");
        } finally {
            // Always terminate the run so a failed wait above does not leave the heartbeat running.
            heartBeatWriter.killed();
        }
        waitForHeartBeatThreadToStop(heartBeatWriter);
    }

    /** Waits up to five seconds for the writer's heartbeat thread to be reported as not alive. */
    private static void waitForHeartBeatThreadToStop(ProgramStateWriterWithHeartBeat heartBeatWriter)
            throws InterruptedException, ExecutionException, TimeoutException {
        Tasks.waitFor(false, () -> heartBeatWriter.isHeartBeatThreadAlive(),
                5L, TimeUnit.SECONDS, "Heartbeat thread did not stop after 5 seconds");
    }
}
