package org.apache.beam.sdk.io.synthetic;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.RateLimiter;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/sdk/io/synthetic/SyntheticStep.class */
public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
    private final Options options;
    private final KV<Long, Long> idAndThroughput;
    private final Counter throttlingCounter = Metrics.counter("dataflow-throttling-metrics", "throttling-msecs");
    private static LoadingCache<KV<Long, Long>, RateLimiter> rateLimiterCache = CacheBuilder.newBuilder().build(new CacheLoader<KV<Long, Long>, RateLimiter>() { // from class: org.apache.beam.sdk.io.synthetic.SyntheticStep.1
        public RateLimiter load(KV<Long, Long> kv) {
            return RateLimiter.create(((Long) kv.getValue()).doubleValue());
        }
    });

    /* loaded from: input_file:org/apache/beam/sdk/io/synthetic/SyntheticStep$Options.class */
    public static class Options extends SyntheticOptions {

        @JsonProperty
        public double outputRecordsPerInputRecord;

        @JsonProperty
        public boolean preservesInputKeyDistribution;

        @JsonProperty
        public long maxWorkerThroughput = -1;

        @JsonProperty
        public long perBundleDelay = 0;

        @JsonProperty
        public SyntheticOptions.DelayType perBundleDelayType = SyntheticOptions.DelayType.SLEEP;

        @JsonProperty
        public boolean reportThrottlingMicros;

        @Override // org.apache.beam.sdk.io.synthetic.SyntheticOptions
        public void validate() {
            super.validate();
            Preconditions.checkArgument(this.outputRecordsPerInputRecord >= 0.0d, "outputRecordsPerInputRecord should be a non-negative number, but found %s.", Double.valueOf(this.outputRecordsPerInputRecord));
            Preconditions.checkArgument(this.perBundleDelay >= 0, "perBundleDelay should be a non-negative number, but found %s.", this.perBundleDelay);
            if (this.maxWorkerThroughput >= 0) {
                Preconditions.checkArgument(this.perBundleDelay == 0, "maxWorkerThroughput and perBundleDelay cannot be enabled simultaneously.");
            }
        }
    }

    public SyntheticStep(Options options) {
        options.validate();
        this.options = options;
        this.idAndThroughput = KV.of(Long.valueOf(new Random().nextLong()), Long.valueOf(options.maxWorkerThroughput));
    }

    @DoFn.ProcessElement
    public void processElement(DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>>.ProcessContext processContext) throws Exception {
        byte[] bArr = (byte[]) ((KV) processContext.element()).getKey();
        byte[] bArr2 = (byte[]) ((KV) processContext.element()).getValue();
        int i = (int) this.options.outputRecordsPerInputRecord;
        double d = this.options.outputRecordsPerInputRecord - i;
        long asLong = this.options.hashFunction().hashBytes(bArr2).asLong();
        Random random = new Random(asLong);
        int i2 = 0;
        while (i2 < i) {
            processContext.output(outputElement(bArr, bArr2, asLong, i2, random));
            i2++;
        }
        if (random.nextDouble() < d) {
            processContext.output(outputElement(bArr, bArr2, asLong, i2, random));
        }
    }

    private KV<byte[], byte[]> outputElement(byte[] bArr, byte[] bArr2, long j, int i, Random random) {
        long asLong = this.options.hashFunction().hashLong(j + i).asLong();
        long j2 = 0;
        for (Duration millis = Duration.millis(this.options.nextDelay(asLong)); millis.getMillis() > 0; millis = Duration.millis(1L)) {
            j2 += SyntheticDelay.delay(millis, this.options.cpuUtilizationInMixedDelay, this.options.delayType, random);
            if (isWithinThroughputLimit()) {
                break;
            }
        }
        reportThrottlingTimeMetrics(j2);
        if (!this.options.preservesInputKeyDistribution) {
            return this.options.genKvPair(asLong);
        }
        byte[] bArr3 = new byte[bArr2.length];
        random.nextBytes(bArr3);
        return KV.of(bArr, bArr3);
    }

    private void reportThrottlingTimeMetrics(long j) {
        if (!this.options.reportThrottlingMicros || j <= 0) {
            return;
        }
        this.throttlingCounter.inc(TimeUnit.MILLISECONDS.toMicros(j));
    }

    private boolean isWithinThroughputLimit() {
        return this.options.maxWorkerThroughput < 0 || ((RateLimiter) rateLimiterCache.getUnchecked(this.idAndThroughput)).tryAcquire();
    }

    @DoFn.StartBundle
    public void startBundle() throws Exception {
        if (this.options.perBundleDelay > 0) {
            SyntheticDelay.delay(Duration.millis(this.options.perBundleDelay), this.options.cpuUtilizationInMixedDelay, this.options.perBundleDelayType, new Random());
        }
    }
}
