package com.twitter.distributedlog.benchmark;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.benchmark.thrift.Message;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogClientBuilder;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration$;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/benchmark/ReaderWorker.class */
public class ReaderWorker implements Worker {
    static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class);
    static final int BACKOFF_MS = 200;
    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final ScheduledExecutorService executorService;
    final DistributedLogNamespace namespace;
    final DistributedLogManager[] dlms;
    final AsyncLogReader[] logReaders;
    final StreamReader[] streamReaders;
    final int numStreams;
    final boolean readFromHead;
    final int truncationIntervalInSeconds;
    final DLZkServerSet[] serverSets;
    final List<String> finagleNames;
    final DistributedLogClient dlc;
    volatile boolean running = true;
    final StatsReceiver statsReceiver;
    final StatsLogger statsLogger;
    final OpStatsLogger e2eStat;
    final OpStatsLogger deliveryStat;
    final OpStatsLogger negativeE2EStat;
    final OpStatsLogger negativeDeliveryStat;
    final OpStatsLogger truncationStat;
    final Counter invalidRecordsCounter;
    final Counter outOfOrderSequenceIdCounter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/benchmark/ReaderWorker$StreamReader.class */
    public class StreamReader implements FutureEventListener<LogRecordWithDLSN>, Runnable, Gauge<Number> {
        final int streamIdx;
        final String streamName;
        DLSN prevDLSN = null;
        long prevSequenceId = Long.MIN_VALUE;

        StreamReader(int i, StatsLogger statsLogger) {
            this.streamIdx = i;
            this.streamName = String.format("%s_%d", ReaderWorker.this.streamPrefix, Integer.valueOf(ReaderWorker.this.startStreamId + this.streamIdx));
            statsLogger.scope(this.streamName).registerGauge("sequence_id", this);
        }

        public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
            try {
                Message parseMessage = Utils.parseMessage(logRecordWithDLSN.getPayload());
                long currentTimeMillis = System.currentTimeMillis();
                long publishTime = currentTimeMillis - parseMessage.getPublishTime();
                long transactionId = currentTimeMillis - logRecordWithDLSN.getTransactionId();
                if (publishTime >= 0) {
                    ReaderWorker.this.e2eStat.registerSuccessfulEvent(publishTime);
                } else {
                    ReaderWorker.this.negativeE2EStat.registerSuccessfulEvent(-publishTime);
                }
                if (transactionId >= 0) {
                    ReaderWorker.this.deliveryStat.registerSuccessfulEvent(transactionId);
                } else {
                    ReaderWorker.this.negativeDeliveryStat.registerSuccessfulEvent(-transactionId);
                }
                synchronized (this) {
                    if (logRecordWithDLSN.getSequenceId() <= this.prevSequenceId || (this.prevSequenceId >= 0 && logRecordWithDLSN.getSequenceId() != this.prevSequenceId + 1)) {
                        ReaderWorker.this.outOfOrderSequenceIdCounter.inc();
                        ReaderWorker.LOG.warn("Encountered decreasing sequence id for stream {} : previous = {}, current = {}", new Object[]{Integer.valueOf(this.streamIdx), Long.valueOf(this.prevSequenceId), Long.valueOf(logRecordWithDLSN.getSequenceId())});
                    }
                    this.prevSequenceId = logRecordWithDLSN.getSequenceId();
                }
                this.prevDLSN = logRecordWithDLSN.getDlsn();
                readLoop();
            } catch (TException e) {
                ReaderWorker.this.invalidRecordsCounter.inc();
                ReaderWorker.LOG.warn("Failed to parse record {} for stream {} : size = {} , ", new Object[]{logRecordWithDLSN, Integer.valueOf(this.streamIdx), Integer.valueOf(logRecordWithDLSN.getPayload().length), e});
            }
        }

        public void onFailure(Throwable th) {
            ReaderWorker.this.scheduleReinitStream(this.streamIdx).map(new Function<Void, Void>() { // from class: com.twitter.distributedlog.benchmark.ReaderWorker.StreamReader.1
                public Void apply(Void r5) {
                    StreamReader.this.prevDLSN = null;
                    StreamReader.this.prevSequenceId = Long.MIN_VALUE;
                    StreamReader.this.readLoop();
                    return null;
                }
            });
        }

        void readLoop() {
            if (ReaderWorker.this.running) {
                ReaderWorker.this.logReaders[this.streamIdx].readNext().addEventListener(this);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            final DLSN dlsn = this.prevDLSN;
            if (null == dlsn) {
                return;
            }
            final Stopwatch createStarted = Stopwatch.createStarted();
            ReaderWorker.this.dlc.truncate(this.streamName, dlsn).addEventListener(new FutureEventListener<Boolean>() { // from class: com.twitter.distributedlog.benchmark.ReaderWorker.StreamReader.2
                public void onSuccess(Boolean bool) {
                    ReaderWorker.this.truncationStat.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MILLISECONDS));
                }

                public void onFailure(Throwable th) {
                    ReaderWorker.this.truncationStat.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MILLISECONDS));
                    ReaderWorker.LOG.error("Failed to truncate stream {} to {} : ", new Object[]{StreamReader.this.streamName, dlsn, th});
                }
            });
        }

        public Number getDefaultValue() {
            return Long.MIN_VALUE;
        }

        public synchronized Number getSample() {
            return Long.valueOf(this.prevSequenceId);
        }
    }

    public ReaderWorker(DistributedLogConfiguration distributedLogConfiguration, URI uri, String str, int i, int i2, int i3, List<String> list, List<String> list2, int i4, boolean z, StatsReceiver statsReceiver, StatsLogger statsLogger) throws IOException {
        DistributedLogClientBuilder serverSets;
        Preconditions.checkArgument(i <= i2);
        this.streamPrefix = str;
        this.startStreamId = i;
        this.endStreamId = i2;
        this.truncationIntervalInSeconds = i4;
        this.readFromHead = z;
        this.statsReceiver = statsReceiver;
        this.statsLogger = statsLogger;
        this.e2eStat = this.statsLogger.getOpStatsLogger("e2e");
        this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative");
        this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery");
        this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative");
        this.truncationStat = this.statsLogger.getOpStatsLogger("truncation");
        this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records");
        this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id");
        this.executorService = Executors.newScheduledThreadPool(i3, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
        this.finagleNames = list2;
        this.serverSets = createServerSets(list);
        if (i4 <= 0 || (list2.isEmpty() && list.isEmpty())) {
            this.dlc = null;
        } else {
            DistributedLogClientBuilder name = DistributedLogClientBuilder.newBuilder().clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader")).clientBuilder(ClientBuilder.get().hostConnectionLimit(10).hostConnectionCoresize(10).tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1)).requestTimeout(Duration$.MODULE$.fromSeconds(2))).redirectBackoffStartMs(100).redirectBackoffMaxMs(500).requestTimeoutMs(2000).statsReceiver(statsReceiver).name("reader");
            if (list.isEmpty()) {
                String str2 = list2.get(0);
                String[] strArr = new String[list2.size() - 1];
                list2.subList(1, list2.size()).toArray(strArr);
                serverSets = name.finagleNameStrs(str2, strArr);
                LOG.info("Initialized distributedlog client for truncation @ {}.", list2);
            } else {
                ServerSet serverSet = this.serverSets[0].getServerSet();
                ServerSet[] serverSetArr = new ServerSet[this.serverSets.length - 1];
                for (int i5 = 1; i5 < this.serverSets.length; i5++) {
                    serverSetArr[i5 - 1] = this.serverSets[i5].getServerSet();
                }
                serverSets = name.serverSets(serverSet, serverSetArr);
                LOG.info("Initialized distributedlog client for truncation @ {}.", list);
            }
            this.dlc = serverSets.build();
        }
        this.namespace = DistributedLogNamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(uri).statsLogger(statsLogger.scope("dl")).build();
        this.numStreams = i2 - i;
        this.dlms = new DistributedLogManager[this.numStreams];
        this.logReaders = new AsyncLogReader[this.numStreams];
        final CountDownLatch countDownLatch = new CountDownLatch(this.numStreams);
        for (int i6 = 0; i6 < this.numStreams; i6++) {
            final int i7 = i6;
            this.executorService.submit(new Runnable() { // from class: com.twitter.distributedlog.benchmark.ReaderWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    ReaderWorker.this.reinitStream(i7).map(new Function<Void, Void>() { // from class: com.twitter.distributedlog.benchmark.ReaderWorker.1.1
                        public Void apply(Void r5) {
                            ReaderWorker.LOG.info("Initialized stream reader {}.", Integer.valueOf(i7));
                            countDownLatch.countDown();
                            return null;
                        }
                    });
                }
            });
        }
        try {
            countDownLatch.await();
            this.streamReaders = new StreamReader[this.numStreams];
            for (int i8 = 0; i8 < this.numStreams; i8++) {
                this.streamReaders[i8] = new StreamReader(i8, statsLogger.scope("perstream"));
                if (i4 > 0) {
                    this.executorService.scheduleWithFixedDelay(this.streamReaders[i8], i4, i4, TimeUnit.SECONDS);
                }
            }
            LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})", new Object[]{Integer.valueOf(this.numStreams), str, Integer.valueOf(i), Integer.valueOf(i2)});
        } catch (InterruptedException e) {
            throw new DLInterruptedException("Failed to intialize benchmark readers : ", e);
        }
    }

    protected DLZkServerSet[] createServerSets(List<String> list) {
        DLZkServerSet[] dLZkServerSetArr = new DLZkServerSet[list.size()];
        for (int i = 0; i < dLZkServerSetArr.length; i++) {
            dLZkServerSetArr[i] = DLZkServerSet.of(URI.create(list.get(i)), 60000);
        }
        return dLZkServerSetArr;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<Void> reinitStream(int i) {
        Promise<Void> promise = new Promise<>();
        reinitStream(i, promise);
        return promise;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reinitStream(int i, Promise<Void> promise) {
        DLSN lastDLSN;
        String format = String.format("%s_%d", this.streamPrefix, Integer.valueOf(this.startStreamId + i));
        if (this.logReaders[i] != null) {
            try {
                FutureUtils.result(this.logReaders[i].asyncClose());
            } catch (IOException e) {
                LOG.warn("Failed on closing stream reader {} : ", format, e);
            }
            this.logReaders[i] = null;
        }
        if (this.dlms[i] != null) {
            try {
                this.dlms[i].close();
            } catch (IOException e2) {
                LOG.warn("Failed on closing dlm {} : ", format, e2);
            }
            this.dlms[i] = null;
        }
        try {
            this.dlms[i] = this.namespace.openLog(format);
            if (this.readFromHead) {
                lastDLSN = DLSN.InitialDLSN;
            } else {
                try {
                    lastDLSN = this.dlms[i].getLastDLSN();
                } catch (IOException e3) {
                    LOG.error("Failed on getting last dlsn from stream {} : ", format, e3);
                    scheduleReinitStream(i, promise);
                    return;
                }
            }
            try {
                this.logReaders[i] = this.dlms[i].getAsyncLogReader(lastDLSN);
                LOG.info("Opened reader for stream {}, starting from {}.", format, lastDLSN);
                promise.setValue((Object) null);
            } catch (IOException e4) {
                LOG.error("Failed on opening reader for stream {} starting from {} : ", new Object[]{format, lastDLSN, e4});
                scheduleReinitStream(i, promise);
            }
        } catch (IOException e5) {
            LOG.error("Failed on creating dlm {} : ", format, e5);
            scheduleReinitStream(i, promise);
        }
    }

    Future<Void> scheduleReinitStream(int i) {
        Promise<Void> promise = new Promise<>();
        scheduleReinitStream(i, promise);
        return promise;
    }

    void scheduleReinitStream(final int i, final Promise<Void> promise) {
        this.executorService.schedule(new Runnable() { // from class: com.twitter.distributedlog.benchmark.ReaderWorker.2
            @Override // java.lang.Runnable
            public void run() {
                ReaderWorker.this.reinitStream(i, promise);
            }
        }, 200L, TimeUnit.MILLISECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        for (AsyncLogReader asyncLogReader : this.logReaders) {
            if (null != asyncLogReader) {
                FutureUtils.result(asyncLogReader.asyncClose());
            }
        }
        for (DistributedLogManager distributedLogManager : this.dlms) {
            if (null != distributedLogManager) {
                distributedLogManager.close();
            }
        }
        this.namespace.close();
        SchedulerUtils.shutdownScheduler(this.executorService, 2L, TimeUnit.MINUTES);
        if (this.dlc != null) {
            this.dlc.close();
        }
        for (DLZkServerSet dLZkServerSet : this.serverSets) {
            dLZkServerSet.close();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting reader (prefix = {}, numStreams = {}).", this.streamPrefix, Integer.valueOf(this.numStreams));
        for (StreamReader streamReader : this.streamReaders) {
            streamReader.readLoop();
        }
    }
}
