package org.apache.beam.sdk.io.redis;

import com.google.auto.value.AutoValue;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_io_redis.com.google.common.base.Preconditions;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.redis.AutoValue_RedisIO_Read;
import org.apache.beam.sdk.io.redis.AutoValue_RedisIO_ReadAll;
import org.apache.beam.sdk.io.redis.AutoValue_RedisIO_Write;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIO.class */
public class RedisIO {

    /**
     * A {@link PTransform} that starts at {@link PBegin} and produces {@code KV<String, String>} elements by
     * feeding its single key pattern into {@link RedisIO#readAll()}.
     *
     * <p>Each {@code with*} method returns the {@link Read} built from {@link #builder()} with one property
     * replaced.
     */
    @AutoValue
    public static abstract class Read extends PTransform<PBegin, PCollection<KV<String, String>>> {

        /** AutoValue builder used by the {@code with*} methods to derive a modified {@link Read}. */
        @AutoValue.Builder
        public static abstract class Builder {
            @Nullable
            abstract Builder setConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration);

            @Nullable
            abstract Builder setKeyPattern(String str);

            abstract Read build();
        }

        /** Returns the connection settings handed to the underlying {@link ReadAll}; may be {@code null}. */
        @Nullable
        public abstract RedisConnectionConfiguration connectionConfiguration();

        /** Returns the key pattern that {@link #expand(PBegin)} emits as the single input element. */
        @Nullable
        public abstract String keyPattern();

        /** Returns a builder from which the {@code with*} methods derive their result. */
        abstract Builder builder();

        /**
         * Replaces host and port on the current connection configuration.
         *
         * @param str the Redis host; must not be {@code null}
         * @param i the Redis port; must be strictly positive
         * @return the resulting {@link Read}
         */
        public Read withEndpoint(String str, int i) {
            Preconditions.checkArgument(str != null, "host can not be null");
            Preconditions.checkArgument(i > 0, "port can not be negative or 0");
            RedisConnectionConfiguration endpointConfig = connectionConfiguration().withHost(str).withPort(i);
            return builder().setConnectionConfiguration(endpointConfig).build();
        }

        /**
         * Replaces the auth value on the current connection configuration.
         *
         * @param str the auth value; must not be {@code null}
         * @return the resulting {@link Read}
         */
        public Read withAuth(String str) {
            Preconditions.checkArgument(str != null, "auth can not be null");
            RedisConnectionConfiguration authConfig = connectionConfiguration().withAuth(str);
            return builder().setConnectionConfiguration(authConfig).build();
        }

        /**
         * Replaces the timeout on the current connection configuration.
         *
         * @param i the timeout; must not be negative
         * @return the resulting {@link Read}
         */
        public Read withTimeout(int i) {
            Preconditions.checkArgument(i >= 0, "timeout can not be negative");
            RedisConnectionConfiguration timeoutConfig = connectionConfiguration().withTimeout(i);
            return builder().setConnectionConfiguration(timeoutConfig).build();
        }

        /**
         * Replaces the key pattern.
         *
         * @param str the key pattern; must not be {@code null}
         * @return the resulting {@link Read}
         */
        public Read withKeyPattern(String str) {
            Preconditions.checkArgument(str != null, "keyPattern can not be null");
            Builder patternBuilder = builder().setKeyPattern(str);
            return patternBuilder.build();
        }

