package co.cask.cdap.internal.app.program;

import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/program/MessagingProgramStateWriter.class */
public final class MessagingProgramStateWriter implements ProgramStateWriter {
    private static final Logger LOG = LoggerFactory.getLogger(MessagingProgramStateWriter.class);
    private static final Gson GSON = new Gson();
    private final MessagingService messagingService;
    private final TopicId topicId;
    private final RetryStrategy retryStrategy;

    @Inject
    public MessagingProgramStateWriter(CConfiguration cConfiguration, MessagingService messagingService) {
        this.topicId = NamespaceId.SYSTEM.topic(cConfiguration.get("program.status.event.topic"));
        this.retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.program.state.");
        this.messagingService = messagingService;
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void start(ProgramRunId programRunId, ProgramOptions programOptions, @Nullable String str) {
        long time = RunIds.getTime(RunIds.fromString(programRunId.getRun()), TimeUnit.MILLISECONDS);
        if (time == -1) {
            time = System.currentTimeMillis();
        }
        ImmutableMap.Builder<String, String> put = ImmutableMap.builder().put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(programRunId)).put(ProgramOptionConstants.START_TIME, String.valueOf(time)).put(ProgramOptionConstants.PROGRAM_STATUS, ProgramRunStatus.STARTING.name()).put(ProgramOptionConstants.USER_OVERRIDES, GSON.toJson(programOptions.getUserArguments().asMap())).put(ProgramOptionConstants.SYSTEM_OVERRIDES, GSON.toJson(programOptions.getArguments().asMap()));
        if (str != null) {
            put.put(ProgramOptionConstants.TWILL_RUN_ID, str);
        }
        publish(put);
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void running(ProgramRunId programRunId, @Nullable String str) {
        ImmutableMap.Builder<String, String> put = ImmutableMap.builder().put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(programRunId)).put(ProgramOptionConstants.LOGICAL_START_TIME, String.valueOf(System.currentTimeMillis())).put(ProgramOptionConstants.PROGRAM_STATUS, ProgramRunStatus.RUNNING.name());
        if (str != null) {
            put.put(ProgramOptionConstants.TWILL_RUN_ID, str);
        }
        publish(put);
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void completed(ProgramRunId programRunId) {
        stop(programRunId, ProgramRunStatus.COMPLETED, null);
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void killed(ProgramRunId programRunId) {
        stop(programRunId, ProgramRunStatus.KILLED, null);
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void error(ProgramRunId programRunId, Throwable th) {
        stop(programRunId, ProgramRunStatus.FAILED, th);
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void suspend(ProgramRunId programRunId) {
        publish(ImmutableMap.builder().put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(programRunId)).put(ProgramOptionConstants.PROGRAM_STATUS, ProgramRunStatus.SUSPENDED.name()));
    }

    @Override // co.cask.cdap.app.runtime.ProgramStateWriter
    public void resume(ProgramRunId programRunId) {
        publish(ImmutableMap.builder().put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(programRunId)).put(ProgramOptionConstants.PROGRAM_STATUS, ProgramRunStatus.RESUMING.name()));
    }

    private void stop(ProgramRunId programRunId, ProgramRunStatus programRunStatus, @Nullable Throwable th) {
        ImmutableMap.Builder<String, String> put = ImmutableMap.builder().put(ProgramOptionConstants.PROGRAM_RUN_ID, GSON.toJson(programRunId)).put(ProgramOptionConstants.END_TIME, String.valueOf(System.currentTimeMillis())).put(ProgramOptionConstants.PROGRAM_STATUS, programRunStatus.name());
        if (th != null) {
            put.put(ProgramOptionConstants.PROGRAM_ERROR, GSON.toJson(new BasicThrowable(th)));
        }
        publish(put);
    }

    private void publish(ImmutableMap.Builder<String, String> builder) {
        Notification notification = new Notification(Notification.Type.PROGRAM_STATUS, builder.build());
        int i = 0;
        long j = -1;
        boolean z = false;
        while (!z) {
            try {
                this.messagingService.publish(StoreRequestBuilder.of(this.topicId).addPayloads(new String[]{GSON.toJson(notification)}).build());
                LOG.trace("Published program status notification: {}", notification);
                z = true;
            } catch (IOException e) {
                Throwables.propagate(e);
            } catch (TopicNotFoundException | ServiceUnavailableException e2) {
                if (j < 0) {
                    j = System.currentTimeMillis();
                }
                i++;
                long nextRetry = this.retryStrategy.nextRetry(i, j);
                if (nextRetry < 0) {
                    LOG.error("Failed to publish messages to TMS and exceeded retry limit.", e2);
                    Throwables.propagate(e2);
                }
                LOG.debug("Failed to publish messages to TMS due to {}. Will be retried in {} ms.", e2.getMessage(), Long.valueOf(nextRetry));
                try {
                    TimeUnit.MILLISECONDS.sleep(nextRetry);
                } catch (InterruptedException e3) {
                    LOG.warn("Publishing message to TMS interrupted.");
                    Thread.currentThread().interrupt();
                    z = true;
                }
            }
        }
    }
}
