package com.google.cloud.dataflow.sdk.transforms;

import com.google.api.client.util.Throwables;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/transforms/RateLimiting.class */
public class RateLimiting {

    @VisibleForTesting
    static final int DEFAULT_MAX_PARALLELISM = 16;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/transforms/RateLimiting$RateLimitingDoFn.class */
    public static class RateLimitingDoFn<I, O> extends DoFn<I, O> {
        private static final Logger LOG = LoggerFactory.getLogger(RateLimitingDoFn.class);
        private final DoFn<I, O> doFn;
        private double rate;
        private int maxParallelism;
        private transient RateLimiter limiter;
        private transient ExecutorService executor;
        private transient Semaphore workTickets;
        private transient AtomicReference<Throwable> failure;

        /* loaded from: input_file:com/google/cloud/dataflow/sdk/transforms/RateLimiting$RateLimitingDoFn$WrappedContext.class */
        private class WrappedContext extends DoFn<I, O>.ProcessContext {
            private final DoFn<I, O>.ProcessContext context;

            WrappedContext(DoFn<I, O>.ProcessContext processContext) {
                super();
                this.context = processContext;
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
            public I element() {
                return this.context.element();
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
            public DoFn.KeyedState keyedState() {
                return this.context.keyedState();
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public PipelineOptions getPipelineOptions() {
                return this.context.getPipelineOptions();
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public <T> T sideInput(PCollectionView<T, ?> pCollectionView) {
                return (T) this.context.sideInput(pCollectionView);
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public void output(O o) {
                synchronized (RateLimitingDoFn.this) {
                    this.context.output(o);
                }
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public void outputWithTimestamp(O o, Instant instant) {
                synchronized (RateLimitingDoFn.this) {
                    this.context.outputWithTimestamp(o, instant);
                }
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
                synchronized (RateLimitingDoFn.this) {
                    this.context.sideOutput(tupleTag, t);
                }
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
                synchronized (RateLimitingDoFn.this) {
                    this.context.sideOutputWithTimestamp(tupleTag, t, instant);
                }
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public <AI, AA, AO> Aggregator<AI> createAggregator(String str, Combine.CombineFn<? super AI, AA, AO> combineFn) {
                return this.context.createAggregator(str, combineFn);
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
            public <AI, AO> Aggregator<AI> createAggregator(String str, SerializableFunction<Iterable<AI>, AO> serializableFunction) {
                return this.context.createAggregator(str, serializableFunction);
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
            public Instant timestamp() {
                return this.context.timestamp();
            }

            @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
            public Collection<? extends BoundedWindow> windows() {
                return this.context.windows();
            }
        }

        public RateLimitingDoFn(DoFn<I, O> doFn, double d, int i) {
            this.doFn = doFn;
            this.rate = d;
            this.maxParallelism = i;
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void startBundle(DoFn<I, O>.Context context) throws Exception {
            this.doFn.startBundle(context);
            if (this.rate > 0.0d) {
                this.limiter = RateLimiter.create(this.rate);
            }
            this.executor = Executors.newCachedThreadPool();
            this.workTickets = new Semaphore(this.maxParallelism);
            this.failure = new AtomicReference<>();
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void processElement(final DoFn<I, O>.ProcessContext processContext) throws Exception {
            if (this.limiter != null) {
                this.limiter.acquire();
            }
            try {
                this.workTickets.acquire();
                if (this.failure.get() != null) {
                    throw Throwables.propagate(this.failure.get());
                }
                this.executor.submit(new Runnable() { // from class: com.google.cloud.dataflow.sdk.transforms.RateLimiting.RateLimitingDoFn.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            try {
                                RateLimitingDoFn.this.doFn.processElement(new WrappedContext(processContext));
                                RateLimitingDoFn.this.workTickets.release();
                            } catch (Throwable th) {
                                RateLimitingDoFn.this.failure.compareAndSet(null, th);
                                Throwables.propagateIfPossible(th);
                                String valueOf = String.valueOf(String.valueOf(th));
                                throw new AssertionError(new StringBuilder(30 + valueOf.length()).append("Unexpected checked exception: ").append(valueOf).toString());
                            }
                        } catch (Throwable th2) {
                            RateLimitingDoFn.this.workTickets.release();
                            throw th2;
                        }
                    }
                });
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while scheduling work", e);
            }
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void finishBundle(DoFn<I, O>.Context context) throws Exception {
            this.executor.shutdown();
            while (!this.executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                try {
                    LOG.info("RateLimitingDoFn backlog: {}", Integer.valueOf((this.workTickets.getQueueLength() + this.maxParallelism) - this.workTickets.availablePermits()));
                } catch (InterruptedException e) {
                    throw Throwables.propagate(e);
                }
            }
            if (this.failure.get() != null) {
                throw Throwables.propagate(this.failure.get());
            }
            this.doFn.finishBundle(context);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public TypeToken<I> getInputTypeToken() {
            return this.doFn.getInputTypeToken();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public TypeToken<O> getOutputTypeToken() {
            return this.doFn.getOutputTypeToken();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/transforms/RateLimiting$RateLimitingTransform.class */
    public static class RateLimitingTransform<I, O> extends PTransform<PCollection<? extends I>, PCollection<O>> {
        private final DoFn<I, O> doFn;
        private double rate = 0.0d;
        private int maxParallelism = RateLimiting.DEFAULT_MAX_PARALLELISM;

        public RateLimitingTransform(DoFn<I, O> doFn) {
            this.doFn = doFn;
        }

        public RateLimitingTransform<I, O> withRateLimit(double d) {
            this.rate = d;
            return this;
        }

        public RateLimitingTransform<I, O> withMaxParallelism(int i) {
            this.maxParallelism = i;
            return this;
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.PTransform
        public PCollection<O> apply(PCollection<? extends I> pCollection) {
            return (PCollection) pCollection.apply(ParDo.of(new RateLimitingDoFn(this.doFn, this.rate, this.maxParallelism)));
        }
    }

    public static <I, O> RateLimitingTransform<I, O> perWorker(DoFn<I, O> doFn) {
        return new RateLimitingTransform<>(doFn);
    }
}