        /**
         * Replaces the whole connection configuration.
         *
         * @param redisConnectionConfiguration the new configuration; must not be {@code null}
         * @return the resulting {@link Read}
         */
        public Read withConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration) {
            Preconditions.checkArgument(redisConnectionConfiguration != null, "connection can not be null");
            Builder connectionBuilder = builder().setConnectionConfiguration(redisConnectionConfiguration);
            return connectionBuilder.build();
        }

        /** Delegates display-data registration to the connection configuration. */
        public void populateDisplayData(DisplayData.Builder builder) {
            RedisConnectionConfiguration configuration = connectionConfiguration();
            configuration.populateDisplayData(builder);
        }

        /**
         * Creates a one-element collection holding the key pattern and applies {@link RedisIO#readAll()} with this
         * transform's connection configuration.
         *
         * @throws IllegalArgumentException if no connection configuration is set
         */
        public PCollection<KV<String, String>> expand(PBegin pBegin) {
            Preconditions.checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
            PCollection<String> patterns = pBegin.apply(Create.of(keyPattern()));
            ReadAll readAll = RedisIO.readAll().withConnectionConfiguration(connectionConfiguration());
            return patterns.apply(readAll);
        }
    }

    /**
     * A {@link PTransform} that turns a collection of strings into {@code KV<String, String>} elements by running
     * {@link ReadFn} over each input and then applying {@link Reparallelize}.
     *
     * <p>Each {@code with*} method returns the {@link ReadAll} built from {@link #builder()} with the connection
     * configuration replaced.
     */
    @AutoValue
    public static abstract class ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {

        /** AutoValue builder used by the {@code with*} methods to derive a modified {@link ReadAll}. */
        @AutoValue.Builder
        public static abstract class Builder {
            @Nullable
            abstract Builder setConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration);

            abstract ReadAll build();
        }

        /** Returns the connection settings passed to {@link ReadFn}; may be {@code null}. */
        @Nullable
        public abstract RedisConnectionConfiguration connectionConfiguration();

        /** Returns a builder from which the {@code with*} methods derive their result. */
        abstract Builder builder();

        /**
         * Replaces host and port on the current connection configuration.
         *
         * @param str the Redis host; must not be {@code null}
         * @param i the Redis port; must be strictly positive
         * @return the resulting {@link ReadAll}
         */
        public ReadAll withEndpoint(String str, int i) {
            Preconditions.checkArgument(str != null, "host can not be null");
            Preconditions.checkArgument(i > 0, "port can not be negative or 0");
            RedisConnectionConfiguration endpointConfig = connectionConfiguration().withHost(str).withPort(i);
            return builder().setConnectionConfiguration(endpointConfig).build();
        }

        /**
         * Replaces the auth value on the current connection configuration.
         *
         * @param str the auth value; must not be {@code null}
         * @return the resulting {@link ReadAll}
         */
        public ReadAll withAuth(String str) {
            Preconditions.checkArgument(str != null, "auth can not be null");
            RedisConnectionConfiguration authConfig = connectionConfiguration().withAuth(str);
            return builder().setConnectionConfiguration(authConfig).build();
        }

        /**
         * Replaces the timeout on the current connection configuration.
         *
         * @param i the timeout; must not be negative
         * @return the resulting {@link ReadAll}
         */
        public ReadAll withTimeout(int i) {
            Preconditions.checkArgument(i >= 0, "timeout can not be negative");
            RedisConnectionConfiguration timeoutConfig = connectionConfiguration().withTimeout(i);
            return builder().setConnectionConfiguration(timeoutConfig).build();
        }

        /**
         * Replaces the whole connection configuration.
         *
         * @param redisConnectionConfiguration the new configuration; must not be {@code null}
         * @return the resulting {@link ReadAll}
         */
        public ReadAll withConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration) {
            Preconditions.checkArgument(redisConnectionConfiguration != null, "connection can not be null");
            Builder connectionBuilder = builder().setConnectionConfiguration(redisConnectionConfiguration);
            return connectionBuilder.build();
        }

        /**
         * Runs {@link ReadFn} over the input, fixes the output coder to a pair of UTF-8 string coders and applies
         * {@link Reparallelize} to the result.
         *
         * @throws IllegalArgumentException if no connection configuration is set
         */
        public PCollection<KV<String, String>> expand(PCollection<String> pCollection) {
            Preconditions.checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
            ReadFn readFn = new ReadFn(connectionConfiguration());
            PCollection<KV<String, String>> entries = pCollection.apply(ParDo.of(readFn));
            PCollection<KV<String, String>> codedEntries =
                    entries.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
            return codedEntries.apply(new Reparallelize());
        }
    }

    /**
     * {@link DoFn} that treats each input element as a SCAN match pattern, fetches the value of every matching key
     * with a pipelined GET, and outputs one {@code KV<key, value>} per key that still has a value.
     *
     * <p>The Jedis connection is opened in {@link #setup()} and closed in {@link #teardown()}.
     */
    public static class ReadFn extends DoFn<String, KV<String, String>> {
        private final RedisConnectionConfiguration connectionConfiguration;
        private transient Jedis jedis;

        /**
         * @param redisConnectionConfiguration configuration used to open the connection in {@link #setup()}
         */
        public ReadFn(RedisConnectionConfiguration redisConnectionConfiguration) {
            this.connectionConfiguration = redisConnectionConfiguration;
        }

        /** Opens the Jedis connection used by {@link #processElement}. */
        @DoFn.Setup
        public void setup() {
            this.jedis = this.connectionConfiguration.connect();
        }

        /**
         * Iterates the SCAN cursor for the pattern in the current element and emits the key/value pairs found.
         *
         * <p>Keys whose GET returns {@code null} are skipped rather than emitted with a {@code null} value.
         *
         * @param processContext supplies the match pattern and receives the output pairs
         * @throws Exception if a Redis command fails
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<String, KV<String, String>>.ProcessContext processContext) throws Exception {
            ScanParams scanParams = new ScanParams();
            scanParams.match(processContext.element());
            String cursor = ScanParams.SCAN_POINTER_START;
            do {
                ScanResult<String> page = this.jedis.scan(cursor, scanParams);
                List<String> keys = page.getResult();
                // Only open a pipeline when this page actually returned keys.
                if (keys != null && !keys.isEmpty()) {
                    Pipeline pipelined = this.jedis.pipelined();
                    for (String key : keys) {
                        pipelined.get(key);
                    }
                    // Responses come back in the same order the GETs were queued, so index i pairs with keys.get(i).
                    List<Object> values = pipelined.syncAndReturnAll();
                    for (int i = 0; i < values.size(); i++) {
                        Object value = values.get(i);
                        if (value == null) {
                            // The key vanished (deleted/expired) between SCAN and GET; a null value cannot be
                            // encoded downstream, so drop it.
                            continue;
                        }
                        if (value instanceof RuntimeException) {
                            // NOTE(review): Jedis appears to place per-command failures (e.g. GET on a non-string
                            // key) in the response list as exception objects; surface them instead of failing with
                            // a ClassCastException on the String cast below.
                            throw (RuntimeException) value;
                        }
                        processContext.output(KV.of(keys.get(i), (String) value));
                    }
                }
                cursor = page.getStringCursor();
                // A returned cursor of "0" ends the iteration.
            } while (!"0".equals(cursor));
        }

        /** Closes the Jedis connection if {@link #setup()} managed to open one. */
        @DoFn.Teardown
        public void teardown() {
            if (this.jedis != null) {
                this.jedis.close();
            }
        }
    }

    /**
     * {@link PTransform} that passes its input through unchanged, but only after routing it through an identity
     * {@link ParDo} that takes an always-empty side input derived from the same collection, followed by
     * {@link Reshuffle#viaRandomKey()}.
     */
    public static class Reparallelize extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
        private Reparallelize() {
        }

        /**
         * Builds the "Consume" side input, the "Identity" pass-through and the final reshuffle, in that order.
         *
         * @param pCollection the key/value pairs to redistribute
         * @return the same elements after the reshuffle
         */
        public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> pCollection) {
            // The constant-false predicate rejects every element, so this view never holds any data.
            PCollectionView<Iterable<KV<String, String>>> emptyView = pCollection
                    .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
                    .apply(View.asIterable());

            DoFn<KV<String, String>, KV<String, String>> identityFn = new DoFn<KV<String, String>, KV<String, String>>() {
                @DoFn.ProcessElement
                public void processElement(DoFn<KV<String, String>, KV<String, String>>.ProcessContext processContext) {
                    processContext.output(processContext.element());
                }
            };

            PCollection<KV<String, String>> materialized =
                    pCollection.apply("Identity", ParDo.of(identityFn).withSideInputs(emptyView));
            return materialized.apply(Reshuffle.viaRandomKey());
        }
    }

    /**
     * A {@link PTransform} that writes every {@code KV<String, String>} element to Redis with the configured
     * {@link Method}, optionally giving the written key an expiry in milliseconds.
     *
     * <p>Each {@code with*} method returns the {@link Write} built from {@link #builder()} with one property
     * replaced.
     */
    @AutoValue
    public static abstract class Write extends PTransform<PCollection<KV<String, String>>, PDone> {

        /** AutoValue builder used by the {@code with*} methods to derive a modified {@link Write}. */
        @AutoValue.Builder
        static abstract class Builder {
            abstract Builder setConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration);

            abstract Builder setMethod(Method method);

            abstract Builder setExpireTime(Long l);

            abstract Write build();
        }

        /** Redis command used by {@link WriteFn} to store each element. */
        public enum Method {
            /** Issues {@code append(key, value)}; an expiry, if set, is applied with a separate {@code pexpire}. */
            APPEND,
            /** Issues {@code set(key, value)}, or {@code psetex(key, millis, value)} when an expiry is set. */
            SET,
            /** Issues {@code lpush(key, value)}; an expiry, if set, is applied with a separate {@code pexpire}. */
            LPUSH,
            /** Issues {@code rpush(key, value)}; an expiry, if set, is applied with a separate {@code pexpire}. */
            RPUSH
        }

        /**
         * {@link DoFn} that queues writes on a Jedis pipeline inside a {@code MULTI}/{@code EXEC} transaction and
         * commits the transaction every {@code DEFAULT_BATCH_SIZE} elements and at the end of each bundle.
         */
        public static class WriteFn extends DoFn<KV<String, String>, Void> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final Write spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            /**
             * @param write the transform whose connection, method and expiry settings drive this function
             */
            public WriteFn(Write write) {
                this.spec = write;
            }

            /** Opens the Jedis connection shared by all bundles processed by this instance. */
            @DoFn.Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            /** Starts a fresh pipeline with an open transaction for the new bundle. */
            @DoFn.StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            /**
             * Queues the write for the current element and commits the transaction once the batch is full.
             *
             * @param processContext supplies the key/value pair to write
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, Void>.ProcessContext processContext) {
                writeRecord(processContext.element());
                this.batchCount++;
                if (this.batchCount >= DEFAULT_BATCH_SIZE) {
                    commitTransaction();
                    // Re-open the transaction: without this, later writes run outside MULTI and the EXEC in
                    // finishBundle() fails because no transaction is open.
                    this.pipeline.multi();
                    this.batchCount = 0;
                }
            }

            /** Sends EXEC for the open transaction and reads back all pending pipeline responses. */
            private void commitTransaction() {
                this.pipeline.exec();
                // exec() only queues the command on the pipeline; sync() flushes it and consumes the replies.
                this.pipeline.sync();
            }

            /** Dispatches the record to the command matching the configured method; unknown methods are ignored. */
            private void writeRecord(KV<String, String> record) {
                Method method = this.spec.method();
                Long expireTime = this.spec.expireTime();
                if (Method.APPEND == method) {
                    writeUsingAppendCommand(record, expireTime);
                } else if (Method.SET == method) {
                    writeUsingSetCommand(record, expireTime);
                } else if (Method.LPUSH == method || Method.RPUSH == method) {
                    writeUsingListCommand(record, method, expireTime);
                }
            }

            /** Queues an APPEND, followed by a PEXPIRE when an expiry is configured. */
            private void writeUsingAppendCommand(KV<String, String> record, Long expireTime) {
                String key = record.getKey();
                this.pipeline.append(key, record.getValue());
                setExpireTimeWhenRequired(key, expireTime);
            }

            /** Queues a PSETEX when an expiry is configured, otherwise a plain SET. */
            private void writeUsingSetCommand(KV<String, String> record, Long expireTime) {
                String key = record.getKey();
                String value = record.getValue();
                if (expireTime != null) {
                    this.pipeline.psetex(key, expireTime.longValue(), value);
                } else {
                    this.pipeline.set(key, value);
                }
            }

            /** Queues an LPUSH or RPUSH, followed by a PEXPIRE when an expiry is configured. */
            private void writeUsingListCommand(KV<String, String> record, Method method, Long expireTime) {
                String key = record.getKey();
                String value = record.getValue();
                if (Method.LPUSH == method) {
                    this.pipeline.lpush(key, new String[]{value});
                } else if (Method.RPUSH == method) {
                    this.pipeline.rpush(key, new String[]{value});
                }
                setExpireTimeWhenRequired(key, expireTime);
            }

            /** Queues a PEXPIRE for the key when an expiry is configured. */
            private void setExpireTimeWhenRequired(String key, Long expireTime) {
                if (expireTime != null) {
                    this.pipeline.pexpire(key, expireTime.longValue());
                }
            }

            /** Commits whatever is still queued in the open transaction at the end of the bundle. */
            @DoFn.FinishBundle
            public void finishBundle() {
                // A transaction is always open here: startBundle() opens one and processElement() re-opens it
                // after every batch commit.
                commitTransaction();
                this.batchCount = 0;
            }

            /** Closes the Jedis connection if {@link #setup()} managed to open one. */
            @DoFn.Teardown
            public void teardown() {
                if (this.jedis != null) {
                    this.jedis.close();
                }
            }
        }

        /** Returns the connection settings used by {@link WriteFn}; may be {@code null}. */
        @Nullable
        public abstract RedisConnectionConfiguration connectionConfiguration();

        /** Returns the Redis command used to store each element; may be {@code null}. */
        @Nullable
        public abstract Method method();

        /** Returns the key expiry in milliseconds, or {@code null} when written keys should not expire. */
        @Nullable
        public abstract Long expireTime();

        /** Returns a builder from which the {@code with*} methods derive their result. */
        abstract Builder builder();

        /**
         * Replaces host and port on the current connection configuration.
         *
         * @param str the Redis host; must not be {@code null}
         * @param i the Redis port; must be strictly positive
         * @return the resulting {@link Write}
         */
        public Write withEndpoint(String str, int i) {
            Preconditions.checkArgument(str != null, "host can not be null");
            Preconditions.checkArgument(i > 0, "port can not be negative or 0");
            return builder().setConnectionConfiguration(connectionConfiguration().withHost(str).withPort(i)).build();
        }

        /**
         * Replaces the auth value on the current connection configuration.
         *
         * @param str the auth value; must not be {@code null}
         * @return the resulting {@link Write}
         */
        public Write withAuth(String str) {
            Preconditions.checkArgument(str != null, "auth can not be null");
            return builder().setConnectionConfiguration(connectionConfiguration().withAuth(str)).build();
        }

        /**
         * Replaces the timeout on the current connection configuration.
         *
         * @param i the timeout; must not be negative
         * @return the resulting {@link Write}
         */
        public Write withTimeout(int i) {
            Preconditions.checkArgument(i >= 0, "timeout can not be negative");
            return builder().setConnectionConfiguration(connectionConfiguration().withTimeout(i)).build();
        }

        /**
         * Replaces the whole connection configuration.
         *
         * @param redisConnectionConfiguration the new configuration; must not be {@code null}
         * @return the resulting {@link Write}
         */
        public Write withConnectionConfiguration(RedisConnectionConfiguration redisConnectionConfiguration) {
            Preconditions.checkArgument(redisConnectionConfiguration != null, "connection can not be null");
            return builder().setConnectionConfiguration(redisConnectionConfiguration).build();
        }

        /**
         * Selects the Redis command used to store each element.
         *
         * @param method the write method; must not be {@code null}
         * @return the resulting {@link Write}
         */
        public Write withMethod(Method method) {
            Preconditions.checkArgument(method != null, "method can not be null");
            return builder().setMethod(method).build();
        }

        /**
         * Sets the expiry applied to written keys.
         *
         * @param l the expiry in milliseconds; must be non-null and strictly positive
         * @return the resulting {@link Write}
         */
        public Write withExpireTime(Long l) {
            Preconditions.checkArgument(l != null, "expireTimeMillis can not be null");
            Preconditions.checkArgument(l.longValue() > 0, "expireTimeMillis can not be negative or 0");
            return builder().setExpireTime(l).build();
        }

        /**
         * Applies {@link WriteFn} to the input and returns a {@link PDone} for the input's pipeline.
         *
         * @throws IllegalArgumentException if no connection configuration is set
         */
        public PDone expand(PCollection<KV<String, String>> pCollection) {
            Preconditions.checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
            pCollection.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Creates a {@link Read} configured with {@link RedisConnectionConfiguration#create()} and the key pattern
     * {@code "*"}.
     */
    public static Read read() {
        Read.Builder withConnection =
                new AutoValue_RedisIO_Read.Builder().setConnectionConfiguration(RedisConnectionConfiguration.create());
        Read.Builder withPattern = withConnection.setKeyPattern("*");
        return withPattern.build();
    }

    /** Creates a {@link ReadAll} configured with {@link RedisConnectionConfiguration#create()}. */
    public static ReadAll readAll() {
        ReadAll.Builder withConnection =
                new AutoValue_RedisIO_ReadAll.Builder().setConnectionConfiguration(RedisConnectionConfiguration.create());
        return withConnection.build();
    }

    /**
     * Creates a {@link Write} configured with {@link RedisConnectionConfiguration#create()} and the
     * {@link Write.Method#APPEND} method.
     */
    public static Write write() {
        Write.Builder withConnection =
                new AutoValue_RedisIO_Write.Builder().setConnectionConfiguration(RedisConnectionConfiguration.create());
        Write.Builder withMethod = withConnection.setMethod(Write.Method.APPEND);
        return withMethod.build();
    }

    /** Private constructor: the class is used through its static factory methods and nested types only. */
    private RedisIO() {
    }
}
