package org.apache.beam.sdk.io.gcp.datastore;

import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.class */
/**
 * A pass-through {@link DoFn} that delays elements so that the number of elements emitted per
 * sampling window stays within a budget that grows over time.
 *
 * <p>The budget is {@code BASE_BUDGET / numWorkers * 1.5^max(0, (m - 5) / 5)}, where {@code m} is
 * the number of minutes elapsed since the instant supplied by the side input and {@code 5} is
 * {@link #RAMP_UP_INTERVAL} in minutes. The result is clamped to {@code [1, Integer.MAX_VALUE]}.
 *
 * @param <T> the element type, emitted unchanged
 */
public class RampupThrottlingFn<T> extends DoFn<T, T> implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(RampupThrottlingFn.class);

    /** Budget before any growth is applied; it is divided by the worker-count hint. */
    private static final double BASE_BUDGET = 500.0d;

    /** Interval used both as the initial flat period and as the growth step of the budget. */
    private static final Duration RAMP_UP_INTERVAL = Duration.standardMinutes(5);

    /** Template from which a fresh {@link BackOff} is created for every processed element. */
    private static final FluentBackoff fluentBackoff = FluentBackoff.DEFAULT;

    private final ValueProvider<Integer> numWorkers;
    private final PCollectionView<Instant> firstInstantSideInput;

    /** Accumulates the total number of milliseconds this function has slept. */
    @VisibleForTesting
    Counter throttlingMsecs = Metrics.counter(RampupThrottlingFn.class, "throttling-msecs");

    // Transient: not serialized with the function, re-created in setup().
    private transient MovingFunction successfulOps;

    // Transient: not serialized with the function, re-created in setup().
    @VisibleForTesting
    transient Sleeper sleeper;

    /**
     * Creates a throttling function.
     *
     * @param valueProvider provider of the worker-count hint by which the base budget is divided
     * @param pCollectionView side input holding the instant from which ramp-up time is measured
     */
    public RampupThrottlingFn(ValueProvider<Integer> valueProvider, PCollectionView<Instant> pCollectionView) {
        this.numWorkers = valueProvider;
        this.firstInstantSideInput = pCollectionView;
        this.sleeper = Sleeper.DEFAULT;
        this.successfulOps = newSuccessfulOpsTracker();
    }

    /**
     * Creates a throttling function with a fixed worker-count hint.
     *
     * @param i the worker-count hint by which the base budget is divided
     * @param pCollectionView side input holding the instant from which ramp-up time is measured
     */
    public RampupThrottlingFn(int i, PCollectionView<Instant> pCollectionView) {
        this(ValueProvider.StaticValueProvider.of(i), pCollectionView);
    }

    /**
     * Builds the tracker of emitted elements: a summing {@link MovingFunction} whose first two
     * constructor arguments are both one second expressed in milliseconds.
     */
    private static MovingFunction newSuccessfulOpsTracker() {
        long oneSecondMillis = Duration.standardSeconds(1L).getMillis();
        return new MovingFunction(oneSecondMillis, oneSecondMillis, 1, 1, Sum.ofLongs());
    }

    /**
     * Computes the budget at {@code now}: {@code 500 / numWorkers * 1.5^max(0, (m - 5) / 5)}.
     *
     * @param first the instant from which ramp-up time is measured
     * @param now the instant for which the budget is computed
     * @param hintNumWorkers the worker-count hint dividing the base budget
     * @return the budget, clamped to the range {@code [1, Integer.MAX_VALUE]}
     */
    private int calcMaxOpsBudget(Instant first, Instant now, int hintNumWorkers) {
        double rampUpIntervalMinutes = RAMP_UP_INTERVAL.getStandardMinutes();
        double minutesSinceFirst = new Duration(first, now).getStandardMinutes();

        // No growth during the first interval; afterwards the exponent rises by one per interval.
        double growth = Math.max(0.0d, (minutesSinceFirst - rampUpIntervalMinutes) / rampUpIntervalMinutes);
        double maxOpsBudget = (BASE_BUDGET / hintNumWorkers) * Math.pow(1.5d, growth);

        // Clamp before narrowing so the cast to int cannot overflow and the budget is never below 1.
        return (int) Math.min((double) Integer.MAX_VALUE, Math.max(1.0d, maxOpsBudget));
    }

    /** Re-creates the transient state, which is not carried across serialization. */
    @DoFn.Setup
    public void setup() {
        this.sleeper = Sleeper.DEFAULT;
        this.successfulOps = newSuccessfulOpsTracker();
    }

    /**
     * Emits the input element once the current budget allows it, sleeping with backoff until then.
     *
     * @param processContext the context supplying the element and the side input
     * @throws IOException if obtaining the next backoff interval fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<T, T>.ProcessContext processContext) throws IOException, InterruptedException {
        Instant firstInstant = processContext.sideInput(this.firstInstantSideInput);
        T element = processContext.element();
        BackOff backoff = fluentBackoff.backoff();

        while (true) {
            Instant now = Instant.now();
            int maxOpsBudget = calcMaxOpsBudget(firstInstant, now, this.numWorkers.get());
            long availableOps = maxOpsBudget - this.successfulOps.get(now.getMillis());

            // A budget saturated at Integer.MAX_VALUE means throttling is effectively off.
            if (maxOpsBudget >= Integer.MAX_VALUE || availableOps > 0) {
                processContext.output(element);
                this.successfulOps.add(now.getMillis(), 1L);
                return;
            }

            long backoffMillis = backoff.nextBackOffMillis();
            LOG.info("Delaying by {}ms to conform to gradual ramp-up.", backoffMillis);
            this.throttlingMsecs.inc(backoffMillis);
            this.sleeper.sleep(backoffMillis);
        }
    }

    /**
     * Registers the worker-count hint as display data.
     *
     * @param builder the builder to which display data is added
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("hintNumWorkers", this.numWorkers).withLabel("Number of workers for ramp-up throttling algorithm"));
    }
}
