package co.cask.cdap.messaging.service;

import co.cask.cdap.common.utils.TimeProvider;
import co.cask.cdap.messaging.TopicMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/* JADX INFO: Access modifiers changed from: package-private */
@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/messaging/service/StoreRequestWriter.class */
public abstract class StoreRequestWriter<T> implements Closeable {

    @VisibleForTesting
    static final int SEQUENCE_ID_LIMIT = 65536;
    private final TimeProvider timeProvider;
    private long writeTimestamp;
    private long lastWriteTimestamp;
    private int seqId;
    private final StoreRequestWriter<T>.PayloadTransformIterator payloadTransformIterator;

    /* loaded from: input_file:co/cask/cdap/messaging/service/StoreRequestWriter$PayloadTransformIterator.class */
    private final class PayloadTransformIterator implements Iterator<T> {
        private final boolean generateNullPayloadEntry;
        private PendingStoreRequest storeRequest;
        private boolean computedFirst;
        private T nextEntry;
        private boolean completed;

        private PayloadTransformIterator(boolean z) {
            this.completed = true;
            this.generateNullPayloadEntry = z;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.completed) {
                return false;
            }
            if (this.nextEntry != null) {
                return true;
            }
            if (this.storeRequest.hasNext() || (this.generateNullPayloadEntry && !this.computedFirst)) {
                this.nextEntry = (T) StoreRequestWriter.this.getEntry(this.storeRequest.getTopicMetadata(), this.storeRequest.isTransactional(), this.storeRequest.getTransactionWritePointer(), StoreRequestWriter.this.writeTimestamp, (short) StoreRequestWriter.this.seqId, this.storeRequest.hasNext() ? (byte[]) this.storeRequest.next() : null);
            }
            this.computedFirst = true;
            this.completed = this.nextEntry == null;
            return !this.completed;
        }

        @Override // java.util.Iterator
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T t = this.nextEntry;
            this.storeRequest.setEndTimestamp(StoreRequestWriter.this.writeTimestamp);
            this.storeRequest.setEndSequenceId(StoreRequestWriter.this.seqId);
            StoreRequestWriter.this.incrementSequenceId();
            this.nextEntry = null;
            return t;
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("Delete not supported");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StoreRequestWriter<T>.PayloadTransformIterator reset(PendingStoreRequest pendingStoreRequest) {
            this.storeRequest = pendingStoreRequest;
            this.storeRequest.setStartTimestamp(StoreRequestWriter.this.writeTimestamp);
            this.storeRequest.setStartSequenceId(StoreRequestWriter.this.seqId);
            this.nextEntry = null;
            this.computedFirst = false;
            this.completed = false;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StoreRequestWriter(TimeProvider timeProvider, boolean z) {
        this.timeProvider = timeProvider;
        this.payloadTransformIterator = new PayloadTransformIterator(z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void write(final Iterator<? extends PendingStoreRequest> it) throws IOException {
        updateTimeSequence();
        doWrite(new AbstractIterator<T>() { // from class: co.cask.cdap.messaging.service.StoreRequestWriter.1
            private PendingStoreRequest currentRequest;

            protected T computeNext() {
                while (!StoreRequestWriter.this.payloadTransformIterator.hasNext()) {
                    if (!it.hasNext()) {
                        return (T) endOfData();
                    }
                    this.currentRequest = (PendingStoreRequest) it.next();
                    StoreRequestWriter.this.payloadTransformIterator.reset(this.currentRequest);
                }
                return StoreRequestWriter.this.payloadTransformIterator.hasNext() ? (T) StoreRequestWriter.this.payloadTransformIterator.next() : (T) endOfData();
            }
        });
    }

    abstract T getEntry(TopicMetadata topicMetadata, boolean z, long j, long j2, short s, @Nullable byte[] bArr);

    abstract void doWrite(Iterator<T> it) throws IOException;

    /* JADX INFO: Access modifiers changed from: private */
    public void incrementSequenceId() {
        this.seqId++;
        if (this.seqId >= SEQUENCE_ID_LIMIT) {
            updateTimeSequence();
        }
    }

    private void updateTimeSequence() {
        this.writeTimestamp = this.timeProvider.currentTimeMillis();
        if (this.writeTimestamp == this.lastWriteTimestamp && this.seqId >= SEQUENCE_ID_LIMIT) {
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
            this.writeTimestamp = this.timeProvider.currentTimeMillis();
        }
        if (this.writeTimestamp != this.lastWriteTimestamp) {
            this.lastWriteTimestamp = this.writeTimestamp;
            this.seqId = 0;
        }
    }
}
