/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.benchmark;

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.benchmark.BenchmarkUtils;
import org.apache.hedwig.client.benchmark.BenchmarkWorker;
import org.apache.hedwig.client.benchmark.HedwigBenchmark;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;

/**
 * Benchmark worker that publishes messages to the topics owned by its
 * partition and prints throughput/latency statistics for the acknowledged
 * publishes.
 */
public class BenchmarkPublisher
extends BenchmarkWorker {
    /** Body size (in characters) of the message published during {@link #warmup(int)}. */
    private static final int WARMUP_MSG_SIZE = 1024;
    /**
     * Third argument handed to the aggregator during warmup.
     * NOTE(review): presumably a cap on outstanding publishes; confirm in BenchmarkUtils.
     */
    private static final int WARMUP_PARALLELISM = 100;
    /** Prefix of the benchmark topic names; the numeric topic label is appended. */
    private static final String TOPIC_PREFIX = "topic";

    /** Client used for both the synchronous and the asynchronous publishes. */
    Publisher publisher;
    /** Client used by {@link #warmup(int)} to subscribe to the warmup topic. */
    Subscriber subscriber;
    /** Body size (in characters) of the messages published by {@link #call()}. */
    int msgSize;
    /**
     * Third argument handed to the aggregator in {@link #call()}.
     * NOTE(review): presumably a cap on outstanding publishes; confirm in BenchmarkUtils.
     */
    int nParallel;
    /**
     * Target publish rate of this worker in messages per second: the overall
     * rate divided by {@code numRegions * numPartitions}. A value that is not
     * greater than zero disables throttling.
     */
    double rate;

    /**
     * Creates a publisher worker.
     *
     * @param numTopics       number of topic labels to cycle through
     * @param numMessages     total number of messages; this worker's share is
     *                        {@code numMessages / numRegions / numPartitions}
     * @param numRegions      number of regions the load is divided across
     * @param startTopicLabel numeric label of the first topic
     * @param partitionIndex  index of this worker's partition
     * @param numPartitions   number of partitions the load is divided across
     * @param publisher       client used to publish
     * @param subscriber      client used during warmup
     * @param msgSize         body size of the benchmark messages
     * @param nParallel       value passed through to the throughput aggregator
     * @param rate            overall target rate in messages per second; it is
     *                        divided evenly among regions and partitions
     */
    public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int startTopicLabel, int partitionIndex, int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) {
        super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
        this.publisher = publisher;
        this.msgSize = msgSize;
        this.subscriber = subscriber;
        this.nParallel = nParallel;
        this.rate = (double) rate / (double) (numRegions * numPartitions);
    }

    /**
     * Exercises the publish path before the measured run.
     *
     * <p>Subscribes to the topic {@code "warmup" + partitionIndex} with
     * subscriber id {@code "sub"}, starts delivery with a handler that
     * acknowledges every message immediately, asynchronously publishes
     * {@code nWarmup} messages and then blocks until the aggregator posts a
     * result on its queue.
     *
     * @param nWarmup number of warmup messages to publish
     * @throws RuntimeException if the value taken from the aggregator queue is
     *                          greater than zero (presumably the number of
     *                          failed publishes; confirm in BenchmarkUtils)
     * @throws Exception        if subscribing, starting delivery or waiting fails
     */
    public void warmup(int nWarmup) throws Exception {
        ByteString topic = ByteString.copyFromUtf8("warmup" + this.partitionIndex);
        ByteString subId = ByteString.copyFromUtf8("sub");
        this.subscriber.subscribe(topic, subId, PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH);
        this.subscriber.startDelivery(topic, subId, new MessageHandler(){

            @Override
            public void deliver(ByteString deliveredTopic, ByteString subscriberId, PubSubProtocol.Message delivered, Callback<Void> callback, Object context) {
                // Nothing to do with the message itself; just acknowledge it.
                callback.operationFinished(context, null);
            }
        });

        BenchmarkUtils.ThroughputLatencyAggregator agg = new BenchmarkUtils.ThroughputLatencyAggregator("acked pubs", nWarmup, WARMUP_PARALLELISM);
        agg.startProgress();

        PubSubProtocol.Message msg = this.getMsg(WARMUP_MSG_SIZE);
        for (int i = 0; i < nWarmup; ++i) {
            this.publisher.asyncPublish(topic, msg, new BenchmarkUtils.BenchmarkCallback(agg), null);
        }

        // Blocks until the aggregator reports its result.
        if (agg.tpAgg.queue.take() > 0) {
            throw new RuntimeException("Warmup publishes failed!");
        }
    }

    /**
     * Builds a message whose body is {@code size} repetitions of the character
     * {@code 'a'}.
     *
     * @param size number of characters in the body; values of zero or less
     *             yield an empty body
     * @return the newly built message
     */
    public PubSubProtocol.Message getMsg(int size) {
        // Presize the builder; Math.max keeps non-positive sizes producing an empty body.
        StringBuilder sb = new StringBuilder(Math.max(size, 0));
        for (int i = 0; i < size; ++i) {
            sb.append('a');
        }
        ByteString body = ByteString.copyFromUtf8(sb.toString());
        return PubSubProtocol.Message.newBuilder().setBody(body).build();
    }

    /**
     * Runs the publish benchmark for this worker.
     *
     * <p>First publishes one message synchronously to every topic this
     * partition is responsible for. Then asynchronously publishes the rest of
     * this worker's share, cycling over its topics and sleeping as needed to
     * stay at or below {@link #rate}. Finally waits for the aggregator to post
     * on its queue and prints the summary.
     *
     * @return always {@code null}
     * @throws IllegalStateException if messages remain to be published but this
     *                               partition is responsible for none of the
     *                               topics in range
     * @throws Exception             if publishing or waiting fails
     */
    public Void call() throws Exception {
        PubSubProtocol.Message msg = this.getMsg(this.msgSize);

        // One synchronous publish per owned topic before the timed section.
        int ownedTopics = 0;
        for (int i = 0; i < this.numTopics; ++i) {
            int topicNum = this.startTopicLabel + i;
            if (!HedwigBenchmark.amIResponsibleForTopic(topicNum, this.partitionIndex, this.numPartitions)) {
                continue;
            }
            this.publisher.publish(BenchmarkPublisher.topicName(topicNum), msg);
            ++ownedTopics;
        }

        long startTime = System.currentTimeMillis();
        // The synchronous publishes above count towards this worker's share.
        int myPublishLimit = this.numMessages / this.numRegions / this.numPartitions - ownedTopics;

        // Without an owned topic the publish loop below could never make
        // progress (it would spin forever, or divide by zero when there are no
        // topics at all), so fail fast with a clear message instead.
        if (myPublishLimit > 0 && ownedTopics == 0) {
            throw new IllegalStateException("Partition " + this.partitionIndex + " of " + this.numPartitions
                    + " is responsible for none of the " + this.numTopics + " topics starting at label "
                    + this.startTopicLabel + "; cannot publish " + myPublishLimit + " messages");
        }

        BenchmarkUtils.ThroughputLatencyAggregator agg = new BenchmarkUtils.ThroughputLatencyAggregator("acked pubs", myPublishLimit, this.nParallel);
        agg.startProgress();

        int myPublishCount = 0;
        int topicLabel = 0;
        while (myPublishCount < myPublishLimit) {
            int topicNum = this.startTopicLabel + topicLabel;
            topicLabel = (topicLabel + 1) % this.numTopics;
            if (!HedwigBenchmark.amIResponsibleForTopic(topicNum, this.partitionIndex, this.numPartitions)) {
                continue;
            }

            if (this.rate > 0.0) {
                // The n-th publish is due n / rate seconds after the start.
                // Multiply in double: 1000 * myPublishCount overflows int once
                // the count exceeds about 2.1 million, which would silently
                // switch the throttling off.
                long dueAt = startTime + (long) (1000.0 * myPublishCount / this.rate);
                long delay = dueAt - System.currentTimeMillis();
                if (delay > 0L) {
                    Thread.sleep(delay);
                }
            }

            this.publisher.asyncPublish(BenchmarkPublisher.topicName(topicNum), msg, new BenchmarkUtils.BenchmarkCallback(agg), null);
            ++myPublishCount;
        }

        System.out.println("Finished unacked pubs: tput = " + BenchmarkUtils.calcTp(myPublishLimit, startTime) + " ops/s");
        // Block until the aggregator posts its result on the queue.
        agg.tpAgg.queue.take();
        System.out.println(agg.summarize(startTime));
        return null;
    }

    /** Returns the name of the benchmark topic with the given numeric label. */
    private static ByteString topicName(int topicNum) {
        return ByteString.copyFromUtf8(TOPIC_PREFIX + topicNum);
    }
}

