package org.apache.beam.io.requestresponse;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.io.requestresponse.AutoValue_Call_Configuration;
import org.apache.beam.io.requestresponse.Repeater;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/io/requestresponse/Call.class */
public class Call<RequestT, ResponseT> extends PTransform<PCollection<RequestT>, Result<ResponseT>> {
    private final TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() { // from class: org.apache.beam.io.requestresponse.Call.1
    };
    private final TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() { // from class: org.apache.beam.io.requestresponse.Call.2
    };
    private final Configuration<RequestT, ResponseT> configuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$CallFn.class */
    public static class CallFn<RequestT, ResponseT> extends DoFn<RequestT, ResponseT> {
        private final TupleTag<ResponseT> responseTag;
        private final TupleTag<ApiIOError> failureTag;
        private final CallerWithTimeout<RequestT, ResponseT> caller;
        private final SetupTeardownWithTimeout setupTeardown;
        private final Configuration<RequestT, ResponseT> configuration;
        private Counter requestsCounter;
        private Counter responsesCounter;
        private Counter failuresCounter;
        private Counter callCounter;
        private Counter setupCounter;
        private Counter teardownCounter;
        private Counter backoffCounter;
        private Counter sleeperCounter;
        private Counter shouldBackoffCounter;
        private transient ExecutorService executor;

        private CallFn(TupleTag<ResponseT> tupleTag, TupleTag<ApiIOError> tupleTag2, Configuration<RequestT, ResponseT> configuration) {
            this.requestsCounter = null;
            this.responsesCounter = null;
            this.failuresCounter = null;
            this.callCounter = null;
            this.setupCounter = null;
            this.teardownCounter = null;
            this.backoffCounter = null;
            this.sleeperCounter = null;
            this.shouldBackoffCounter = null;
            this.responseTag = tupleTag;
            this.failureTag = tupleTag2;
            this.caller = new CallerWithTimeout<>(configuration.getTimeout(), configuration.getCaller());
            this.setupTeardown = new SetupTeardownWithTimeout(configuration.getTimeout(), configuration.getSetupTeardown());
            this.configuration = configuration;
        }

        private void setupMetrics() {
            Monitoring monitoringConfiguration = this.configuration.getMonitoringConfiguration();
            if (monitoringConfiguration.getCountRequests().booleanValue()) {
                this.requestsCounter = Metrics.counter(Call.class, "requests");
            }
            if (monitoringConfiguration.getCountResponses().booleanValue()) {
                this.responsesCounter = Metrics.counter(Call.class, "responses");
            }
            if (monitoringConfiguration.getCountFailures().booleanValue()) {
                this.failuresCounter = Metrics.counter(Call.class, "failures");
            }
            if (monitoringConfiguration.getCountCalls().booleanValue()) {
                this.callCounter = Metrics.counter(Call.class, Monitoring.callCounterNameOf(this.configuration.getCaller()));
            }
            if (monitoringConfiguration.getCountSetup().booleanValue()) {
                this.setupCounter = Metrics.counter(Call.class, Monitoring.setupCounterNameOf(this.configuration.getSetupTeardown()));
            }
            if (monitoringConfiguration.getCountTeardown().booleanValue()) {
                this.teardownCounter = Metrics.counter(Call.class, Monitoring.teardownCounterNameOf(this.configuration.getSetupTeardown()));
            }
            if (monitoringConfiguration.getCountBackoffs().booleanValue()) {
                this.backoffCounter = Metrics.counter(Call.class, Monitoring.backoffCounterNameOf(this.configuration.getBackOffSupplier().get()));
            }
            if (monitoringConfiguration.getCountSleeps().booleanValue()) {
                this.sleeperCounter = Metrics.counter(Call.class, Monitoring.sleeperCounterNameOf(this.configuration.getSleeperSupplier().get()));
            }
            if (monitoringConfiguration.getCountShouldBackoff().booleanValue()) {
                this.shouldBackoffCounter = Metrics.counter(Call.class, Monitoring.shouldBackoffCounterName(this.configuration.getCallShouldBackoff()));
            }
        }

        private void setupWithoutRepeat() throws UserCodeExecutionException {
            Monitoring.incIfPresent(this.setupCounter);
            this.setupTeardown.setup();
        }

