package org.apache.beam.io.requestresponse;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.io.requestresponse.Caller;
import org.apache.beam.io.requestresponse.SetupTeardown;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* JADX WARN: Incorrect field signature: TDecrementerT; */
/* JADX WARN: Incorrect field signature: TDequeuerT; */
/* JADX WARN: Incorrect field signature: TEnqueuerT; */
/* JADX WARN: Incorrect field signature: TRefresherT; */
/* JADX WARN: Incorrect field signature: TReporterT; */
/* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource.class */
public class ThrottleWithExternalResource<T, ReporterT extends Caller<String, Long> & SetupTeardown, EnqueuerT extends Caller<T, Void> & SetupTeardown, DequeuerT extends Caller<Instant, T> & SetupTeardown, DecrementerT extends Caller<Instant, Long> & SetupTeardown, RefresherT extends Caller<Instant, Void> & SetupTeardown> extends PTransform<PCollection<T>, Result<T>> {
    /** Interval of the "throttle impulse" that drives {@code ThrottleFn} in {@code expand}: one second. */
    private static final Duration THROTTLE_INTERVAL = Duration.standardSeconds(1);
    /** Quota description; its interval paces the "quota impulse" in {@code expand}. */
    private final Quota quota;
    /** Identifier passed to the reporter by {@code ThrottleFn}; the constructor rejects an empty value. */
    private final String quotaIdentifier;
    /** Coder for {@code T}; checked with {@code verifyDeterministic()} in the constructor and handed to {@code Result.of}. */
    private final Coder<T> coder;

    // NOTE(review): the five collaborators below are declared as raw Caller. The decompiler
    // reported "Incorrect field signature" for TReporterT, TEnqueuerT, TDequeuerT, TDecrementerT
    // and TRefresherT, which suggests they were originally typed with the matching class type
    // parameters (each bounded by Caller<...> & SetupTeardown) -- confirm and restore together
    // with the constructor.

    /** Called by {@code ThrottleFn} with the quota identifier; its {@code Long} result gates emission. */
    private final Caller reporterT;
    /** Wrapped into the "enqueue" {@code Call}; also set up and torn down by {@code ThrottleFn}. */
    private final Caller enqueuerT;
    /** Called by {@code ThrottleFn} to obtain the element to emit. */
    private final Caller dequeuerT;
    /** Called by {@code ThrottleFn} after the reporter check; a negative result suppresses emission. */
    private final Caller decrementerT;
    /** Wrapped into the "quota refresh" {@code Call} driven by the quota impulse. */
    private final Caller refresherT;

    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisDecrementer.class */
    /**
     * {@link Caller} backed by a {@link RedisClient} that invokes {@code decr} on a fixed key each
     * time it is called and returns the value the client reports. The {@link Instant} argument is
     * not used.
     */
    private static class RedisDecrementer extends RedisSetupTeardown implements Caller<Instant, Long> {
        /** Key handed to {@code RedisClient.decr} on every call. */
        private final String key;

        private RedisDecrementer(URI uri, String key) {
            super(new RedisClient(uri));
            this.key = key;
        }

