package co.cask.cdap.metrics.collect;

import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.BinaryEncoder;
import co.cask.cdap.common.io.DatumWriter;
import co.cask.cdap.common.io.Encoder;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A metrics collection service that publishes encoded {@link MetricValues} through a
 * {@link MessagingService}.
 *
 * <p>Each record handed to {@link #publish(Iterator)} is encoded with the injected
 * {@link DatumWriter}, buffered under one of a fixed number of topics (selected from the hash code
 * of the record's tags), and the buffered payloads are then published topic by topic.
 */
@Singleton
public class MessagingMetricsCollectionService extends AggregatedMetricsCollectionService {
    private static final Logger LOG = LoggerFactory.getLogger(MessagingMetricsCollectionService.class);
    private final MessagingService messagingService;
    private final DatumWriter<MetricValues> recordWriter;
    // Scratch buffer shared by all records; it is reset before each record is encoded.
    private final ByteArrayOutputStream encoderOutputStream;
    private final Encoder encoder;
    // Topic index (0 .. number of topics - 1) mapped to the payload buffer of that topic.
    private final Map<Integer, TopicPayload> topicPayloads;

    /**
     * Buffers the encoded payloads destined for a single topic and publishes them with retries.
     */
    public final class TopicPayload {
        private final TopicId topicId;
        private final List<byte[]> payloads;
        private final RetryStrategy retryStrategy;

        private TopicPayload(TopicId topicId, RetryStrategy retryStrategy) {
            this.topicId = topicId;
            this.retryStrategy = retryStrategy;
            this.payloads = new ArrayList<>();
        }

        /**
         * Appends one encoded record to the buffer of this topic.
         *
         * @param payload the encoded record bytes
         */
        void addPayload(byte[] payload) {
            this.payloads.add(payload);
        }

        /**
         * Publishes all buffered payloads to this topic in a single request, retrying when the
         * topic is not found or the service is unavailable.
         *
         * <p>The buffer is cleared only after a successful publish. If the thread is interrupted
         * while waiting for the next retry, the interrupt flag is restored and the method returns
         * normally with the payloads still buffered.
         *
         * @param messagingService the service used to publish the payloads
         * @throws IOException if the retry strategy reports that no further retry is allowed
         */
        void publish(MessagingService messagingService) throws IOException {
            if (this.payloads.isEmpty()) {
                return;
            }
            int failureCount = 0;
            // Time of the first failure; negative until a failure has been observed.
            long firstFailureTime = -1L;
            boolean done = false;
            while (!done) {
                try {
                    messagingService.publish(StoreRequestBuilder.of(this.topicId).addPayloads(this.payloads.iterator()).build());
                    this.payloads.clear();
                    done = true;
                } catch (TopicNotFoundException | ServiceUnavailableException e) {
                    if (firstFailureTime < 0) {
                        firstFailureTime = System.currentTimeMillis();
                    }
                    failureCount++;
                    long retryMillis = getRetryStrategy().nextRetry(failureCount, firstFailureTime);
                    // A negative delay from the strategy means retries are exhausted.
                    if (retryMillis < 0) {
                        throw new IOException("Failed to publish messages to TMS and exceeded retry limit.", e);
                    }
                    LOG.debug("Failed to publish messages to TMS due to {}. Will be retried in {} ms.", e.getMessage(), retryMillis);
                    try {
                        TimeUnit.MILLISECONDS.sleep(retryMillis);
                    } catch (InterruptedException interrupted) {
                        // Stop retrying, but keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                        done = true;
                    }
                }
            }
        }

        /**
         * Returns the configured retry strategy while the enclosing service is running; otherwise
         * returns a strategy built from a 200 ms fixed delay wrapped in a 5 second time limit.
         */
        private RetryStrategy getRetryStrategy() {
            if (MessagingMetricsCollectionService.this.isRunning()) {
                return this.retryStrategy;
            }
            return RetryStrategies.timeLimit(5L, TimeUnit.SECONDS, RetryStrategies.fixDelay(200L, TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Creates the service and one payload buffer per topic. Topic names are the given prefix
     * followed by the topic index, in the system namespace.
     *
     * @param topicPrefix prefix of every metrics topic name
     * @param topicNum number of metrics topics; must be positive
     * @param cConfiguration configuration from which the retry strategy is read
     * @param messagingService service used to publish the encoded metrics
     * @param datumWriter writer used to encode each {@link MetricValues}
     * @throws IllegalArgumentException if {@code topicNum} is not positive
     */
    @Inject
    MessagingMetricsCollectionService(@Named("metrics.topic.prefix") String topicPrefix, @Named("metrics.messaging.topic.num") int topicNum, CConfiguration cConfiguration, MessagingService messagingService, DatumWriter<MetricValues> datumWriter) {
        Preconditions.checkArgument(topicNum > 0, "Constants.Metrics.MESSAGING_TOPIC_NUM must be a positive integer");
        this.messagingService = messagingService;
        this.recordWriter = datumWriter;
        this.encoderOutputStream = new ByteArrayOutputStream(1024);
        this.encoder = new BinaryEncoder(this.encoderOutputStream);
        RetryStrategy retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.metrics.");
        this.topicPayloads = new LinkedHashMap<>(topicNum);
        for (int index = 0; index < topicNum; index++) {
            this.topicPayloads.put(index, new TopicPayload(NamespaceId.SYSTEM.topic(topicPrefix + index), retryStrategy));
        }
    }

    /**
     * Encodes every metric record, buffers it under the topic selected by the hash code of its
     * tags, and then publishes the buffers of all topics.
     *
     * @param it the metric records to publish
     * @throws Exception if encoding or publishing fails
     */
    @Override // co.cask.cdap.metrics.collect.AggregatedMetricsCollectionService
    protected void publish(Iterator<MetricValues> it) throws Exception {
        int topicCount = this.topicPayloads.size();
        while (it.hasNext()) {
            this.encoderOutputStream.reset();
            MetricValues metricValues = it.next();
            this.recordWriter.encode(metricValues, this.encoder);
            // The remainder lies strictly inside (-topicCount, topicCount), so abs() cannot overflow.
            int topicIndex = Math.abs(metricValues.getTags().hashCode() % topicCount);
            this.topicPayloads.get(topicIndex).addPayload(this.encoderOutputStream.toByteArray());
        }
        publishMetric(this.topicPayloads.values());
    }

    /**
     * Publishes the buffered payloads of each given topic in iteration order.
     *
     * @param iterable the per-topic payload buffers to publish
     * @throws IOException if publishing to any topic fails; topics after it are not attempted
     */
    private void publishMetric(Iterable<TopicPayload> iterable) throws IOException {
        for (TopicPayload topicPayload : iterable) {
            topicPayload.publish(this.messagingService);
        }
    }
}
