/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.io.redis.RedisCursor;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.resps.StreamEntry;
import redis.embedded.RedisServer;

@RunWith(value=JUnit4.class)
public class RedisIOTest {
    private static final String REDIS_HOST = "localhost";
    private static final Long NO_EXPIRATION = -1L;
    @Rule
    public TestPipeline p = TestPipeline.create();
    private static RedisServer server;
    private static int port;
    private static Jedis client;

    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        server = new RedisServer(port);
        server.start();
        client = RedisConnectionConfiguration.create((String)REDIS_HOST, (int)port).connect();
    }

    @AfterClass
    public static void afterClass() {
        client.close();
        server.stop();
    }

    @Test
    public void testRead() {
        List<KV<String, String>> data = this.buildIncrementalData("bulkread", 10);
        data.forEach(kv -> client.set((String)kv.getKey(), (String)kv.getValue()));
        PCollection read = (PCollection)this.p.apply("Read", (PTransform)RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("bulkread*").withBatchSize(10));
        PAssert.that((PCollection)read).containsInAnyOrder(data);
        this.p.run();
    }

    @Test
    public void testReadSplitBig() {
        List<KV<String, String>> data = this.buildIncrementalData("bigset", 1000);
        data.forEach(kv -> client.set((String)kv.getKey(), (String)kv.getValue()));
        PCollection read = (PCollection)this.p.apply("Read", (PTransform)RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("bigset*").withBatchSize(8));
        PAssert.that((PCollection)read).containsInAnyOrder(data);
        this.p.run();
    }

    @Test
    public void testReadSplitSmall() {
        List<KV<String, String>> data = this.buildIncrementalData("smallset", 5);
        data.forEach(kv -> client.set((String)kv.getKey(), (String)kv.getValue()));
        PCollection read = (PCollection)this.p.apply("Read", (PTransform)RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("smallset*").withBatchSize(20));
        PAssert.that((PCollection)read).containsInAnyOrder(data);
        this.p.run();
    }

    @Test
    public void testReadWithKeyPattern() {
        List<KV<String, String>> data = this.buildIncrementalData("pattern", 10);
        data.forEach(kv -> client.set((String)kv.getKey(), (String)kv.getValue()));
        PCollection read = (PCollection)this.p.apply("Read", (PTransform)RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("pattern*"));
        PAssert.that((PCollection)read).containsInAnyOrder(data);
        PCollection readNotMatch = (PCollection)this.p.apply("ReadNotMatch", (PTransform)RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("foobar*"));
        PAssert.thatSingleton((PCollection)((PCollection)readNotMatch.apply(Count.globally()))).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    public void testWriteWithMethodSet() {
        String key = "testWriteWithMethodSet";
        client.set(key, "value");
        String newValue = "newValue";
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of((Object)KV.of((Object)key, (Object)newValue), (Object[])new KV[0]));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SET));
        this.p.run();
        Assert.assertEquals((Object)newValue, (Object)client.get(key));
        Assert.assertEquals((Object)NO_EXPIRATION, (Object)client.ttl(key));
    }

    @Test
    public void testWriteWithMethodSetWithExpiration() {
        String key = "testWriteWithMethodSet";
        client.set(key, "value");
        String newValue = "newValue";
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of((Object)KV.of((Object)key, (Object)newValue), (Object[])new KV[0]));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SET).withExpireTime(Long.valueOf(10000L)));
        this.p.run();
        Assert.assertEquals((Object)newValue, (Object)client.get(key));
        Long expireTime = client.pttl(key);
        Assert.assertTrue((String)expireTime.toString(), (9000L <= expireTime && expireTime <= 100000L ? 1 : 0) != 0);
        client.del(key);
    }

    @Test
    public void testWriteWithMethodLPush() {
        String key = "testWriteWithMethodLPush";
        String value = "value";
        client.lpush(key, new String[]{value});
        String newValue = "newValue";
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of((Object)KV.of((Object)key, (Object)newValue), (Object[])new KV[0]));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.LPUSH));
        this.p.run();
        List values = client.lrange(key, 0L, -1L);
        Assert.assertEquals((Object)(newValue + value), (Object)String.join((CharSequence)"", values));
    }

    @Test
    public void testWriteWithMethodRPush() {
        String key = "testWriteWithMethodRPush";
        String value = "value";
        client.lpush(key, new String[]{value});
        String newValue = "newValue";
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of((Object)KV.of((Object)key, (Object)newValue), (Object[])new KV[0]));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.RPUSH));
        this.p.run();
        List values = client.lrange(key, 0L, -1L);
        Assert.assertEquals((Object)(value + newValue), (Object)String.join((CharSequence)"", values));
    }

    @Test
    public void testWriteWithMethodSAdd() {
        String key = "testWriteWithMethodSAdd";
        List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
        List<KV<String, String>> data = RedisIOTest.buildConstantKeyList(key, values);
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(data));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SADD));
        this.p.run();
        HashSet<String> expected = new HashSet<String>(values);
        Set members = client.smembers(key);
        Assert.assertEquals(expected, (Object)members);
    }

    @Test
    public void testWriteWithMethodPFAdd() {
        String key = "testWriteWithMethodPFAdd";
        List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
        List<KV<String, String>> data = RedisIOTest.buildConstantKeyList(key, values);
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(data));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.PFADD));
        this.p.run();
        long count = client.pfcount(key);
        Assert.assertEquals((long)6L, (long)count);
        Assert.assertEquals((Object)NO_EXPIRATION, (Object)client.ttl(key));
    }

    @Test
    public void testWriteWithMethodPFAddWithExpireTime() {
        String key = "testWriteWithMethodPFAdd";
        List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
        List<KV<String, String>> data = RedisIOTest.buildConstantKeyList(key, values);
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(data));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.PFADD).withExpireTime(Long.valueOf(10000L)));
        this.p.run();
        long count = client.pfcount(key);
        Assert.assertEquals((long)6L, (long)count);
        Long expireTime = client.pttl(key);
        Assert.assertTrue((String)expireTime.toString(), (9000L <= expireTime && expireTime <= 100000L ? 1 : 0) != 0);
        client.del(key);
    }

    @Test
    public void testWriteUsingINCRBY() {
        String key = "key_incr";
        List<String> values = Arrays.asList("0", "1", "2", "-3", "2", "4", "0", "5");
        List<KV<String, String>> data = RedisIOTest.buildConstantKeyList(key, values);
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(data));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.INCRBY));
        this.p.run();
        long count = Long.parseLong(client.get(key));
        Assert.assertEquals((long)11L, (long)count);
    }

    @Test
    public void testWriteUsingDECRBY() {
        String key = "key_decr";
        List<String> values = Arrays.asList("-10", "1", "2", "-3", "2", "4", "0", "5");
        List<KV<String, String>> data = RedisIOTest.buildConstantKeyList(key, values);
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(data));
        write.apply((PTransform)RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.DECRBY));
        this.p.run();
        long count = Long.parseLong(client.get(key));
        Assert.assertEquals((long)-1L, (long)count);
    }

    @Test
    public void testWriteStreams() {
        List redisKeys = IntStream.range(0, 10).boxed().map(idx -> UUID.randomUUID().toString()).collect(Collectors.toList());
        ImmutableMap fooValues = ImmutableMap.of((Object)"sensor-id", (Object)"1234", (Object)"temperature", (Object)"19.8");
        ImmutableMap barValues = ImmutableMap.of((Object)"sensor-id", (Object)"9999", (Object)"temperature", (Object)"18.2");
        List allData = redisKeys.stream().flatMap(arg_0 -> RedisIOTest.lambda$testWriteStreams$5((Map)fooValues, (Map)barValues, arg_0)).collect(Collectors.toList());
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(allData).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)MapCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of()))));
        write.apply((PTransform)RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
        this.p.run();
        for (String key : redisKeys) {
            List streamEntries = client.xrange(key, (StreamEntryID)null, (StreamEntryID)null, Integer.MAX_VALUE);
            Assert.assertEquals((long)2L, (long)streamEntries.size());
            MatcherAssert.assertThat((Object)Lists.transform((List)streamEntries, StreamEntry::getFields), (Matcher)CoreMatchers.hasItems((Object[])new Map[]{fooValues, barValues}));
        }
    }

    @Test
    public void testWriteStreamsWithTruncation() {
        List redisKeys = IntStream.range(0, 10).boxed().map(idx -> UUID.randomUUID().toString()).collect(Collectors.toList());
        ImmutableMap fooValues = ImmutableMap.of((Object)"sensor-id", (Object)"1234", (Object)"temperature", (Object)"19.8");
        ImmutableMap barValues = ImmutableMap.of((Object)"sensor-id", (Object)"9999", (Object)"temperature", (Object)"18.2");
        List allData = redisKeys.stream().flatMap(arg_0 -> RedisIOTest.lambda$testWriteStreamsWithTruncation$7((Map)fooValues, (Map)barValues, arg_0)).collect(Collectors.toList());
        PCollection write = (PCollection)this.p.apply((PTransform)Create.of(allData).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)MapCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of()))));
        write.apply((PTransform)RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L).withApproximateTrim(false));
        this.p.run();
        for (String stream : redisKeys) {
            long count = client.xlen(stream);
            Assert.assertEquals((long)1L, (long)count);
        }
    }

    @Test
    public void redisCursorToByteKey() {
        RedisCursor redisCursor = RedisCursor.of((String)"80", (long)200L, (boolean)true);
        ByteKey byteKey = RedisCursor.redisCursorToByteKey((RedisCursor)redisCursor);
        Assert.assertEquals((Object)ByteKey.of((int[])new int[]{0, 0, 0, 0, 0, 0, 0, 10}), (Object)byteKey);
    }

    @Test
    public void redisCursorToByteKeyZeroStart() {
        RedisCursor redisCursor = RedisCursor.of((String)"0", (long)200L, (boolean)true);
        ByteKey byteKey = RedisCursor.redisCursorToByteKey((RedisCursor)redisCursor);
        Assert.assertEquals((Object)RedisCursor.ZERO_KEY, (Object)byteKey);
    }

    @Test
    public void redisCursorToByteKeyZeroEnd() {
        RedisCursor redisCursor = RedisCursor.of((String)"0", (long)200L, (boolean)false);
        ByteKey byteKey = RedisCursor.redisCursorToByteKey((RedisCursor)redisCursor);
        Assert.assertEquals((Object)ByteKey.EMPTY, (Object)byteKey);
    }

    @Test
    public void redisCursorToByteKeyAndBack() {
        RedisCursor redisCursor = RedisCursor.of((String)"80", (long)200L, (boolean)true);
        ByteKey byteKey = RedisCursor.redisCursorToByteKey((RedisCursor)redisCursor);
        RedisCursor result = RedisCursor.byteKeyToRedisCursor((ByteKey)byteKey, (long)200L, (boolean)true);
        Assert.assertEquals((Object)redisCursor.getCursor(), (Object)result.getCursor());
    }

    @Test
    public void redisByteKeyToRedisCursor() {
        ByteKey bytes = ByteKey.of((int[])new int[]{0, 0, 0, 0, 0, 25, 68, 103});
        RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor((ByteKey)bytes, (long)0x10000AL, (boolean)true);
        Assert.assertEquals((Object)"1885267", (Object)redisCursor.getCursor());
    }

    private static List<KV<String, String>> buildConstantKeyList(String key, List<String> values) {
        ArrayList<KV<String, String>> data = new ArrayList<KV<String, String>>();
        for (String value : values) {
            data.add((KV<String, String>)KV.of((Object)key, (Object)value));
        }
        return data;
    }

    private List<KV<String, String>> buildIncrementalData(String keyPrefix, int size) {
        ArrayList<KV<String, String>> data = new ArrayList<KV<String, String>>();
        for (int i = 0; i < size; ++i) {
            data.add((KV<String, String>)KV.of((Object)(keyPrefix + i), (Object)String.valueOf(i)));
        }
        return data;
    }

    private static /* synthetic */ Stream lambda$testWriteStreamsWithTruncation$7(Map fooValues, Map barValues, String id) {
        return Stream.of(KV.of((Object)id, (Object)fooValues), KV.of((Object)id, (Object)barValues));
    }

    private static /* synthetic */ Stream lambda$testWriteStreams$5(Map fooValues, Map barValues, String id) {
        return Stream.of(KV.of((Object)id, (Object)fooValues), KV.of((Object)id, (Object)barValues));
    }
}

