package co.cask.cdap.internal.app.program;

import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.codec.ArgumentsCodec;
import co.cask.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/program/ProgramStateWriterWithHeartBeat.class */
public class ProgramStateWriterWithHeartBeat {
    private static final Logger LOG = LoggerFactory.getLogger(ProgramStateWriterWithHeartBeat.class);
    private static final Gson GSON = ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).registerTypeAdapter(Arguments.class, new ArgumentsCodec()).registerTypeAdapter(ProgramOptions.class, new ProgramOptionsCodec()).create();
    private final long heartBeatIntervalSeconds;
    private final ProgramStateWriter programStateWriter;
    private final ProgramRunId programRunId;
    private final ProgramStatePublisher messagingProgramStatePublisher;
    private ScheduledExecutorService scheduledExecutorService;

    public ProgramStateWriterWithHeartBeat(ProgramRunId programRunId, ProgramStateWriter programStateWriter, MessagingService messagingService, CConfiguration cConfiguration) {
        this(programRunId, programStateWriter, cConfiguration.getLong("program.heartbeat.interval.seconds"), new MessagingProgramStatePublisher(messagingService, NamespaceId.SYSTEM.topic(cConfiguration.get("program.status.event.topic")), RetryStrategies.fromConfiguration(cConfiguration, "system.program.state.")));
    }

    @VisibleForTesting
    ProgramStateWriterWithHeartBeat(ProgramRunId programRunId, ProgramStateWriter programStateWriter, long j, ProgramStatePublisher programStatePublisher) {
        this.programRunId = programRunId;
        this.programStateWriter = programStateWriter;
        this.heartBeatIntervalSeconds = j;
        this.messagingProgramStatePublisher = programStatePublisher;
    }

    public void start(ProgramOptions programOptions, @Nullable String str, ProgramDescriptor programDescriptor) {
        this.programStateWriter.start(this.programRunId, programOptions, str, programDescriptor);
    }

    public void running(@Nullable String str) {
        this.programStateWriter.running(this.programRunId, str);
        scheduleHeartBeatThread();
    }

    public void completed() {
        stopHeartbeatThread();
        this.programStateWriter.completed(this.programRunId);
    }

    public void killed() {
        stopHeartbeatThread();
        this.programStateWriter.killed(this.programRunId);
    }

    public void error(Throwable th) {
        stopHeartbeatThread();
        this.programStateWriter.error(this.programRunId, th);
    }

    public void suspend() {
        stopHeartbeatThread();
        this.programStateWriter.suspend(this.programRunId);
    }

    public void resume() {
        scheduleHeartBeatThread();
        this.programStateWriter.resume(this.programRunId);
    }

    private void scheduleHeartBeatThread() {
        if (this.scheduledExecutorService == null || this.scheduledExecutorService.isShutdown()) {
            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("program-heart-beat"));
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                HashMap hashMap = new HashMap();
                hashMap.put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(this.programRunId));
                hashMap.put(ProgramOptionConstants.HEART_BEAT_TIME, String.valueOf(System.currentTimeMillis()));
                this.messagingProgramStatePublisher.publish(Notification.Type.PROGRAM_HEART_BEAT, hashMap);
                LOG.trace("Sent heartbeat for program {}", this.programRunId);
            }, this.heartBeatIntervalSeconds, this.heartBeatIntervalSeconds, TimeUnit.SECONDS);
        }
    }

    private void stopHeartbeatThread() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    @VisibleForTesting
    boolean isHeartBeatThreadAlive() {
        return (this.scheduledExecutorService == null || this.scheduledExecutorService.isShutdown()) ? false : true;
    }
}