        /**
         * Decrements the value stored under the configured key.
         *
         * @param instant ignored
         * @return the value returned by {@code RedisClient.decr}
         * @throws UserCodeExecutionException if the client call fails
         */
        @Override
        public Long call(Instant instant) throws UserCodeExecutionException {
            final Long decremented = Long.valueOf(client.decr(key));
            return decremented;
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisDequeuer.class */
    /**
     * {@link Caller} backed by a {@link RedisClient} that pops bytes from a fixed key with
     * {@code lpop} and decodes them into a {@code T} using the supplied {@link Coder}. The
     * {@link Instant} argument is not used.
     */
    private static class RedisDequeuer<T> extends RedisSetupTeardown implements Caller<Instant, T> {
        /** Decodes the popped bytes back into an element. */
        private final Coder<T> coder;
        /** Key handed to {@code RedisClient.lpop} on every call. */
        private final String key;

        private RedisDequeuer(URI uri, Coder<T> coder, String key) {
            super(new RedisClient(uri));
            this.coder = coder;
            this.key = key;
        }

        /**
         * Pops one encoded element and decodes it.
         *
         * @param instant ignored
         * @return the decoded element; {@code checkStateNotNull} rejects a null decode result
         * @throws UserCodeExecutionException if the client call fails or decoding raises an
         *     {@link IOException}
         */
        @Override
        public T call(Instant instant) throws UserCodeExecutionException {
            try {
                // NOTE(review): if lpop can return null (e.g. nothing to pop), ByteSource.wrap
                // presumably throws NullPointerException here rather than UserCodeExecutionException
                // -- confirm against RedisClient.
                byte[] payload = client.lpop(key);
                T decoded = coder.decode(ByteSource.wrap(payload).openStream());
                return Preconditions.checkStateNotNull(decoded);
            } catch (IOException decodeFailure) {
                throw new UserCodeExecutionException(decodeFailure);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisEnqueuer.class */
    /**
     * {@link Caller} backed by a {@link RedisClient} that encodes each element with the supplied
     * {@link Coder} and appends the resulting bytes to a fixed key with {@code rpush}.
     */
    private static class RedisEnqueuer<T> extends RedisSetupTeardown implements Caller<T, Void> {
        /** Key handed to {@code RedisClient.rpush} on every call. */
        private final String key;
        /** Encodes elements into the bytes that are pushed. */
        private final Coder<T> coder;

        private RedisEnqueuer(URI uri, String key, Coder<T> coder) {
            super(new RedisClient(uri));
            this.key = key;
            this.coder = coder;
        }

        /**
         * Encodes {@code t} and pushes the encoded bytes under the configured key.
         *
         * @param t the element to enqueue
         * @return always {@code null}; {@link Void} carries no value
         * @throws UserCodeExecutionException if the client call fails or encoding raises an
         *     {@link IOException}
         */
        @Override
        public Void call(T t) throws UserCodeExecutionException {
            ByteArrayOutputStream encoded = new ByteArrayOutputStream();
            try {
                coder.encode(t, encoded);
                // The decompiler recorded the argument as byte[][]; an explicit byte[][] works
                // whether rpush is declared with a byte[]... varargs or a byte[][] parameter.
                client.rpush(key, new byte[][] {encoded.toByteArray()});
                return null;
            } catch (IOException encodeFailure) {
                throw new UserCodeExecutionException(encodeFailure);
            }
        }

        // The decompiled synthetic bridge `call(Object)` is intentionally not reproduced: it has
        // the same erasure as call(T) (a name clash in source form) and the compiler generates
        // whatever bridge is needed on its own.
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisRefresher.class */
    /**
     * {@link Caller} backed by a {@link RedisClient} that, on every call, invokes {@code setex}
     * with a fixed key, the quota's number of requests and the quota's interval. The
     * {@link Instant} argument is not used.
     */
    private static class RedisRefresher extends RedisSetupTeardown implements Caller<Instant, Void> {
        /** Source of the value and interval passed to {@code setex}. */
        private final Quota quota;
        /** Key handed to {@code RedisClient.setex} on every call. */
        private final String key;

        private RedisRefresher(URI uri, Quota quota, String key) {
            super(new RedisClient(uri));
            this.quota = quota;
            this.key = key;
        }

        /**
         * Writes the quota's request count under the configured key.
         *
         * @param instant ignored
         * @return always {@code null}; {@link Void} carries no value
         * @throws UserCodeExecutionException if the client call fails
         */
        @Override
        public Void call(Instant instant) throws UserCodeExecutionException {
            final Long numRequests = Long.valueOf(quota.getNumRequests());
            client.setex(key, numRequests, quota.getInterval());
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisReporter.class */
    /**
     * {@link Caller} backed by a {@link RedisClient} that looks up the value stored under the key
     * it is called with, using {@code getLong}.
     */
    private static class RedisReporter extends RedisSetupTeardown implements Caller<String, Long> {
        private RedisReporter(URI uri) {
            super(new RedisClient(uri));
        }

        /**
         * Reads the value stored under {@code str}.
         *
         * @param str the key to look up
         * @return the value returned by {@code RedisClient.getLong}
         * @throws UserCodeExecutionException if the client call fails
         */
        @Override
        public Long call(String str) throws UserCodeExecutionException {
            final Long reported = Long.valueOf(client.getLong(str));
            return reported;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$RedisSetupTeardown.class */
    /**
     * Base class for the Redis-backed callers: holds the {@link RedisClient} and forwards the
     * {@link SetupTeardown} lifecycle calls to it.
     */
    public abstract static class RedisSetupTeardown implements SetupTeardown {
        /** Client shared with subclasses for their Redis operations. */
        protected final RedisClient client;

        private RedisSetupTeardown(RedisClient client) {
            this.client = client;
        }

        /** Delegates to {@code RedisClient.setup()}. */
        @Override
        public void setup() throws UserCodeExecutionException {
            client.setup();
        }

        /** Delegates to {@code RedisClient.teardown()}. */
        @Override
        public void teardown() throws UserCodeExecutionException {
            client.teardown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Incorrect field signature: TDecrementerT; */
    /* JADX WARN: Incorrect field signature: TDequeuerT; */
    /* JADX WARN: Incorrect field signature: TReporterT; */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResource$ThrottleFn.class */
    public class ThrottleFn extends DoFn<Instant, T> {
        private final String quotaIdentifier;
        private final Caller dequeuerT;
        private final Caller decrementerT;
        private final Caller reporterT;
        private final TupleTag<T> outputTag;
        private final TupleTag<ApiIOError> failureTag;

        /* JADX WARN: Multi-variable type inference failed */
        private ThrottleFn(String str, DequeuerT dequeuert, DecrementerT decrementert, ReporterT reportert, TupleTag<T> tupleTag, TupleTag<ApiIOError> tupleTag2) {
            this.quotaIdentifier = str;
            this.dequeuerT = dequeuert;
            this.decrementerT = decrementert;
            this.reporterT = reportert;
            this.outputTag = tupleTag;
            this.failureTag = tupleTag2;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element Instant instant, DoFn.MultiOutputReceiver multiOutputReceiver) {
            try {
                if (((Long) this.reporterT.call(this.quotaIdentifier)).longValue() > 0 && ((Long) this.decrementerT.call(instant)).longValue() >= 0) {
                    multiOutputReceiver.get(this.outputTag).output(this.dequeuerT.call(instant));
                }
            } catch (UserCodeExecutionException e) {
                multiOutputReceiver.get(this.failureTag).output(ApiIOError.builder().setRequestAsJsonString("").setMessage((String) Optional.ofNullable(e.getMessage()).orElse("")).setObservedTimestamp(Instant.now()).setStackTrace(Throwables.getStackTraceAsString(e)).build());
            }
        }

        @DoFn.Setup
        public void setup() throws UserCodeExecutionException {
            ((SetupTeardown) ThrottleWithExternalResource.this.enqueuerT).setup();
            ((SetupTeardown) this.dequeuerT).setup();
            ((SetupTeardown) this.decrementerT).setup();
            ((SetupTeardown) this.reporterT).setup();
        }

        @DoFn.Teardown
        public void teardown() throws UserCodeExecutionException {
            ArrayList arrayList = new ArrayList();
            try {
                ((SetupTeardown) ThrottleWithExternalResource.this.enqueuerT).teardown();
            } catch (UserCodeExecutionException e) {
                arrayList.add(String.format("%s encountered error during teardown: %s", "enqueuerT", e));
            }
            try {
                ((SetupTeardown) this.dequeuerT).teardown();
            } catch (UserCodeExecutionException e2) {
                arrayList.add(String.format("%s encountered error during teardown: %s", "dequeuerT", e2));
            }
            try {
                ((SetupTeardown) this.decrementerT).teardown();
            } catch (UserCodeExecutionException e3) {
                arrayList.add(String.format("%s encountered error during teardown: %s", "decrementerT", e3));
            }
            try {
                ((SetupTeardown) this.reporterT).teardown();
            } catch (UserCodeExecutionException e4) {
                arrayList.add(String.format("%s encountered error during teardown: %s", "reporterT", e4));
            }
            if (!arrayList.isEmpty()) {
                throw new UserCodeExecutionException(String.join("; ", arrayList));
            }
        }
    }

    /**
     * Builds a {@link ThrottleWithExternalResource} whose five collaborators are all backed by a
     * {@link RedisClient} created from {@code uri}.
     *
     * <p>The reporter is later called with {@code quotaIdentifier}, and the refresher and the
     * decrementer are both bound to that same key, so the value the refresher writes is the one
     * that is read and consumed. The enqueuer and dequeuer share {@code queueKey}.
     *
     * @param uri location passed to every {@link RedisClient}
     * @param quotaIdentifier key under which the remaining quota is stored; must not be empty
     * @param queueKey key of the list used to buffer encoded elements
     * @param quota the quota handed to the refresher and to the transform
     * @param coder coder used to encode and decode buffered elements
     * @throws Coder.NonDeterministicException if the transform's constructor rejects {@code coder}
     */
    static <T> ThrottleWithExternalResource<T, RedisReporter, RedisEnqueuer<T>, RedisDequeuer<T>, RedisDecrementer, RedisRefresher> usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota quota, Coder<T> coder) throws Coder.NonDeterministicException {
        return new ThrottleWithExternalResource<>(
                quota,
                quotaIdentifier,
                coder,
                new RedisReporter(uri),
                new RedisEnqueuer<>(uri, queueKey, coder),
                new RedisDequeuer<>(uri, coder, queueKey),
                // Must target the quota key, not the queue key: decrementing the queue key would
                // leave the counter checked by the reporter untouched and collide with the list
                // stored under queueKey.
                new RedisDecrementer(uri, quotaIdentifier),
                new RedisRefresher(uri, quota, quotaIdentifier));
    }

    /**
     * Creates the transform from its quota, element coder and five collaborators.
     *
     * <p>The parameter types follow the generic signature the decompiler recorded for this
     * constructor; they erase to the same {@code Coder}/{@code Caller} types as before.
     *
     * @param quota quota whose interval paces the refresh impulse
     * @param quotaIdentifier identifier passed to the reporter; must not be empty
     * @param coder coder for {@code T}; must be deterministic
     * @param reporterT reports the current value for the quota identifier
     * @param enqueuerT buffers incoming elements
     * @param dequeuerT yields buffered elements
     * @param decrementerT consumes one unit of quota
     * @param refresherT resets the quota
     * @throws Coder.NonDeterministicException if {@code coder.verifyDeterministic()} fails
     * @throws IllegalArgumentException if {@code quotaIdentifier} is empty
     */
    ThrottleWithExternalResource(Quota quota, String quotaIdentifier, Coder<T> coder, ReporterT reporterT, EnqueuerT enqueuerT, DequeuerT dequeuerT, DecrementerT decrementerT, RefresherT refresherT) throws Coder.NonDeterministicException {
        // Validate first, in the original order, so the same exception wins when both checks fail.
        coder.verifyDeterministic();
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(!quotaIdentifier.isEmpty(), "quotaIdentifier must not be empty");
        this.quota = quota;
        this.quotaIdentifier = quotaIdentifier;
        this.coder = coder;
        this.reporterT = reporterT;
        this.enqueuerT = enqueuerT;
        this.dequeuerT = dequeuerT;
        this.decrementerT = decrementerT;
        this.refresherT = refresherT;
    }

    /**
     * Wires the throttle into the pipeline.
     *
     * <p>Three branches are built: a periodic "quota refresh" driven by the quota's interval, an
     * "enqueue" step applied to the input, and a "throttle fn" driven by an impulse every
     * {@code THROTTLE_INTERVAL}. The failures of all three are flattened into the failure output
     * of the returned {@link Result}; the throttle function's main output becomes its elements.
     *
     * @param pCollection the elements to throttle
     * @return the throttled elements together with the collected {@link ApiIOError}s
     */
    @Override
    public Result<T> expand(PCollection<T> pCollection) {
        Pipeline pipeline = pCollection.getPipeline();

        Result<Void> refreshResult = pipeline
                .apply("quota impulse", PeriodicImpulse.create().withInterval(quota.getInterval()))
                .apply("quota refresh", getRefresher());

        Result<Void> enqueueResult = pCollection.apply("enqueue", getEnqueuer());

        // Anonymous subclasses, declared in the original order, so each tag keeps its type argument.
        TupleTag<T> throttledTag = new TupleTag<T>() {
        };
        TupleTag<ApiIOError> throttleFailureTag = new TupleTag<ApiIOError>() {
        };

        // The fields are declared as raw Caller, but the constructor only ever receives values of
        // the matching type parameters, so these casts restore the types ThrottleFn requires.
        @SuppressWarnings("unchecked")
        DequeuerT dequeuer = (DequeuerT) this.dequeuerT;
        @SuppressWarnings("unchecked")
        DecrementerT decrementer = (DecrementerT) this.decrementerT;
        @SuppressWarnings("unchecked")
        ReporterT reporter = (ReporterT) this.reporterT;

        PCollectionTuple throttled = pipeline
                .apply("throttle impulse", PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL))
                .apply(
                        "throttle fn",
                        ParDo.of(new ThrottleFn(quotaIdentifier, dequeuer, decrementer, reporter, throttledTag, throttleFailureTag))
                                .withOutputTags(throttledTag, TupleTagList.of(throttleFailureTag)));

        PCollection<ApiIOError> failures = PCollectionList.of(refreshResult.getFailures())
                .and(enqueueResult.getFailures())
                .and(throttled.get(throttleFailureTag))
                .apply("errors flatten", Flatten.pCollections());

        TupleTag<T> resultTag = new TupleTag<T>() {
        };
        TupleTag<ApiIOError> resultFailureTag = new TupleTag<ApiIOError>() {
        };
        return Result.of(
                coder,
                resultTag,
                resultFailureTag,
                PCollectionTuple.of(resultTag, throttled.get(throttledTag)).and(resultFailureTag, failures));
    }

    /**
     * Wraps the refresher in a {@link Call} transform whose response coder is {@link VoidCoder}.
     *
     * @return the transform applied as "quota refresh"
     */
    private Call<Instant, Void> getRefresher() {
        // The field is declared as a raw Caller, but the constructor only ever receives a
        // RefresherT; the cast restores the Caller<Instant, Void> & SetupTeardown type that
        // Call.ofCallerAndSetupTeardown needs to produce a Call<Instant, Void>.
        @SuppressWarnings("unchecked")
        RefresherT refresher = (RefresherT) this.refresherT;
        return Call.ofCallerAndSetupTeardown(refresher, VoidCoder.of());
    }

    /**
     * Wraps the enqueuer in a {@link Call} transform whose response coder is {@link VoidCoder}.
     *
     * @return the transform applied as "enqueue"
     */
    private Call<T, Void> getEnqueuer() {
        // The field is declared as a raw Caller, but the constructor only ever receives an
        // EnqueuerT; the cast restores the Caller<T, Void> & SetupTeardown type that
        // Call.ofCallerAndSetupTeardown needs to produce a Call<T, Void>.
        @SuppressWarnings("unchecked")
        EnqueuerT enqueuer = (EnqueuerT) this.enqueuerT;
        return Call.ofCallerAndSetupTeardown(enqueuer, VoidCoder.of());
    }
}
