package co.cask.cdap.messaging.service;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.messaging.store.PayloadTable;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/messaging/service/CoreMessageFetcher.class */
/**
 * A {@link MessageFetcher} that reads the messages of one topic from a {@link MessageTable},
 * following into a {@link PayloadTable} whenever a message-table entry is a payload reference.
 *
 * <p>The fetch parameters (start offset, start time, include-start flag, limit and transaction)
 * are read from the inherited {@link MessageFetcher} getters at the time {@link #fetch()} is called.
 */
public final class CoreMessageFetcher extends MessageFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(CoreMessageFetcher.class);

    /** Size in bytes of the buffer handed to {@code MessageId.putRawId} when building a raw id. */
    private static final int RAW_MESSAGE_ID_SIZE = 20;

    private final TopicMetadata topicMetadata;
    private final TableProvider<MessageTable> messageTableProvider;
    private final TableProvider<PayloadTable> payloadTableProvider;

    /**
     * Creates a fetcher for the given topic.
     *
     * @param topicMetadata metadata of the topic to fetch from
     * @param tableProvider provider of the {@link MessageTable} used by {@link #fetch()}
     * @param tableProvider2 provider of the {@link PayloadTable}; only consulted once a
     *                       payload-reference entry is encountered while iterating
     */
    public CoreMessageFetcher(TopicMetadata topicMetadata, TableProvider<MessageTable> tableProvider, TableProvider<PayloadTable> tableProvider2) {
        this.topicMetadata = topicMetadata;
        this.messageTableProvider = tableProvider;
        this.payloadTableProvider = tableProvider2;
    }

    /**
     * Obtains a {@link MessageTable} from the provider and returns an iterator over the messages.
     * The returned iterator closes the table (and any payload table it opened) in its
     * {@code close()} method.
     *
     * @return an iterator of {@link RawMessage}; the caller is expected to close it
     * @throws IOException if obtaining the table or starting the scan fails
     */
    @Override // co.cask.cdap.messaging.MessageFetcher
    public CloseableIterator<RawMessage> fetch() throws IOException {
        MessageTable messageTable = this.messageTableProvider.get();
        try {
            return new MessageCloseableIterator(messageTable);
        } catch (Throwable th) {
            // The iterator never took ownership of the table, so release it here before rethrowing.
            closeQuietly(messageTable);
            throw th;
        }
    }

    /**
     * Builds a {@link MessageId} carrying the publish timestamp and sequence id of the given id,
     * with the payload write timestamp and payload sequence id both set to zero.
     *
     * @param messageId the id to copy the publish timestamp and sequence id from
     * @return a new id suitable for scanning the message table
     */
    /* JADX INFO: Access modifiers changed from: private */
    public MessageId createMessageTableMessageId(MessageId messageId) {
        byte[] rawId = new byte[RAW_MESSAGE_ID_SIZE];
        MessageId.putRawId(messageId.getPublishTimestamp(), messageId.getSequenceId(), 0L, (short) 0, rawId, 0);
        return new MessageId(rawId);
    }

    /**
     * Builds a raw message id from a message-table entry and an optional payload-table entry.
     * When {@code entry2} is {@code null} the payload write timestamp and payload sequence id
     * are both zero.
     *
     * @param entry the message-table entry supplying publish timestamp and sequence id
     * @param entry2 the payload-table entry supplying the payload fields, or {@code null}
     * @return the raw id bytes
     */
    /* JADX INFO: Access modifiers changed from: private */
    public byte[] createMessageId(MessageTable.Entry entry, @Nullable PayloadTable.Entry entry2) {
        long payloadWriteTimestamp = entry2 == null ? 0L : entry2.getPayloadWriteTimestamp();
        short payloadSequenceId = entry2 == null ? (short) 0 : entry2.getPayloadSequenceId();
        return createMessageId(entry, payloadWriteTimestamp, payloadSequenceId);
    }

    /**
     * Builds a raw message id from a message-table entry and explicit payload fields.
     *
     * @param entry the message-table entry supplying publish timestamp and sequence id
     * @param j the payload write timestamp to encode
     * @param s the payload sequence id to encode
     * @return the raw id bytes
     */
    /* JADX INFO: Access modifiers changed from: private */
    public byte[] createMessageId(MessageTable.Entry entry, long j, short s) {
        byte[] rawId = new byte[RAW_MESSAGE_ID_SIZE];
        MessageId.putRawId(entry.getPublishTimestamp(), entry.getSequenceId(), j, s, rawId, 0);
        return rawId;
    }

    /**
     * Closes the given resource, logging (rather than propagating) anything it throws.
     *
     * @param autoCloseable the resource to close; {@code null} is ignored
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void closeQuietly(@Nullable AutoCloseable autoCloseable) {
        if (autoCloseable == null) {
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th) {
            LOG.warn("Exception raised when closing Closeable {}", autoCloseable, th);
        }
    }

    /**
     * Iterator that walks the message table and, for entries that are payload references,
     * the matching payload-table entries, yielding at most {@code getLimit()} messages.
     */
    private final class MessageCloseableIterator implements CloseableIterator<RawMessage> {
        private final CloseableIterator<MessageTable.Entry> messageIterator;
        private final TopicId topicId;
        private final MessageTable messageTable;
        // Message looked up by hasNext() and not yet handed out by next().
        private RawMessage nextMessage;
        // Message-table entry most recently read; paired with payload entries to build ids.
        private MessageTable.Entry messageEntry;
        private CloseableIterator<PayloadTable.Entry> payloadIterator;
        // Only consulted for the first payload iterator; cleared once that iterator is created.
        private MessageId startOffset;
        private boolean inclusive;
        // Number of messages that may still be returned; decremented by next().
        private int messageLimit;
        // Obtained lazily, the first time a payload-reference entry is seen.
        private PayloadTable payloadTable;

        /**
         * Starts the message-table scan.
         *
         * @param messageTable the table to scan; closed by {@link #close()}
         * @throws IOException if the table scan cannot be started
         */
        MessageCloseableIterator(MessageTable messageTable) throws IOException {
            this.topicId = topicMetadata.getTopicId();
            this.messageTable = messageTable;
            this.inclusive = isIncludeStart();
            this.messageLimit = getLimit();
            long ttl = topicMetadata.getTTL();
            this.startOffset = getStartOffset() == null ? null : new MessageId(getStartOffset());
            Long startTime = getStartTime();

            // Lower bound on publish time: the TTL is converted from seconds to milliseconds.
            long smallestPublishTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttl);

            if (this.startOffset == null || this.startOffset.getPublishTimestamp() < smallestPublishTime) {
                // No usable offset: scan by time, never earlier than the TTL bound.
                // NOTE(review): in the "offset older than TTL" case startOffset stays non-null and
                // still seeds the first payload fetch in hasNext() - confirm this is intended.
                long fetchStartTime = Math.max(smallestPublishTime, startTime == null ? smallestPublishTime : startTime.longValue());
                this.messageIterator = messageTable.fetch(topicMetadata, fetchStartTime, this.messageLimit, getTransaction());
            } else if (this.startOffset.getPayloadWriteTimestamp() != 0) {
                // Presumably a non-zero payload write timestamp means the offset points into the
                // payload table (TODO confirm); the message table is scanned from the offset with
                // its payload fields zeroed, always inclusively.
                this.messageIterator = messageTable.fetch(topicMetadata, createMessageTableMessageId(this.startOffset), true, this.messageLimit, getTransaction());
            } else {
                this.messageIterator = messageTable.fetch(topicMetadata, this.startOffset, isIncludeStart(), this.messageLimit, getTransaction());
            }
        }

        /**
         * Looks up the next message if one is not already buffered.
         *
         * @return {@code true} if the limit has not been reached and another message is available
         * @throws RuntimeException wrapping an {@link IOException} raised while opening the payload table
         */
        @Override
        public boolean hasNext() {
            if (messageLimit <= 0) {
                return false;
            }
            while (nextMessage == null) {
                // Drain the current payload iterator before advancing the message iterator.
                if (payloadIterator != null && payloadIterator.hasNext()) {
                    PayloadTable.Entry payloadEntry = payloadIterator.next();
                    nextMessage = new RawMessage(createMessageId(messageEntry, payloadEntry), payloadEntry.getPayload());
                    break;
                }
                if (!messageIterator.hasNext()) {
                    break;
                }
                messageEntry = messageIterator.next();
                if (messageEntry.isPayloadReference()) {
                    // The loop comes back around to read from the freshly opened payload iterator.
                    openPayloadIterator();
                } else {
                    nextMessage = new RawMessage(createMessageId(messageEntry, null), messageEntry.getPayload());
                }
            }
            // Once the first lookup has been made, later payload fetches are always inclusive.
            inclusive = true;
            return nextMessage != null;
        }

        /**
         * Replaces the payload iterator with one for the current {@code messageEntry}. The start
         * offset's payload fields are used only for the first such iterator; afterwards the scan
         * starts from payload fields of zero, inclusively.
         */
        private void openPayloadIterator() {
            try {
                if (payloadTable == null) {
                    payloadTable = payloadTableProvider.get();
                }
                closeQuietly(payloadIterator);
                MessageId payloadStartOffset = startOffset == null
                    ? new MessageId(createMessageId(messageEntry, null))
                    : new MessageId(createMessageId(messageEntry, startOffset.getPayloadWriteTimestamp(), startOffset.getPayloadSequenceId()));
                payloadIterator = payloadTable.fetch(topicMetadata, messageEntry.getTransactionWritePointer(), payloadStartOffset, startOffset == null || inclusive, messageLimit);
                startOffset = null;
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }

        /**
         * Returns the buffered message and counts it against the limit.
         *
         * @return the next message
         * @throws NoSuchElementException if {@link #hasNext()} is {@code false}
         */
        @Override
        public RawMessage next() {
            if (!hasNext()) {
                throw new NoSuchElementException("No more message from " + this.topicId);
            }
            RawMessage message = nextMessage;
            nextMessage = null;
            messageLimit--;
            return message;
        }

        /**
         * Always fails; removal is not supported by this iterator.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Remove is not supported");
        }

        /** Closes both iterators and both tables, logging instead of throwing on failure. */
        @Override
        public void close() {
            closeQuietly(payloadIterator);
            closeQuietly(messageIterator);
            closeQuietly(payloadTable);
            closeQuietly(messageTable);
        }
    }
}
