package com.twitter.distributedlog.benchmark;

import com.google.common.base.Preconditions;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
import com.twitter.distributedlog.client.DistributedLogMultiStreamWriter;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogClientBuilder;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration$;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/benchmark/WriterWorker.class */
public class WriterWorker implements Worker {
    static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);
    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final int writeConcurrency;
    final int messageSizeBytes;
    final int hostConnectionCoreSize;
    final int hostConnectionLimit;
    final ExecutorService executorService;
    final ShiftableRateLimiter rateLimiter;
    final DLZkServerSet[] serverSets;
    final List<String> finagleNames;
    final Random random;
    final List<String> streamNames;
    final int numStreams;
    final int batchSize;
    final boolean thriftmux;
    final boolean handshakeWithClientInfo;
    final int sendBufferSize;
    final int recvBufferSize;
    final boolean enableBatching;
    volatile boolean running = true;
    final StatsReceiver statsReceiver;
    final StatsLogger statsLogger;
    final OpStatsLogger requestStat;
    final StatsLogger exceptionsLogger;
    final StatsLogger dlErrorCodeLogger;

    /* loaded from: input_file:com/twitter/distributedlog/benchmark/WriterWorker$BulkWriter.class */
    class BulkWriter implements Runnable {
        final int idx;
        final DistributedLogClient dlc;

        BulkWriter(int i) {
            this.idx = i;
            this.dlc = WriterWorker.this.buildDlogClient();
        }

        @Override // java.lang.Runnable
        public void run() {
            WriterWorker.LOG.info("Started writer {}.", Integer.valueOf(this.idx));
            while (WriterWorker.this.running) {
                WriterWorker.this.rateLimiter.getLimiter().acquire(WriterWorker.this.batchSize);
                String str = WriterWorker.this.streamNames.get(WriterWorker.this.random.nextInt(WriterWorker.this.numStreams));
                long currentTimeMillis = System.currentTimeMillis();
                List<ByteBuffer> buildBufferList = WriterWorker.this.buildBufferList(WriterWorker.this.batchSize, currentTimeMillis, WriterWorker.this.messageSizeBytes);
                if (null == buildBufferList) {
                    break;
                }
                Iterator it = this.dlc.writeBulk(str, buildBufferList).iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).addEventListener(new TimedRequestHandler(str, currentTimeMillis));
                }
            }
            this.dlc.close();
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/benchmark/WriterWorker$TimedRequestHandler.class */
    class TimedRequestHandler implements FutureEventListener<DLSN> {
        final String streamName;
        final long requestMillis;

        TimedRequestHandler(String str, long j) {
            this.streamName = str;
            this.requestMillis = j;
        }

        public void onSuccess(DLSN dlsn) {
            WriterWorker.this.requestStat.registerSuccessfulEvent(System.currentTimeMillis() - this.requestMillis);
        }

        public void onFailure(Throwable th) {
            WriterWorker.LOG.error("Failed to publish to {} : ", this.streamName, th);
            WriterWorker.this.requestStat.registerFailedEvent(System.currentTimeMillis() - this.requestMillis);
            WriterWorker.this.exceptionsLogger.getCounter(th.getClass().getName()).inc();
            if (th instanceof DLException) {
                WriterWorker.this.dlErrorCodeLogger.getCounter(((DLException) th).getCode().toString()).inc();
            }
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/benchmark/WriterWorker$Writer.class */
    class Writer implements Runnable {
        final int idx;
        final DistributedLogClient dlc;
        DistributedLogMultiStreamWriter writer;

        Writer(int i) {
            this.writer = null;
            this.idx = i;
            this.dlc = WriterWorker.this.buildDlogClient();
            if (WriterWorker.this.enableBatching) {
                this.writer = DistributedLogMultiStreamWriter.newBuilder().client(this.dlc).streams(WriterWorker.this.streamNames).compressionCodec(CompressionCodec.Type.NONE).flushIntervalMs(20).bufferSize(65536).firstSpeculativeTimeoutMs(50).maxSpeculativeTimeoutMs(200).speculativeBackoffMultiplier(2.0f).build();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            WriterWorker.LOG.info("Started writer {}.", Integer.valueOf(this.idx));
            while (WriterWorker.this.running) {
                WriterWorker.this.rateLimiter.getLimiter().acquire();
                String str = WriterWorker.this.streamNames.get(WriterWorker.this.random.nextInt(WriterWorker.this.numStreams));
                long currentTimeMillis = System.currentTimeMillis();
                ByteBuffer buildBuffer = WriterWorker.this.buildBuffer(currentTimeMillis, WriterWorker.this.messageSizeBytes);
                if (null == buildBuffer) {
                    break;
                } else if (null != this.writer) {
                    this.writer.write(buildBuffer).addEventListener(new TimedRequestHandler(str, currentTimeMillis));
                } else {
                    this.dlc.write(str, buildBuffer).addEventListener(new TimedRequestHandler(str, currentTimeMillis));
                }
            }
            if (null != this.writer) {
                this.writer.close();
            }
            this.dlc.close();
        }
    }

    public WriterWorker(String str, int i, int i2, ShiftableRateLimiter shiftableRateLimiter, int i3, int i4, int i5, int i6, int i7, List<String> list, List<String> list2, StatsReceiver statsReceiver, StatsLogger statsLogger, boolean z, boolean z2, int i8, int i9, boolean z3) {
        Preconditions.checkArgument(i <= i2);
        Preconditions.checkArgument((list2.isEmpty() && list.isEmpty()) ? false : true);
        this.streamPrefix = str;
        this.startStreamId = i;
        this.endStreamId = i2;
        this.rateLimiter = shiftableRateLimiter;
        this.writeConcurrency = i3;
        this.messageSizeBytes = i4;
        this.statsReceiver = statsReceiver;
        this.statsLogger = statsLogger;
        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
        this.exceptionsLogger = statsLogger.scope("exceptions");
        this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
        this.executorService = Executors.newCachedThreadPool();
        this.random = new Random(System.currentTimeMillis());
        this.batchSize = i5;
        this.hostConnectionCoreSize = i6;
        this.hostConnectionLimit = i7;
        this.thriftmux = z;
        this.handshakeWithClientInfo = z2;
        this.sendBufferSize = i8;
        this.recvBufferSize = i9;
        this.enableBatching = z3;
        this.finagleNames = list2;
        this.serverSets = createServerSets(list);
        this.streamNames = new ArrayList(i2 - i);
        for (int i10 = i; i10 < i2; i10++) {
            this.streamNames.add(String.format("%s_%d", str, Integer.valueOf(i10)));
        }
        this.numStreams = this.streamNames.size();
        LOG.info("Writing to {} streams : {}", Integer.valueOf(this.numStreams), this.streamNames);
    }

    protected DLZkServerSet[] createServerSets(List<String> list) {
        DLZkServerSet[] dLZkServerSetArr = new DLZkServerSet[list.size()];
        for (int i = 0; i < dLZkServerSetArr.length; i++) {
            dLZkServerSetArr[i] = DLZkServerSet.of(URI.create(list.get(i)), 60000);
        }
        return dLZkServerSetArr;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        SchedulerUtils.shutdownScheduler(this.executorService, 2L, TimeUnit.MINUTES);
        for (DLZkServerSet dLZkServerSet : this.serverSets) {
            dLZkServerSet.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DistributedLogClient buildDlogClient() {
        DistributedLogClientBuilder serverSets;
        DistributedLogClientBuilder name = DistributedLogClientBuilder.newBuilder().clientId(ClientId$.MODULE$.apply("dlog_loadtest_writer")).clientBuilder(ClientBuilder.get().hostConnectionLimit(this.hostConnectionLimit).hostConnectionCoresize(this.hostConnectionCoreSize).tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200L)).connectTimeout(Duration$.MODULE$.fromMilliseconds(200L)).requestTimeout(Duration$.MODULE$.fromSeconds(2)).sendBufferSize(this.sendBufferSize).recvBufferSize(this.recvBufferSize)).thriftmux(this.thriftmux).redirectBackoffStartMs(100).redirectBackoffMaxMs(500).requestTimeoutMs(2000).statsReceiver(this.statsReceiver).streamNameRegex("^" + this.streamPrefix + "_[0-9]+$").handshakeWithClientInfo(this.handshakeWithClientInfo).periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30L)).periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5L)).periodicDumpOwnershipCache(true).handshakeTracing(true).name("writer");
        if (this.serverSets.length == 0) {
            String str = this.finagleNames.get(0);
            String[] strArr = new String[this.finagleNames.size() - 1];
            this.finagleNames.subList(1, this.finagleNames.size()).toArray(strArr);
            serverSets = name.finagleNameStrs(str, strArr);
        } else {
            ServerSet serverSet = this.serverSets[0].getServerSet();
            ServerSet[] serverSetArr = new ServerSet[this.serverSets.length - 1];
            for (int i = 1; i < this.serverSets.length; i++) {
                serverSetArr[i - 1] = this.serverSets[i].getServerSet();
            }
            serverSets = name.serverSets(serverSet, serverSetArr);
        }
        return serverSets.build();
    }

    ByteBuffer buildBuffer(long j, int i) {
        try {
            return ByteBuffer.wrap(Utils.generateMessage(j, i));
        } catch (TException e) {
            LOG.error("Error generating message : ", e);
            return null;
        }
    }

    List<ByteBuffer> buildBufferList(int i, long j, int i2) {
        ArrayList arrayList = new ArrayList(i);
        for (int i3 = 0; i3 < i; i3++) {
            ByteBuffer buildBuffer = buildBuffer(j, i2);
            if (null == buildBuffer) {
                return null;
            }
            arrayList.add(buildBuffer);
        }
        return arrayList;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})", new Object[]{Integer.valueOf(this.writeConcurrency), this.streamPrefix, Integer.valueOf(this.batchSize)});
        for (int i = 0; i < this.writeConcurrency; i++) {
            this.executorService.submit(this.batchSize > 0 ? new BulkWriter(i) : new Writer(i));
        }
    }
}
