package org.apache.beam.io.requestresponse;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/io/requestresponse/Cache.class */
public final class Cache {

    /* loaded from: input_file:org/apache/beam/io/requestresponse/Cache$CacheResponseCoder.class */
    private static class CacheResponseCoder<ResponseT> extends CustomCoder<ResponseT> {
        private final NullableCoder<ResponseT> basis;

        private CacheResponseCoder(Coder<ResponseT> coder) {
            this.basis = NullableCoder.of(coder);
        }

        public void encode(ResponseT responset, OutputStream outputStream) throws CoderException, IOException {
            this.basis.encode(responset, outputStream);
        }

        public ResponseT decode(InputStream inputStream) throws CoderException, IOException {
            return (ResponseT) this.basis.decode(inputStream);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return this.basis.getCoderArguments();
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.basis.verifyDeterministic();
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/Cache$Pair.class */
    /**
     * Immutable holder that keeps a cache read transform together with its matching cache write
     * transform.
     */
    public static class Pair<RequestT, ResponseT> {
        /** Transform that looks requests up and yields request/response pairs. */
        private final PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> read;

        /** Transform that accepts request/response pairs and yields them back. */
        private final PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write;

        // Instances are only created through the static factory below.
        private Pair(PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> readTransform, PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> writeTransform) {
            read = readTransform;
            write = writeTransform;
        }

        /**
         * Creates a pair from the given transforms.
         *
         * @param pTransform the read transform
         * @param pTransform2 the write transform
         * @return a new pair holding both transforms
         */
        public static <RequestT, ResponseT> Pair<RequestT, ResponseT> of(PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> pTransform, PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> pTransform2) {
            return new Pair<RequestT, ResponseT>(pTransform, pTransform2);
        }

        /** @return the read transform given at creation */
        public PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> getRead() {
            return read;
        }

        /** @return the write transform given at creation */
        public PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> getWrite() {
            return write;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/Cache$UsingRedis.class */
    /**
     * Factory for the Redis-backed {@link Read} and {@link Write} callers, which share one pair of
     * coders and one {@link RedisClient}.
     */
    public static class UsingRedis<RequestT, ResponseT> {
        private final Coder<RequestT> requestTCoder;
        private final Coder<ResponseT> responseTCoder;
        private final RedisClient client;

        /**
         * Caller that looks up the response stored under an encoded request.
         *
         * <p>The request is encoded with the request coder and the resulting bytes are passed to
         * {@link RedisClient#getBytes}. A {@code null} result is reported as a {@link KV} with a
         * {@code null} value; otherwise the bytes are decoded with the response coder.
         */
        public static class Read<RequestT, ResponseT> implements Caller<RequestT, KV<RequestT, ResponseT>>, SetupTeardown {
            private final Coder<RequestT> requestTCoder;
            private final Coder<ResponseT> responseTCoder;
            private final RedisClient client;

            private Read(Coder<RequestT> coder, Coder<ResponseT> coder2, RedisClient redisClient) {
                this.requestTCoder = coder;
                this.responseTCoder = coder2;
                this.client = redisClient;
            }

            /**
             * Reads the response associated with {@code requestt}.
             *
             * @param requestt the request whose encoded form is used for the lookup
             * @return the request paired with the decoded response, or with {@code null} when the
             *     client returned no bytes
             * @throws UserCodeExecutionException if encoding or decoding fails with an
             *     {@link IOException}, or if the response coder decodes to {@code null}
             */
            @Override // org.apache.beam.io.requestresponse.Caller
            public KV<RequestT, ResponseT> call(RequestT requestt) throws UserCodeExecutionException {
                ByteArrayOutputStream encodedRequest = new ByteArrayOutputStream();
                try {
                    this.requestTCoder.encode(requestt, encodedRequest);
                    byte[] bytes = this.client.getBytes(encodedRequest.toByteArray());
                    if (bytes == null) {
                        // No bytes stored for this request: a null value tells the caller it was a miss.
                        return KV.of(requestt, null);
                    }
                    try (InputStream encodedResponse = ByteSource.wrap(bytes).openStream()) {
                        // Stored bytes must decode to a real response; a null here is an invalid state.
                        ResponseT response = Preconditions.checkStateNotNull(this.responseTCoder.decode(encodedResponse));
                        return KV.of(requestt, response);
                    }
                } catch (IOException | IllegalStateException e) {
                    throw new UserCodeExecutionException(e);
                }
            }

            /** Forwards setup to the underlying {@link RedisClient}. */
            @Override // org.apache.beam.io.requestresponse.SetupTeardown
            public void setup() throws UserCodeExecutionException {
                this.client.setup();
            }

            /** Forwards teardown to the underlying {@link RedisClient}. */
            @Override // org.apache.beam.io.requestresponse.SetupTeardown
            public void teardown() throws UserCodeExecutionException {
                this.client.teardown();
            }

            // No explicit Object call(Object) bridge is declared here: the compiler synthesizes it,
            // and a hand-written one has the same erasure as call(RequestT).
        }

        /**
         * Caller that stores a request/response pair through {@link RedisClient#setex} with a fixed
         * expiry and returns the pair unchanged.
         */
        public static class Write<RequestT, ResponseT> implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, SetupTeardown {
            private final Duration expiry;
            private final Coder<RequestT> requestTCoder;
            private final Coder<ResponseT> responseTCoder;
            private final RedisClient client;

            private Write(Duration duration, Coder<RequestT> coder, Coder<ResponseT> coder2, RedisClient redisClient) {
                this.expiry = duration;
                this.requestTCoder = coder;
                this.responseTCoder = coder2;
                this.client = redisClient;
            }

            /**
             * Encodes the key and value of {@code kv} and hands both byte arrays, together with the
             * configured expiry, to {@link RedisClient#setex}.
             *
             * @param kv the request/response pair to store
             * @return {@code kv}, unchanged
             * @throws UserCodeExecutionException if either coder fails with an {@link IOException}
             */
            @Override // org.apache.beam.io.requestresponse.Caller
            public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> kv) throws UserCodeExecutionException {
                ByteArrayOutputStream encodedRequest = new ByteArrayOutputStream();
                ByteArrayOutputStream encodedResponse = new ByteArrayOutputStream();
                try {
                    this.requestTCoder.encode(kv.getKey(), encodedRequest);
                    this.responseTCoder.encode(kv.getValue(), encodedResponse);
                    this.client.setex(encodedRequest.toByteArray(), encodedResponse.toByteArray(), this.expiry);
                    return kv;
                } catch (IOException e) {
                    throw new UserCodeExecutionException(e);
                }
            }

            /** Forwards setup to the underlying {@link RedisClient}. */
            @Override // org.apache.beam.io.requestresponse.SetupTeardown
            public void setup() throws UserCodeExecutionException {
                this.client.setup();
            }

            /** Forwards teardown to the underlying {@link RedisClient}. */
            @Override // org.apache.beam.io.requestresponse.SetupTeardown
            public void teardown() throws UserCodeExecutionException {
                this.client.teardown();
            }
        }

        /**
         * Stores the client and both coders after checking that each coder is deterministic.
         *
         * @throws Coder.NonDeterministicException if either coder fails {@code verifyDeterministic()}
         */
        private UsingRedis(Coder<RequestT> coder, Coder<ResponseT> coder2, RedisClient redisClient) throws Coder.NonDeterministicException {
            this.client = redisClient;
            // NOTE(review): presumably required because the encoded bytes are used as cache
            // keys/values and must be stable across workers; confirm against RedisClient.
            coder.verifyDeterministic();
            coder2.verifyDeterministic();
            this.requestTCoder = coder;
            this.responseTCoder = coder2;
        }

        /** Creates a {@link Read} caller sharing this instance's coders and client. */
        Read<RequestT, ResponseT> read() {
            return new Read<>(this.requestTCoder, this.responseTCoder, this.client);
        }

        /**
         * Creates a {@link Write} caller sharing this instance's coders and client.
         *
         * @param duration the expiry passed to {@link RedisClient#setex} on every write
         */
        Write<RequestT, ResponseT> write(Duration duration) {
            return new Write<>(duration, this.requestTCoder, this.responseTCoder, this.client);
        }
    }

    /**
     * Builds the Redis-backed read/write transform {@link Pair}.
     *
     * <p>Each transform gets its own {@link RedisClient} created from {@code uri}, and the response
     * coder is wrapped in a {@code CacheResponseCoder} for both.
     *
     * @param uri location passed to each {@link RedisClient}
     * @param coder coder for requests
     * @param coder2 coder for responses
     * @param duration expiry applied by the write transform
     * @return a pair holding the read transform and the write transform
     * @throws Coder.NonDeterministicException if a coder fails its determinism check
     */
    public static <RequestT, ResponseT> Pair<RequestT, ResponseT> usingRedis(URI uri, Coder<RequestT> coder, Coder<ResponseT> coder2, Duration duration) throws Coder.NonDeterministicException {
        // Diamond plus explicit type witnesses keep the expression fully generic; a raw
        // CacheResponseCoder would erase the result types of both helper calls.
        PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> read =
                Cache.<RequestT, ResponseT>readUsingRedis(new RedisClient(uri), coder, new CacheResponseCoder<>(coder2));
        PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write =
                Cache.<RequestT, ResponseT>writeUsingRedis(duration, new RedisClient(uri), coder, new CacheResponseCoder<>(coder2));
        return Pair.of(read, write);
    }

    /**
     * Wraps a read caller in a transform via {@code Call.ofCallerAndSetupTeardown}, using a
     * {@link KvCoder} assembled from the request and response coders for the output.
     *
     * <p>The generic signature is the one recorded in the class file; its erasure is identical to
     * the raw {@code read(Caller, Coder, Coder)} form, so existing callers are unaffected.
     *
     * @param caller implementation of both {@link Caller} and {@link SetupTeardown}
     * @param coder coder for requests
     * @param coder2 coder for responses
     * @return the transform returned by {@code Call.ofCallerAndSetupTeardown}
     */
    static <RequestT, ResponseT, CallerSetupTeardownT extends Caller<RequestT, KV<RequestT, ResponseT>> & SetupTeardown> PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> read(CallerSetupTeardownT caller, Coder<RequestT> coder, Coder<ResponseT> coder2) {
        return Call.ofCallerAndSetupTeardown(caller, KvCoder.of(coder, coder2));
    }

    /**
     * Creates the Redis-backed read transform.
     *
     * @param redisClient client used by the resulting {@code UsingRedis.Read} caller
     * @param coder coder for requests
     * @param coder2 coder for responses
     * @return the transform produced by {@code read} for a new {@code UsingRedis.Read} caller
     * @throws Coder.NonDeterministicException if a coder fails the determinism check performed when
     *     the {@code UsingRedis} instance is constructed
     */
    static <RequestT, ResponseT> PTransform<PCollection<RequestT>, Result<KV<RequestT, ResponseT>>> readUsingRedis(RedisClient redisClient, Coder<RequestT> coder, Coder<ResponseT> coder2) throws Coder.NonDeterministicException {
        // Diamond (not a raw type) so the Read caller keeps its RequestT/ResponseT parameters.
        return read(new UsingRedis<>(coder, coder2, redisClient).read(), coder, coder2);
    }

    /**
     * Wraps a write caller in a transform via {@code Call.ofCallerAndSetupTeardown}, using
     * {@code kvCoder} for the output.
     *
     * <p>The generic signature is the one recorded in the class file; its erasure is identical to
     * the raw {@code write(Caller, KvCoder)} form, so existing callers are unaffected.
     *
     * @param caller implementation of both {@link Caller} and {@link SetupTeardown}
     * @param kvCoder coder for the request/response pairs
     * @return the transform returned by {@code Call.ofCallerAndSetupTeardown}
     */
    static <RequestT, ResponseT, CallerSetupTeardownT extends Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> & SetupTeardown> PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> write(CallerSetupTeardownT caller, KvCoder<RequestT, ResponseT> kvCoder) {
        return Call.ofCallerAndSetupTeardown(caller, kvCoder);
    }

    /**
     * Creates the Redis-backed write transform.
     *
     * @param duration expiry given to the resulting {@code UsingRedis.Write} caller
     * @param redisClient client used by that caller
     * @param coder coder for requests
     * @param coder2 coder for responses
     * @return the transform produced by {@code write} for a new {@code UsingRedis.Write} caller
     * @throws Coder.NonDeterministicException if a coder fails the determinism check performed when
     *     the {@code UsingRedis} instance is constructed
     */
    static <RequestT, ResponseT> PTransform<PCollection<KV<RequestT, ResponseT>>, Result<KV<RequestT, ResponseT>>> writeUsingRedis(Duration duration, RedisClient redisClient, Coder<RequestT> coder, Coder<ResponseT> coder2) throws Coder.NonDeterministicException {
        // Diamond (not a raw type) so the Write caller keeps its RequestT/ResponseT parameters.
        return write(new UsingRedis<>(coder, coder2, redisClient).write(duration), KvCoder.of(coder, coder2));
    }
}
