package gobblin.util.limiter.stressTest;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
import gobblin.broker.BrokerConfigurationKeyGenerator;
import gobblin.broker.SharedResourcesBrokerFactory;
import gobblin.broker.SimpleScopeType;
import gobblin.broker.iface.NotConfiguredException;
import gobblin.broker.iface.ScopeType;
import gobblin.broker.iface.SharedResourcesBroker;
import gobblin.restli.SharedRestClientFactory;
import gobblin.restli.SharedRestClientKey;
import gobblin.util.ExecutorsUtils;
import gobblin.util.limiter.Limiter;
import gobblin.util.limiter.MultiLimiter;
import gobblin.util.limiter.NoopLimiter;
import gobblin.util.limiter.RateBasedLimiter;
import gobblin.util.limiter.RestliLimiterFactory;
import gobblin.util.limiter.broker.SharedLimiterKey;
import java.beans.ConstructorProperties;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest.class */
public class MRStressTest {
    public static final String USE_THROTTLING_SERVER = "stressTest.useThrottlingServer";
    public static final String RESOURCE_ID = "stressTest.resourceLimited";
    public static final String LOCALLY_ENFORCED_QPS = "stressTest.localQps";
    public static final String NUM_MAPPERS = "stressTest.num.mappers";
    public static final String DEFAULT_MAPPERS = "10";
    private static final Logger log = LoggerFactory.getLogger(MRStressTest.class);
    public static final Option NUM_MAPPERS_OPT = new Option("mappers", true, "Num mappers");
    public static final Option THROTTLING_SERVER_URI = new Option(RestliLimiterFactory.RESTLI_SERVICE_NAME, true, "Throttling server uri");
    public static final Option RESOURCE_ID_OPT = new Option("resource", true, "Resource id for throttling server");
    public static final Option LOCAL_QPS_OPT = new Option("localQps", true, "Locally enforced QPS");
    public static final Options OPTIONS = StressTestUtils.OPTIONS.addOption(NUM_MAPPERS_OPT).addOption(THROTTLING_SERVER_URI).addOption(RESOURCE_ID_OPT).addOption(LOCAL_QPS_OPT);

    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$AggregatorReducer.class */
    public static class AggregatorReducer extends Reducer<LongWritable, DoubleWritable, LongWritable, Text> {
        protected void reduce(LongWritable longWritable, Iterable<DoubleWritable> iterable, Reducer<LongWritable, DoubleWritable, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            double d = 0.0d;
            int i = 0;
            Iterator<DoubleWritable> it = iterable.iterator();
            while (it.hasNext()) {
                d += it.next().get();
                i++;
            }
            context.write(longWritable, new Text(String.format("%f\t%d", Double.valueOf(d), Integer.valueOf(i))));
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((LongWritable) obj, (Iterable<DoubleWritable>) iterable, (Reducer<LongWritable, DoubleWritable, LongWritable, Text>.Context) context);
        }
    }

    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$MyInputFormat.class */
    public static class MyInputFormat extends InputFormat<Text, NullWritable> {
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            int i = jobContext.getConfiguration().getInt(MRStressTest.NUM_MAPPERS, 1);
            ArrayList newArrayList = Lists.newArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                newArrayList.add(new MySplit());
            }
            return newArrayList;
        }

        public RecordReader<Text, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new MyRecordReader((MySplit) inputSplit);
        }
    }

    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$MyRecordReader.class */
    public static class MyRecordReader extends RecordReader<Text, NullWritable> {
        private final MySplit split;
        boolean keyValueAvailable = true;

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!this.keyValueAvailable) {
                return false;
            }
            this.keyValueAvailable = false;
            return true;
        }

        /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
        public Text m6getCurrentKey() throws IOException, InterruptedException {
            return new Text("split");
        }

        /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
        public NullWritable m5getCurrentValue() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0.0f;
        }

        public void close() throws IOException {
        }

        @ConstructorProperties({"split"})
        public MyRecordReader(MySplit mySplit) {
            this.split = mySplit;
        }
    }

    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$MySplit.class */
    public static class MySplit extends InputSplit implements Writable {
        public long getLength() throws IOException, InterruptedException {
            return 1L;
        }

        public String[] getLocations() throws IOException, InterruptedException {
            return new String[0];
        }

        public void write(DataOutput dataOutput) throws IOException {
            Text.writeString(dataOutput, "split");
        }

        public void readFields(DataInput dataInput) throws IOException {
            Text.readString(dataInput);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            return (obj instanceof MySplit) && ((MySplit) obj).canEqual(this);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof MySplit;
        }

        public int hashCode() {
            return 1;
        }

        public String toString() {
            return "MRStressTest.MySplit()";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$Recorder.class */
    public static class Recorder implements Runnable {
        private final RateComputingLimiterContainer limiter;
        private final MapContext<Text, NullWritable, LongWritable, DoubleWritable> context;
        private final boolean relativeKey;
        private int runs = -1;

        @Override // java.lang.Runnable
        public void run() {
            long millis;
            DescriptiveStatistics rateStatsSinceLastReport = this.limiter.getRateStatsSinceLastReport();
            long currentTimeMillis = System.currentTimeMillis();
            this.runs++;
            if (rateStatsSinceLastReport != null) {
                if (this.relativeKey) {
                    millis = 15 * this.runs;
                } else {
                    DateTime withMillisOfSecond = new DateTime(currentTimeMillis).withMillisOfSecond(0);
                    millis = withMillisOfSecond.withSecondOfMinute(15 * (withMillisOfSecond.getSecondOfMinute() / 15)).getMillis() / 1000;
                }
                try {
                    this.context.write(new LongWritable(millis), new DoubleWritable(rateStatsSinceLastReport.getSum()));
                } catch (IOException | InterruptedException e) {
                    MRStressTest.log.error("Error: ", e);
                }
            }
        }

        @ConstructorProperties({"limiter", "context", "relativeKey"})
        public Recorder(RateComputingLimiterContainer rateComputingLimiterContainer, MapContext<Text, NullWritable, LongWritable, DoubleWritable> mapContext, boolean z) {
            this.limiter = rateComputingLimiterContainer;
            this.context = mapContext;
            this.relativeKey = z;
        }
    }

    /* loaded from: input_file:gobblin/util/limiter/stressTest/MRStressTest$StresserMapper.class */
    public static class StresserMapper extends Mapper<Text, NullWritable, LongWritable, DoubleWritable> {
        private SharedResourcesBroker<SimpleScopeType> broker;

        protected void setup(Mapper<Text, NullWritable, LongWritable, DoubleWritable>.Context context) throws IOException, InterruptedException {
            HashMap newHashMap = Maps.newHashMap();
            SharedResourcesBrokerFactory.addBrokerKeys(newHashMap, context.getConfiguration());
            this.broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseMap(newHashMap), SimpleScopeType.GLOBAL.defaultScopeInstance());
            super.setup(context);
        }

        protected void map(Text text, NullWritable nullWritable, Mapper<Text, NullWritable, LongWritable, DoubleWritable>.Context context) throws IOException, InterruptedException {
            try {
                Configuration configuration = context.getConfiguration();
                Stressor stressor = (Stressor) context.getConfiguration().getClass(StressTestUtils.STRESSOR_CLASS, StressTestUtils.DEFAULT_STRESSOR_CLASS, Stressor.class).newInstance();
                stressor.configure(context.getConfiguration());
                RateComputingLimiterContainer rateComputingLimiterContainer = new RateComputingLimiterContainer();
                Limiter decorateLimiter = rateComputingLimiterContainer.decorateLimiter(MRStressTest.createLimiter(configuration, this.broker));
                ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
                ScheduledFuture<?> scheduleAtFixedRate = newSingleThreadScheduledExecutor.scheduleAtFixedRate(new Recorder(rateComputingLimiterContainer, context, true), 0L, 15L, TimeUnit.SECONDS);
                decorateLimiter.start();
                stressor.run(decorateLimiter);
                decorateLimiter.stop();
                scheduleAtFixedRate.cancel(false);
                ExecutorsUtils.shutdownExecutorService(newSingleThreadScheduledExecutor, Optional.absent(), 10L, TimeUnit.SECONDS);
            } catch (ReflectiveOperationException e) {
                throw new IOException(e);
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((Text) obj, (NullWritable) obj2, (Mapper<Text, NullWritable, LongWritable, DoubleWritable>.Context) context);
        }
    }

    public static void main(String[] strArr) throws Exception {
        CommandLine parseCommandLine = StressTestUtils.parseCommandLine(OPTIONS, strArr);
        Configuration configuration = new Configuration();
        if (parseCommandLine.hasOption(THROTTLING_SERVER_URI.getOpt())) {
            configuration.setBoolean(USE_THROTTLING_SERVER, true);
            configuration.set(RESOURCE_ID, parseCommandLine.getOptionValue(RESOURCE_ID_OPT.getOpt(), "MRStressTest"));
            configuration.set(BrokerConfigurationKeyGenerator.generateKey(new SharedRestClientFactory(), new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), (ScopeType) null, "serverUri"), parseCommandLine.getOptionValue(THROTTLING_SERVER_URI.getOpt()));
        }
        if (parseCommandLine.hasOption(LOCAL_QPS_OPT.getOpt())) {
            configuration.set(LOCALLY_ENFORCED_QPS, parseCommandLine.getOptionValue(LOCAL_QPS_OPT.getOpt()));
        }
        Job job = Job.getInstance(configuration, "ThrottlingStressTest");
        job.getConfiguration().setBoolean("mapreduce.job.user.classpath.first", true);
        job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
        job.getConfiguration().set(NUM_MAPPERS, parseCommandLine.getOptionValue(NUM_MAPPERS_OPT.getOpt(), DEFAULT_MAPPERS));
        StressTestUtils.populateConfigFromCli(job.getConfiguration(), parseCommandLine);
        job.setJarByClass(MRStressTest.class);
        job.setMapperClass(StresserMapper.class);
        job.setReducerClass(AggregatorReducer.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/MRStressTest" + System.currentTimeMillis()));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static Limiter createLimiter(Configuration configuration, SharedResourcesBroker<SimpleScopeType> sharedResourcesBroker) {
        try {
            Limiter noopLimiter = new NoopLimiter();
            long j = configuration.getLong(LOCALLY_ENFORCED_QPS, 0L);
            if (j > 0) {
                log.info("Setting up local qps " + j);
                noopLimiter = new MultiLimiter(new Limiter[]{noopLimiter, new RateBasedLimiter(j)});
            }
            if (configuration.getBoolean(USE_THROTTLING_SERVER, false)) {
                log.info("Setting up remote throttling.");
                noopLimiter = new MultiLimiter(new Limiter[]{noopLimiter, (Limiter) sharedResourcesBroker.getSharedResource(new RestliLimiterFactory(), new SharedLimiterKey(configuration.get(RESOURCE_ID)))});
            }
            return noopLimiter;
        } catch (NotConfiguredException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
