package co.cask.cdap.messaging.service;

import co.cask.cdap.api.metrics.MetricsCollector;
import co.cask.cdap.api.metrics.NoopMetricsContext;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:co/cask/cdap/messaging/service/ConcurrentMessageWriter.class */
final class ConcurrentMessageWriter implements Closeable {
    private final StoreRequestWriter<?> messagesWriter;
    private final MetricsCollector metricsCollector;
    private final PendingStoreQueue pendingStoreQueue;
    private final AtomicBoolean writerFlag;
    private final AtomicBoolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/service/ConcurrentMessageWriter$PendingStoreQueue.class */
    public static final class PendingStoreQueue {
        private final MetricsCollector metricsCollector;
        private final Queue<PendingStoreRequest> writeQueue;
        private final List<PendingStoreRequest> inflightRequests;

        private PendingStoreQueue(MetricsCollector metricsCollector) {
            this.metricsCollector = metricsCollector;
            this.writeQueue = new ConcurrentLinkedQueue();
            this.inflightRequests = new ArrayList(100);
        }

        void enqueue(PendingStoreRequest pendingStoreRequest) {
            this.writeQueue.add(pendingStoreRequest);
        }

        void persist(StoreRequestWriter<?> storeRequestWriter) {
            this.inflightRequests.clear();
            PendingStoreRequest poll = this.writeQueue.poll();
            while (true) {
                PendingStoreRequest pendingStoreRequest = poll;
                if (pendingStoreRequest == null) {
                    this.metricsCollector.gauge("persist.queue.size", this.inflightRequests.size());
                    try {
                        storeRequestWriter.write(this.inflightRequests.iterator());
                        completeAll(null);
                        return;
                    } catch (Throwable th) {
                        completeAll(th);
                        return;
                    }
                }
                this.inflightRequests.add(pendingStoreRequest);
                poll = this.writeQueue.poll();
            }
        }

        void completeAll(@Nullable Throwable th) {
            Iterator<PendingStoreRequest> it = this.inflightRequests.iterator();
            while (it.hasNext()) {
                it.next().completed(th);
                it.remove();
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/service/ConcurrentMessageWriter$SimpleRollbackDetail.class */
    private static final class SimpleRollbackDetail implements RollbackDetail {
        private final long transactionWritePointer;
        private final long startTimestamp;
        private final int startSequenceId;
        private final long endTimestamp;
        private final int endSequenceId;

        SimpleRollbackDetail(long j, long j2, int i, long j3, int i2) {
            this.transactionWritePointer = j;
            this.startTimestamp = j2;
            this.startSequenceId = i;
            this.endTimestamp = j3;
            this.endSequenceId = i2;
        }

        @Override // co.cask.cdap.messaging.RollbackDetail
        public long getTransactionWritePointer() {
            return this.transactionWritePointer;
        }

        @Override // co.cask.cdap.messaging.RollbackDetail
        public long getStartTimestamp() {
            return this.startTimestamp;
        }

        @Override // co.cask.cdap.messaging.RollbackDetail
        public int getStartSequenceId() {
            return this.startSequenceId;
        }

        @Override // co.cask.cdap.messaging.RollbackDetail
        public long getEndTimestamp() {
            return this.endTimestamp;
        }

        @Override // co.cask.cdap.messaging.RollbackDetail
        public int getEndSequenceId() {
            return this.endSequenceId;
        }
    }

    @VisibleForTesting
    ConcurrentMessageWriter(StoreRequestWriter<?> storeRequestWriter) {
        this(storeRequestWriter, new NoopMetricsContext());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConcurrentMessageWriter(StoreRequestWriter<?> storeRequestWriter, MetricsCollector metricsCollector) {
        this.messagesWriter = storeRequestWriter;
        this.metricsCollector = metricsCollector;
        this.pendingStoreQueue = new PendingStoreQueue(metricsCollector);
        this.writerFlag = new AtomicBoolean();
        this.closed = new AtomicBoolean();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public RollbackDetail persist(StoreRequest storeRequest, TopicMetadata topicMetadata) throws IOException {
        if (this.closed.get()) {
            throw new IOException("Message writer is already closed");
        }
        PendingStoreRequest pendingStoreRequest = new PendingStoreRequest(storeRequest, topicMetadata);
        this.pendingStoreQueue.enqueue(pendingStoreRequest);
        this.metricsCollector.increment("persist.requested", 1L);
        while (!pendingStoreRequest.isCompleted()) {
            if (!tryWrite()) {
                Thread.yield();
            }
        }
        if (!pendingStoreRequest.isSuccess()) {
            this.metricsCollector.increment("persist.failure", 1L);
            Throwables.propagateIfInstanceOf(pendingStoreRequest.getFailureCause(), IOException.class);
            throw new IOException("Unable to write message to " + storeRequest.getTopicId(), pendingStoreRequest.getFailureCause());
        }
        this.metricsCollector.increment("persist.success", 1L);
        if (pendingStoreRequest.isTransactional()) {
            return new SimpleRollbackDetail(pendingStoreRequest.getTransactionWritePointer(), pendingStoreRequest.getStartTimestamp(), pendingStoreRequest.getStartSequenceId(), pendingStoreRequest.getEndTimestamp(), pendingStoreRequest.getEndSequenceId());
        }
        return null;
    }

    private boolean tryWrite() {
        if (!this.writerFlag.compareAndSet(false, true)) {
            return false;
        }
        try {
            this.pendingStoreQueue.persist(this.messagesWriter);
            this.writerFlag.set(false);
            return true;
        } catch (Throwable th) {
            this.writerFlag.set(false);
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            while (!tryWrite()) {
                Thread.yield();
            }
            this.messagesWriter.close();
        }
    }
}
