package com.twitter.distributedlog.client;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecordSet;
import com.twitter.distributedlog.LogRecordSetBuffer;
import com.twitter.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutor;
import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
import com.twitter.distributedlog.exceptions.WriteException;
import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.finagle.IndividualRequestTimeoutException;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.class */
/**
 * Buffers records into a {@link LogRecordSet} and writes each filled (or periodically flushed)
 * record set to one of a fixed list of streams through a {@link DistributedLogClient}.
 *
 * <p>Each flushed record set is wrapped in a {@link PendingWriteRequest}, which picks a starting
 * stream in round-robin order and moves on to the next stream when an attempt fails or when the
 * speculative policy asks for another attempt.
 *
 * <p>{@link #write(ByteBuffer)} and the internal flush are serialized on this instance's monitor.
 */
public class DistributedLogMultiStreamWriter implements Runnable {

    /**
     * Upper bound (1 MiB - 4 KiB = 1044480 bytes) applied to a whole buffered record set.
     * NOTE(review): presumably mirrors the limit enforced by LogRecordSet itself - confirm.
     */
    private static final int MAX_LOGRECORDSET_SIZE = 1024 * 1024 - 4 * 1024;

    /**
     * Upper bound (1 MiB - 8 KiB = 1040384 bytes) applied to a single record payload.
     * NOTE(review): presumably mirrors the per-record limit of the log format - confirm.
     */
    private static final int MAX_LOGRECORD_SIZE = 1024 * 1024 - 8 * 1024;

    private final int numStreams;
    private final List<String> streams;
    private final DistributedLogClient client;
    private final int bufferSize;
    private final long requestTimeoutMs;
    private final SpeculativeRequestExecutionPolicy speculativePolicy;
    private final Ticker clockTicker;
    private final CompressionCodec.Type codec;
    private final ScheduledExecutorService scheduler;
    private final boolean ownScheduler;
    private final AtomicInteger nextStreamId;
    /** Handle of the periodic flush task; {@code null} when periodic flushing is disabled. */
    private final ScheduledFuture<?> flushTask;
    /** Record set currently being filled; swapped for a fresh one on every flush. */
    private LogRecordSet.Writer recordSetWriter;

    /**
     * Fluent builder for {@link DistributedLogMultiStreamWriter}. Obtain one through
     * {@link DistributedLogMultiStreamWriter#newBuilder()}.
     */
    public static class Builder {
        private DistributedLogClient _client = null;
        private List<String> _streams = null;
        private int _bufferSize = 16384;
        private int _flushIntervalMs = 10;
        private CompressionCodec.Type _codec = CompressionCodec.Type.NONE;
        private ScheduledExecutorService _executorService = null;
        private long _requestTimeoutMs = 500L;
        private int _firstSpeculativeTimeoutMs = 50;
        private int _maxSpeculativeTimeoutMs = 200;
        private float _speculativeBackoffMultiplier = 2.0f;
        private Ticker _ticker = Ticker.systemTicker();

        private Builder() {
        }

        /** Sets the client used to write record sets. Required. */
        public Builder client(DistributedLogClient client) {
            this._client = client;
            return this;
        }

        /** Sets the streams to write to. Required and must be non-empty. */
        public Builder streams(List<String> streams) {
            this._streams = streams;
            return this;
        }

        /**
         * Sets the number of buffered bytes that triggers a flush. The value is capped at
         * 1044480 bytes when the writer is built.
         */
        public Builder bufferSize(int bufferSize) {
            this._bufferSize = bufferSize;
            return this;
        }

        /** Sets the periodic flush interval; a value {@code <= 0} disables periodic flushing. */
        public Builder flushIntervalMs(int flushIntervalMs) {
            this._flushIntervalMs = flushIntervalMs;
            return this;
        }

        /** Sets the compression codec handed to the record set writer. Must not be null. */
        public Builder compressionCodec(CompressionCodec.Type codec) {
            this._codec = codec;
            return this;
        }

        /**
         * Sets the scheduler used for periodic flushes and speculative requests. When left
         * unset, the writer creates (and on close shuts down) its own single-thread scheduler.
         */
        public Builder scheduler(ScheduledExecutorService executorService) {
            this._executorService = executorService;
            return this;
        }

        /** Sets the overall time budget, in milliseconds, for writing one record set. */
        public Builder requestTimeoutMs(long requestTimeoutMs) {
            this._requestTimeoutMs = requestTimeoutMs;
            return this;
        }

        /** Sets the first speculative timeout, in milliseconds, passed to the policy. */
        public Builder firstSpeculativeTimeoutMs(int timeoutMs) {
            this._firstSpeculativeTimeoutMs = timeoutMs;
            return this;
        }

        /** Sets the maximum speculative timeout, in milliseconds, passed to the policy. */
        public Builder maxSpeculativeTimeoutMs(int timeoutMs) {
            this._maxSpeculativeTimeoutMs = timeoutMs;
            return this;
        }

