package co.cask.cdap.messaging.subscriber;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.service.AbstractRetryableScheduledService;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.common.utils.TimeBoundIterator;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionNotInProgressException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/subscriber/AbstractMessagingSubscriberService.class */
public abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMessagingSubscriberService.class);
    private static final Logger SAMPLING_LOG = Loggers.sampling(LOG, LogSamplers.limitRate(10000));
    private final TopicId topicId;
    private final boolean transactionalFetch;
    private final int fetchSize;
    private final long emptyFetchDelayMillis;
    private final MetricsContext metricsContext;
    private final int txTimeoutSeconds;
    private final int maxTxTimeoutSeconds;
    private boolean messageIdInitialized;
    private String messageId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/subscriber/AbstractMessagingSubscriberService$MessageTrackingIterator.class */
    public final class MessageTrackingIterator extends AbstractIterator<ImmutablePair<String, T>> {
        private final Iterator<Message> messages;
        private String lastMessageId;
        private int consumedCount = 0;
        private boolean shouldEnd = false;

        MessageTrackingIterator(Iterator<Message> it) {
            this.messages = it;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public ImmutablePair<String, T> m46computeNext() {
            if (this.shouldEnd) {
                return (ImmutablePair) endOfData();
            }
            while (this.messages.hasNext()) {
                Message next = this.messages.next();
                try {
                    Object decodeMessage = AbstractMessagingSubscriberService.this.decodeMessage(next);
                    if (AbstractMessagingSubscriberService.this.shouldRunInSeparateTx(decodeMessage)) {
                        if (this.consumedCount > 0) {
                            AbstractMessagingSubscriberService.LOG.debug("Ending message batch early to process {} in a separate tx", decodeMessage);
                            return (ImmutablePair) endOfData();
                        }
                        this.shouldEnd = true;
                    }
                    AbstractMessagingSubscriberService.LOG.trace("Processing message from topic {} with message id {}: {}", new Object[]{AbstractMessagingSubscriberService.this.topicId, next.getId(), decodeMessage});
                    this.consumedCount++;
                    this.lastMessageId = next.getId();
                    return new ImmutablePair<>(next.getId(), decodeMessage);
                } catch (Exception e) {
                    AbstractMessagingSubscriberService.LOG.warn("Failed to decode message with id {} and payload '{}'. Skipped.", new Object[]{next.getId(), next.getPayloadAsString(), e});
                    this.consumedCount++;
                    this.lastMessageId = next.getId();
                }
            }
            return (ImmutablePair) endOfData();
        }

        @Nullable
        String getLastMessageId() {
            return this.lastMessageId;
        }

        int getConsumedCount() {
            return this.consumedCount;
        }
    }

    protected AbstractMessagingSubscriberService(TopicId topicId, boolean z, int i, int i2, int i3, long j, RetryStrategy retryStrategy, MetricsContext metricsContext) {
        super(retryStrategy);
        this.topicId = topicId;
        this.transactionalFetch = z;
        this.fetchSize = i;
        this.txTimeoutSeconds = i2;
        this.maxTxTimeoutSeconds = i3;
        this.emptyFetchDelayMillis = j;
        this.metricsContext = metricsContext;
    }

    protected final TopicId getTopicId() {
        return this.topicId;
    }

    protected abstract MessagingContext getMessagingContext();

    protected abstract Transactional getTransactional();

    @Nullable
    protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;

    protected abstract void storeMessageId(DatasetContext datasetContext, String str) throws Exception;

    protected abstract T decodeMessage(Message message) throws Exception;

    protected boolean shouldRunInSeparateTx(T t) {
        return false;
    }

    protected abstract void processMessages(DatasetContext datasetContext, Iterator<ImmutablePair<String, T>> it) throws Exception;

    protected void postProcess() {
    }

    protected final long runTask() throws Exception {
        long fetchAndProcessMessages = fetchAndProcessMessages();
        try {
            postProcess();
        } catch (Exception e) {
            LOG.warn("Failed to perform post processing after processing messages.", e);
        }
        return fetchAndProcessMessages;
    }

    protected final boolean shouldRetry(Exception exc) {
        try {
            throw exc;
        } catch (Exception e) {
            SAMPLING_LOG.warn("Failed to get and process notifications. Will retry in next run", e);
            return true;
        } catch (ServiceUnavailableException e2) {
            SAMPLING_LOG.warn("Failed to contact service {}. Will retry in next run.", e2.getServiceName(), e2);
            return true;
        } catch (TopicNotFoundException e3) {
            SAMPLING_LOG.warn("Failed to fetch from TMS. Will retry in next run.", e3);
            return true;
        }
    }

    private long fetchAndProcessMessages() throws TopicNotFoundException, IOException {
        if (!this.messageIdInitialized) {
            this.messageId = (String) Transactionals.execute(getTransactional(), this::loadMessageId);
            this.messageIdInitialized = true;
        }
        long currentTimeMillis = System.currentTimeMillis();
        List<Message> fetchMessages = fetchMessages(this.messageId);
        this.metricsContext.gauge("tms.fetch.time.ms", System.currentTimeMillis() - currentTimeMillis);
        this.metricsContext.increment("tms.fetch.messages", fetchMessages.size());
        if (fetchMessages.isEmpty() || state() != Service.State.RUNNING) {
            return this.emptyFetchDelayMillis;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        int i = this.txTimeoutSeconds;
        while (true) {
            try {
                long j = 900 * i;
                MessageTrackingIterator messageTrackingIterator = (MessageTrackingIterator) Transactionals.execute(getTransactional(), i, datasetContext -> {
                    MessageTrackingIterator messageTrackingIterator2 = new MessageTrackingIterator(new TimeBoundIterator(fetchMessages.iterator(), j));
                    processMessages(datasetContext, messageTrackingIterator2);
                    String lastMessageId = messageTrackingIterator2.getLastMessageId();
                    if (lastMessageId != null) {
                        storeMessageId(datasetContext, lastMessageId);
                    }
                    return messageTrackingIterator2;
                });
                this.messageId = messageTrackingIterator.getLastMessageId() == null ? this.messageId : messageTrackingIterator.getLastMessageId();
                long currentTimeMillis3 = System.currentTimeMillis();
                this.metricsContext.gauge("process.duration.ms", currentTimeMillis3 - currentTimeMillis2);
                this.metricsContext.increment("process.notifications", messageTrackingIterator.getConsumedCount());
                if (this.messageId == null) {
                    return 0L;
                }
                this.metricsContext.gauge("process.delay.ms", currentTimeMillis3 - getMessagePublishTime(this.messageId));
                return 0L;
            } catch (Exception e) {
                if (!(e.getCause() instanceof TransactionNotInProgressException)) {
                    throw e;
                }
                if (i >= this.maxTxTimeoutSeconds) {
                    throw e;
                }
                i = Math.min(this.maxTxTimeoutSeconds, 2 * i);
                LOG.warn("Timed out processing system message. Trying again with a larger timeout of {} seconds.", Integer.valueOf(i));
            }
        }
    }

    private List<Message> fetchMessages(@Nullable String str) throws TopicNotFoundException, IOException {
        return !this.transactionalFetch ? doFetchMessages(str) : (List) Transactionals.execute(getTransactional(), datasetContext -> {
            return doFetchMessages(str);
        }, TopicNotFoundException.class, IOException.class);
    }

    private List<Message> doFetchMessages(@Nullable String str) throws TopicNotFoundException, IOException {
        ArrayList arrayList = new ArrayList();
        LOG.trace("Fetching from topic '{}' with messageId '{}'", str);
        CloseableIterator fetch = getMessagingContext().getMessageFetcher().fetch(this.topicId.getNamespace(), this.topicId.getTopic(), this.fetchSize, str);
        Throwable th = null;
        while (fetch.hasNext() && state() == Service.State.RUNNING) {
            try {
                try {
                    arrayList.add(fetch.next());
                } finally {
                }
            } catch (Throwable th2) {
                if (fetch != null) {
                    if (th != null) {
                        try {
                            fetch.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        fetch.close();
                    }
                }
                throw th2;
            }
        }
        if (fetch != null) {
            if (0 != 0) {
                try {
                    fetch.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                fetch.close();
            }
        }
        return arrayList;
    }

    private long getMessagePublishTime(String str) {
        return new MessageId(Bytes.fromHexString(str)).getPublishTimestamp();
    }
}