        @DoFn.Setup
        public void setup() throws UserCodeExecutionException {
            setupMetrics();
            this.executor = Executors.newSingleThreadExecutor();
            this.caller.setExecutor(this.executor);
            this.setupTeardown.setExecutor(this.executor);
            if (!this.configuration.getShouldRepeat().booleanValue()) {
                setupWithoutRepeat();
                return;
            }
            BackOff backOff = this.configuration.getBackOffSupplier().get();
            Sleeper sleeper = this.configuration.getSleeperSupplier().get();
            backoffIfNeeded(backOff, sleeper);
            Repeater.builder().setBackOff(backOff).setSleeper(sleeper).setThrowableFunction(r3 -> {
                Monitoring.incIfPresent(this.setupCounter);
                this.setupTeardown.setup();
                return null;
            }).build().withBackoffCounter(this.backoffCounter).withSleeperCounter(this.sleeperCounter).apply(null);
        }

        @DoFn.Teardown
        public void teardown() throws UserCodeExecutionException {
            BackOff backOff = this.configuration.getBackOffSupplier().get();
            Sleeper sleeper = this.configuration.getSleeperSupplier().get();
            backoffIfNeeded(backOff, sleeper);
            if (!this.configuration.getShouldRepeat().booleanValue()) {
                Monitoring.incIfPresent(this.teardownCounter);
                this.setupTeardown.teardown();
            } else {
                Repeater.builder().setBackOff(backOff).setSleeper(sleeper).setThrowableFunction(r3 -> {
                    Monitoring.incIfPresent(this.teardownCounter);
                    this.setupTeardown.teardown();
                    return null;
                }).build().withBackoffCounter(this.backoffCounter).withSleeperCounter(this.sleeperCounter).apply(null);
                ((ExecutorService) Preconditions.checkStateNotNull(this.executor)).shutdown();
                try {
                    this.executor.awaitTermination(3L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                }
            }
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element RequestT requestt, DoFn.MultiOutputReceiver multiOutputReceiver) {
            BackOff backOff = this.configuration.getBackOffSupplier().get();
            Sleeper sleeper = this.configuration.getSleeperSupplier().get();
            Monitoring.incIfPresent(this.requestsCounter);
            backoffIfNeeded(backOff, sleeper);
            if (!this.configuration.getShouldRepeat().booleanValue()) {
                Monitoring.incIfPresent(this.callCounter);
                try {
                    multiOutputReceiver.get(this.responseTag).output(this.caller.call(requestt));
                    Monitoring.incIfPresent(this.responsesCounter);
                    return;
                } catch (UserCodeExecutionException e) {
                    Monitoring.incIfPresent(this.failuresCounter);
                    multiOutputReceiver.get(this.failureTag).output(ApiIOError.of(e, requestt));
                    return;
                }
            }
            Repeater.Builder backOff2 = Repeater.builder().setSleeper(sleeper).setBackOff(backOff);
            CallerWithTimeout<RequestT, ResponseT> callerWithTimeout = this.caller;
            Objects.requireNonNull(callerWithTimeout);
            try {
                multiOutputReceiver.get(this.responseTag).output(backOff2.setThrowableFunction(callerWithTimeout::call).build().withSleeperCounter(this.sleeperCounter).withBackoffCounter(this.backoffCounter).withCallCounter(this.callCounter).apply(requestt));
                Monitoring.incIfPresent(this.responsesCounter);
            } catch (UserCodeExecutionException e2) {
                Monitoring.incIfPresent(this.failuresCounter);
                multiOutputReceiver.get(this.failureTag).output(ApiIOError.of(e2, requestt));
            }
        }

        private void backoffIfNeeded(BackOff backOff, Sleeper sleeper) {
            if (this.configuration.getCallShouldBackoff().isTrue()) {
                Monitoring.incIfPresent(this.shouldBackoffCounter);
                Monitoring.incIfPresent(this.backoffCounter);
                try {
                    Monitoring.incIfPresent(this.sleeperCounter);
                    sleeper.sleep(backOff.nextBackOffMillis());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$CallerWithTimeout.class */
    private static class CallerWithTimeout<RequestT, ResponseT> implements Caller<RequestT, ResponseT> {
        private final Duration timeout;
        private final Caller<RequestT, ResponseT> caller;
        private ExecutorService executor;

        private CallerWithTimeout(Duration duration, Caller<RequestT, ResponseT> caller) {
            this.timeout = duration;
            this.caller = caller;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setExecutor(ExecutorService executorService) {
            this.executor = executorService;
        }

        @Override // org.apache.beam.io.requestresponse.Caller
        public ResponseT call(RequestT requestt) throws UserCodeExecutionException {
            Future submit = ((ExecutorService) Preconditions.checkStateNotNull(this.executor)).submit(() -> {
                return this.caller.call(requestt);
            });
            try {
                return (ResponseT) submit.get(this.timeout.getMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException | TimeoutException e) {
                throw new UserCodeTimeoutException(e);
            } catch (ExecutionException e2) {
                Call.parseAndThrow(submit, e2);
                throw new UserCodeExecutionException("could not complete request");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$Configuration.class */
    public static abstract class Configuration<RequestT, ResponseT> implements Serializable {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$Configuration$Builder.class */
        public static abstract class Builder<RequestT, ResponseT> {
            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setCaller(Caller<RequestT, ResponseT> caller);

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setResponseCoder(Coder<ResponseT> coder);

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setSetupTeardown(SetupTeardown setupTeardown);

            abstract Optional<SetupTeardown> getSetupTeardown();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setTimeout(Duration duration);

            abstract Optional<Duration> getTimeout();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setShouldRepeat(Boolean bool);

            abstract Optional<Boolean> getShouldRepeat();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setCallShouldBackoff(CallShouldBackoff<ResponseT> callShouldBackoff);

            abstract Optional<CallShouldBackoff<ResponseT>> getCallShouldBackoff();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setSleeperSupplier(SerializableSupplier<Sleeper> serializableSupplier);

            abstract Optional<SerializableSupplier<Sleeper>> getSleeperSupplier();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setBackOffSupplier(SerializableSupplier<BackOff> serializableSupplier);

            abstract Optional<SerializableSupplier<BackOff>> getBackOffSupplier();

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<RequestT, ResponseT> setMonitoringConfiguration(Monitoring monitoring);

            abstract Optional<Monitoring> getMonitoringConfiguration();

            abstract Configuration<RequestT, ResponseT> autoBuild();

            /* JADX INFO: Access modifiers changed from: package-private */
            public final Configuration<RequestT, ResponseT> build() {
                if (!getSetupTeardown().isPresent()) {
                    setSetupTeardown(new NoopSetupTeardown());
                }
                if (!getShouldRepeat().isPresent()) {
                    setShouldRepeat(false);
                }
                if (!getTimeout().isPresent()) {
                    setTimeout(RequestResponseIO.DEFAULT_TIMEOUT);
                }
                if (!getCallShouldBackoff().isPresent()) {
                    setCallShouldBackoff(new NoopCallShouldBackoff());
                }
                if (!getSleeperSupplier().isPresent()) {
                    setSleeperSupplier(() -> {
                        return Sleeper.DEFAULT;
                    });
                }
                if (!getBackOffSupplier().isPresent()) {
                    setBackOffSupplier(new DefaultSerializableBackoffSupplier());
                }
                if (!getMonitoringConfiguration().isPresent()) {
                    setMonitoringConfiguration(Monitoring.builder().build());
                }
                return autoBuild();
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 127933828:
                        if (implMethodName.equals("lambda$build$438d7f7a$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/io/requestresponse/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/requestresponse/Call$Configuration$Builder") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/beam/sdk/util/Sleeper;")) {
                            return () -> {
                                return Sleeper.DEFAULT;
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
            return new AutoValue_Call_Configuration.Builder();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Caller<RequestT, ResponseT> getCaller();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SetupTeardown getSetupTeardown();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getTimeout();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<ResponseT> getResponseCoder();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Boolean getShouldRepeat();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract CallShouldBackoff<ResponseT> getCallShouldBackoff();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableSupplier<Sleeper> getSleeperSupplier();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableSupplier<BackOff> getBackOffSupplier();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Monitoring getMonitoringConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Builder<RequestT, ResponseT> toBuilder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$NoopCallShouldBackoff.class */
    public static class NoopCallShouldBackoff<ResponseT> implements CallShouldBackoff<ResponseT> {
        private NoopCallShouldBackoff() {
        }

        @Override // org.apache.beam.io.requestresponse.CallShouldBackoff
        public void update(UserCodeExecutionException userCodeExecutionException) {
        }

        @Override // org.apache.beam.io.requestresponse.CallShouldBackoff
        public void update(ResponseT responset) {
        }

        @Override // org.apache.beam.io.requestresponse.CallShouldBackoff
        public boolean isTrue() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$NoopSetupTeardown.class */
    public static class NoopSetupTeardown implements SetupTeardown {
        NoopSetupTeardown() {
        }

        @Override // org.apache.beam.io.requestresponse.SetupTeardown
        public void setup() throws UserCodeExecutionException {
        }

        @Override // org.apache.beam.io.requestresponse.SetupTeardown
        public void teardown() throws UserCodeExecutionException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Call$SetupTeardownWithTimeout.class */
    public static class SetupTeardownWithTimeout implements SetupTeardown {
        private final Duration timeout;
        private final SetupTeardown setupTeardown;
        private ExecutorService executor;

        SetupTeardownWithTimeout(Duration duration, SetupTeardown setupTeardown) {
            this.timeout = duration;
            this.setupTeardown = setupTeardown;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setExecutor(ExecutorService executorService) {
            this.executor = executorService;
        }

        @Override // org.apache.beam.io.requestresponse.SetupTeardown
        public void setup() throws UserCodeExecutionException {
            executeAsync(() -> {
                this.setupTeardown.setup();
                return null;
            });
        }

        @Override // org.apache.beam.io.requestresponse.SetupTeardown
        public void teardown() throws UserCodeExecutionException {
            executeAsync(() -> {
                this.setupTeardown.teardown();
                return null;
            });
        }

        private void executeAsync(Callable<Void> callable) throws UserCodeExecutionException {
            Future submit = ((ExecutorService) Preconditions.checkStateNotNull(this.executor)).submit(callable);
            try {
                submit.get(this.timeout.getMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException | TimeoutException e) {
                submit.cancel(true);
                throw new UserCodeTimeoutException(e);
            } catch (ExecutionException e2) {
                Call.parseAndThrow(submit, e2);
            }
        }
    }

    static <RequestT, ResponseT> Call<RequestT, ResponseT> of(Caller<RequestT, ResponseT> caller, Coder<ResponseT> coder) {
        return new Call<>(Configuration.builder().setCaller((Caller) SerializableUtils.ensureSerializable(caller)).setResponseCoder(coder).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Incorrect types in method signature: <RequestT:Ljava/lang/Object;ResponseT:Ljava/lang/Object;CallerSetupTeardownT::Lorg/apache/beam/io/requestresponse/Caller<TRequestT;TResponseT;>;:Lorg/apache/beam/io/requestresponse/SetupTeardown;>(TCallerSetupTeardownT;Lorg/apache/beam/sdk/coders/Coder<TResponseT;>;)Lorg/apache/beam/io/requestresponse/Call<TRequestT;TResponseT;>; */
    public static Call ofCallerAndSetupTeardown(Caller caller, Coder coder) {
        Caller<RequestT, ResponseT> caller2 = (Caller) SerializableUtils.ensureSerializable(caller);
        return new Call(Configuration.builder().setCaller(caller2).setResponseCoder(coder).setSetupTeardown((SetupTeardown) caller2).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <RequestT, ResponseT> Call<RequestT, ResponseT> of(Configuration<RequestT, ResponseT> configuration) {
        return new Call<>(configuration);
    }

    private Call(Configuration<RequestT, ResponseT> configuration) {
        this.configuration = (Configuration) SerializableUtils.ensureSerializable(configuration);
    }

    Call<RequestT, ResponseT> withSetupTeardown(SetupTeardown setupTeardown) {
        return new Call<>(this.configuration.toBuilder().setSetupTeardown((SetupTeardown) SerializableUtils.ensureSerializable(setupTeardown)).build());
    }

    Call<RequestT, ResponseT> withTimeout(Duration duration) {
        return new Call<>(this.configuration.toBuilder().setTimeout(duration).build());
    }

    public Result<ResponseT> expand(PCollection<RequestT> pCollection) {
        return Result.of(this.configuration.getResponseCoder(), this.responseTag, this.failureTag, pCollection.apply(CallFn.class.getSimpleName(), ParDo.of(new CallFn(this.responseTag, this.failureTag, this.configuration)).withOutputTags(this.responseTag, TupleTagList.of(this.failureTag))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void parseAndThrow(Future<T> future, ExecutionException executionException) throws UserCodeExecutionException {
        future.cancel(true);
        if (executionException.getCause() == null) {
            throw new UserCodeExecutionException(executionException);
        }
        Throwable th = (Throwable) Preconditions.checkStateNotNull(executionException.getCause());
        if (!(th instanceof UserCodeQuotaException)) {
            throw new UserCodeExecutionException(th);
        }
        throw new UserCodeQuotaException(th);
    }
}