        /** Sets the backoff multiplier passed to the speculative policy. */
        public Builder speculativeBackoffMultiplier(float multiplier) {
            this._speculativeBackoffMultiplier = multiplier;
            return this;
        }

        /** Sets the ticker used to measure elapsed request time. */
        public Builder clockTicker(Ticker ticker) {
            this._ticker = ticker;
            return this;
        }

        /**
         * Validates the settings and creates the writer.
         *
         * @return a new writer; periodic flushing is already scheduled if the interval is positive
         * @throws IllegalArgumentException if no streams were given or the speculative timeout
         *         settings are inconsistent
         * @throws NullPointerException if the client or the codec is null
         */
        public DistributedLogMultiStreamWriter build() {
            Preconditions.checkArgument(null != this._streams && !this._streams.isEmpty(),
                    "No streams provided");
            Preconditions.checkNotNull(this._client, "No distributedlog client provided");
            Preconditions.checkNotNull(this._codec, "No compression codec provided");
            Preconditions.checkArgument(
                    this._firstSpeculativeTimeoutMs > 0
                            && this._firstSpeculativeTimeoutMs <= this._maxSpeculativeTimeoutMs
                            && this._speculativeBackoffMultiplier > 0.0f
                            && ((long) this._maxSpeculativeTimeoutMs) < this._requestTimeoutMs,
                    "Invalid speculative timeout settings");
            return new DistributedLogMultiStreamWriter(
                    this._streams,
                    this._client,
                    Math.min(this._bufferSize, MAX_LOGRECORDSET_SIZE),
                    this._flushIntervalMs,
                    this._requestTimeoutMs,
                    this._firstSpeculativeTimeoutMs,
                    this._maxSpeculativeTimeoutMs,
                    this._speculativeBackoffMultiplier,
                    this._codec,
                    this._ticker,
                    this._executorService);
        }
    }

    /**
     * One in-flight record set. Tracks which stream to try next, how many streams have been
     * tried, and whether the record set has already been completed or aborted.
     */
    public class PendingWriteRequest implements FutureEventListener<DLSN>, SpeculativeRequestExecutor {
        private final LogRecordSetBuffer recordSet;
        private final Stopwatch stopwatch;
        private int nextStream;
        private final AtomicBoolean complete = new AtomicBoolean(false);
        private int numTriedStreams = 0;

        PendingWriteRequest(LogRecordSetBuffer recordSet) {
            this.stopwatch = Stopwatch.createStarted(DistributedLogMultiStreamWriter.this.clockTicker);
            this.recordSet = recordSet;
            // Mask the sign bit rather than using Math.abs: Math.abs(Integer.MIN_VALUE) is still
            // negative, which would yield a negative index once the counter wraps around.
            int ticket = DistributedLogMultiStreamWriter.this.nextStreamId.incrementAndGet();
            this.nextStream = (ticket & Integer.MAX_VALUE) % DistributedLogMultiStreamWriter.this.numStreams;
        }

