package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.metadata.writer.MetadataMessage;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.WorkflowNodeStateDetail;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.TopicId;
import com.google.gson.Gson;
import com.google.inject.Inject;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/MessagingWorkflowStateWriter.class */
public class MessagingWorkflowStateWriter implements WorkflowStateWriter {
    private static final Gson GSON = new Gson();
    private final TopicId topic;
    private final MessagingService messagingService;
    private final RetryStrategy retryStrategy;

    @Inject
    MessagingWorkflowStateWriter(CConfiguration cConfiguration, MessagingService messagingService) {
        this.topic = NamespaceId.SYSTEM.topic(cConfiguration.get("metadata.messaging.topic"));
        this.messagingService = messagingService;
        this.retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.metadata.");
    }

    @Override // co.cask.cdap.internal.app.runtime.workflow.WorkflowStateWriter
    public void setWorkflowToken(ProgramRunId programRunId, WorkflowToken workflowToken) {
        StoreRequest build = StoreRequestBuilder.of(this.topic).addPayload(GSON.toJson(new MetadataMessage(MetadataMessage.Type.WORKFLOW_TOKEN, programRunId, GSON.toJsonTree(workflowToken)))).build();
        try {
            Retries.callWithRetries(() -> {
                return this.messagingService.publish(build);
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish workflow token for workflow run " + programRunId, e);
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.workflow.WorkflowStateWriter
    public void addWorkflowNodeState(ProgramRunId programRunId, WorkflowNodeStateDetail workflowNodeStateDetail) {
        StoreRequest build = StoreRequestBuilder.of(this.topic).addPayload(GSON.toJson(new MetadataMessage(MetadataMessage.Type.WORKFLOW_STATE, programRunId, GSON.toJsonTree(workflowNodeStateDetail)))).build();
        try {
            Retries.callWithRetries(() -> {
                return this.messagingService.publish(build);
            }, this.retryStrategy, Retries.ALWAYS_TRUE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to publish workflow node state for workflow run " + programRunId + "of node " + workflowNodeStateDetail.getNodeId() + " with state " + workflowNodeStateDetail.getNodeStatus(), e);
        }
    }
}
