package co.cask.cdap.internal.app.program;

import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.runtime.codec.ArgumentsCodec;
import co.cask.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Throwables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/program/MessagingProgramStatePublisher.class */
public class MessagingProgramStatePublisher implements ProgramStatePublisher {
    private static final Logger LOG = LoggerFactory.getLogger(MessagingProgramStatePublisher.class);
    private static final Gson GSON = ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).registerTypeAdapter(Arguments.class, new ArgumentsCodec()).registerTypeAdapter(ProgramOptions.class, new ProgramOptionsCodec()).create();
    private final MessagingService messagingService;
    private final TopicId topicId;
    private final RetryStrategy retryStrategy;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessagingProgramStatePublisher(MessagingService messagingService, TopicId topicId, RetryStrategy retryStrategy) {
        this.messagingService = messagingService;
        this.topicId = topicId;
        this.retryStrategy = retryStrategy;
    }

    @Override // co.cask.cdap.internal.app.program.ProgramStatePublisher
    public void publish(Notification.Type type, Map<String, String> map) {
        Notification notification = new Notification(type, map);
        int i = 0;
        long j = -1;
        boolean z = false;
        while (!z) {
            try {
                this.messagingService.publish(StoreRequestBuilder.of(this.topicId).addPayload(GSON.toJson(notification)).build());
                LOG.trace("Published program status notification: {}", notification);
                z = true;
            } catch (IOException e) {
                Throwables.propagate(e);
            } catch (TopicNotFoundException | ServiceUnavailableException e2) {
                if (j < 0) {
                    j = System.currentTimeMillis();
                }
                i++;
                long nextRetry = this.retryStrategy.nextRetry(i, j);
                if (nextRetry < 0) {
                    LOG.error("Failed to publish messages to TMS and exceeded retry limit.", e2);
                    Throwables.propagate(e2);
                }
                LOG.debug("Failed to publish messages to TMS due to {}. Will be retried in {} ms.", e2.getMessage(), Long.valueOf(nextRetry));
                try {
                    TimeUnit.MILLISECONDS.sleep(nextRetry);
                } catch (InterruptedException e3) {
                    LOG.warn("Publishing message to TMS interrupted.");
                    Thread.currentThread().interrupt();
                    z = true;
                }
            }
        }
    }
}