        /**
         * Sends the record set to the next stream in rotation.
         *
         * @return the stream written to, or {@code null} if the request was failed because the
         *         request timeout elapsed or every stream has already been tried
         */
        synchronized String sendNextWrite() {
            long elapsedMs = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (elapsedMs > DistributedLogMultiStreamWriter.this.requestTimeoutMs
                    || this.numTriedStreams >= DistributedLogMultiStreamWriter.this.numStreams) {
                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs)));
                return null;
            }
            try {
                return sendWriteToStream(this.nextStream);
            } finally {
                // Advance even when the send throws so the same stream is not retried.
                this.nextStream = (this.nextStream + 1) % DistributedLogMultiStreamWriter.this.numStreams;
                this.numTriedStreams++;
            }
        }

        /**
         * Writes the record set to the stream at the given index and registers this request as
         * the listener for the result.
         *
         * @param streamId index into the writer's stream list
         * @return the name of the stream written to
         */
        synchronized String sendWriteToStream(int streamId) {
            String stream = DistributedLogMultiStreamWriter.this.getStream(streamId);
            DistributedLogMultiStreamWriter.this.client.writeRecordSet(stream, this.recordSet).addEventListener(this);
            return stream;
        }

        /** Completes the record set with the returned position; only the first outcome wins. */
        @Override
        public void onSuccess(DLSN dlsn) {
            if (this.complete.compareAndSet(false, true)) {
                this.recordSet.completeTransmit(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId(), dlsn.getSlotId());
            }
        }

        /** Retries on the next stream unless the request has already been completed or aborted. */
        @Override
        public void onFailure(Throwable cause) {
            // A concurrent attempt may already have succeeded; re-sending then would only
            // duplicate the record set on another stream.
            if (this.complete.get()) {
                return;
            }
            sendNextWrite();
        }

        /** Aborts the record set with the given cause; only the first outcome wins. */
        private void fail(Throwable cause) {
            if (this.complete.compareAndSet(false, true)) {
                this.recordSet.abortTransmit(cause);
            }
        }

        /**
         * Issues one more attempt unless the request is already finished.
         *
         * @return a future holding {@code true} if another write was sent, {@code false} if the
         *         request is complete or no further attempt could be made
         */
        @Override // com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutor
        public Future<Boolean> issueSpeculativeRequest() {
            boolean issued = !this.complete.get() && null != sendNextWrite();
            return Future.value(Boolean.valueOf(issued));
        }
    }

    /** Creates a builder populated with default settings. */
    public static Builder newBuilder() {
        return new Builder();
    }

    private DistributedLogMultiStreamWriter(List<String> streams,
                                            DistributedLogClient client,
                                            int bufferSize,
                                            int flushIntervalMs,
                                            long requestTimeoutMs,
                                            int firstSpeculativeTimeoutMs,
                                            int maxSpeculativeTimeoutMs,
                                            float speculativeBackoffMultiplier,
                                            CompressionCodec.Type codec,
                                            Ticker clockTicker,
                                            ScheduledExecutorService scheduler) {
        // Private copy: it is shuffled below and must not be affected by later caller changes.
        this.streams = Lists.newArrayList(streams);
        this.numStreams = this.streams.size();
        this.client = client;
        this.bufferSize = bufferSize;
        this.requestTimeoutMs = requestTimeoutMs;
        this.codec = codec;
        this.clockTicker = clockTicker;
        if (null == scheduler) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactoryBuilder()
                            .setDaemon(true)
                            .setNameFormat("MultiStreamWriterFlushThread-%d")
                            .build());
            this.ownScheduler = true;
        } else {
            this.scheduler = scheduler;
            this.ownScheduler = false;
        }
        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
                firstSpeculativeTimeoutMs, maxSpeculativeTimeoutMs, speculativeBackoffMultiplier);
        Collections.shuffle(this.streams);
        this.nextStreamId = new AtomicInteger(0);
        this.recordSetWriter = newRecordSetWriter();
        // Scheduled last so the flush task never observes a partially initialized writer.
        if (flushIntervalMs > 0) {
            this.flushTask = this.scheduler.scheduleAtFixedRate(
                    this, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            this.flushTask = null;
        }
    }

    /** Returns the stream name at the given index of the (shuffled) stream list. */
    String getStream(int streamId) {
        return this.streams.get(streamId);
    }

    /** Returns the record set writer currently being filled. */
    LogRecordSet.Writer getLogRecordSetWriter() {
        return this.recordSetWriter;
    }

    private LogRecordSet.Writer newRecordSetWriter() {
        return LogRecordSet.newWriter(this.bufferSize, this.codec);
    }

    /**
     * Buffers a record and flushes the current record set when it is full.
     *
     * @param buffer the record payload; its remaining bytes are written
     * @return a future for the record's position, or a failed future if the record is too long
     *         or the record set rejected the write
     */
    public synchronized Future<DLSN> write(ByteBuffer buffer) {
        int logRecordSize = buffer.remaining();
        if (logRecordSize > MAX_LOGRECORD_SIZE) {
            return Future.exception(new LogRecordTooLongException(
                    "Log record of size " + logRecordSize + " written when only "
                            + MAX_LOGRECORD_SIZE + " is allowed"));
        }
        // Flush first if appending this record would push the set over the hard limit.
        if (this.recordSetWriter.getNumBytes() + logRecordSize > MAX_LOGRECORDSET_SIZE) {
            flush();
        }
        Promise<DLSN> writePromise = new Promise<DLSN>();
        try {
            this.recordSetWriter.writeRecord(buffer, writePromise);
            if (this.recordSetWriter.getNumBytes() >= this.bufferSize) {
                flush();
            }
            return writePromise;
        } catch (WriteException we) {
            // The current record set is abandoned: abort it and start a fresh one.
            this.recordSetWriter.abortTransmit(we);
            this.recordSetWriter = newRecordSetWriter();
            return Future.exception(we);
        } catch (LogRecordTooLongException lrtle) {
            return Future.exception(lrtle);
        }
    }

    /** Periodic flush entry point invoked by the scheduler. */
    @Override // java.lang.Runnable
    public void run() {
        flush();
    }

    /** Swaps in a fresh record set writer and transmits the filled one, if it has any records. */
    private void flush() {
        synchronized (this) {
            if (this.recordSetWriter.getNumRecords() == 0) {
                return;
            }
            LogRecordSet.Writer filled = this.recordSetWriter;
            this.recordSetWriter = newRecordSetWriter();
            transmit(filled);
        }
    }

    private void transmit(LogRecordSet.Writer recordSet) {
        this.speculativePolicy.initiateSpeculativeRequest(this.scheduler, new PendingWriteRequest(recordSet));
    }

    /**
     * Stops periodic flushing and, if this writer created its own scheduler, shuts it down.
     * Records still sitting in the buffer are not flushed by this method.
     */
    public void close() {
        // Cancel explicitly: a caller-supplied scheduler is not shut down here, so without this
        // the periodic task would keep running (and keep this writer reachable) indefinitely.
        if (null != this.flushTask) {
            this.flushTask.cancel(false);
        }
        if (this.ownScheduler) {
            this.scheduler.shutdown();
        }
    }
}
