package org.apache.beam.sdk.io.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.embedded.RedisServer;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIOTest.class */
public class RedisIOTest {
    private static final String REDIS_HOST = "localhost";
    private static final Long NO_EXPIRATION = -1L;

    @Rule
    public TestPipeline p = TestPipeline.create();
    private static RedisServer server;
    private static int port;
    private static Jedis client;

    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        server = new RedisServer(port);
        server.start();
        client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
    }

    @AfterClass
    public static void afterClass() {
        client.close();
        server.stop();
    }

    @Test
    public void testRead() {
        List<KV<String, String>> buildIncrementalData = buildIncrementalData("bulkread", 10);
        buildIncrementalData.forEach(kv -> {
            client.set((String) kv.getKey(), (String) kv.getValue());
        });
        PAssert.that(this.p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("bulkread*").withBatchSize(10))).containsInAnyOrder(buildIncrementalData);
        this.p.run();
    }

    @Test
    public void testReadSplitBig() {
        List<KV<String, String>> buildIncrementalData = buildIncrementalData("bigset", 1000);
        buildIncrementalData.forEach(kv -> {
            client.set((String) kv.getKey(), (String) kv.getValue());
        });
        PAssert.that(this.p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("bigset*").withBatchSize(8))).containsInAnyOrder(buildIncrementalData);
        this.p.run();
    }

    @Test
    public void testReadSplitSmall() {
        List<KV<String, String>> buildIncrementalData = buildIncrementalData("smallset", 5);
        buildIncrementalData.forEach(kv -> {
            client.set((String) kv.getKey(), (String) kv.getValue());
        });
        PAssert.that(this.p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("smallset*").withBatchSize(20))).containsInAnyOrder(buildIncrementalData);
        this.p.run();
    }

    @Test
    public void testReadWithKeyPattern() {
        List<KV<String, String>> buildIncrementalData = buildIncrementalData("pattern", 10);
        buildIncrementalData.forEach(kv -> {
            client.set((String) kv.getKey(), (String) kv.getValue());
        });
        PAssert.that(this.p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("pattern*"))).containsInAnyOrder(buildIncrementalData);
        PAssert.thatSingleton(this.p.apply("ReadNotMatch", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("foobar*")).apply(Count.globally())).isEqualTo(0L);
        this.p.run();
    }

    @Test
    public void testWriteWithMethodSet() {
        client.set("testWriteWithMethodSet", "value");
        this.p.apply(Create.of(KV.of("testWriteWithMethodSet", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SET));
        this.p.run();
        Assert.assertEquals("newValue", client.get("testWriteWithMethodSet"));
        Assert.assertEquals(NO_EXPIRATION, Long.valueOf(client.ttl("testWriteWithMethodSet")));
    }

    @Test
    public void testWriteWithMethodSetWithExpiration() {
        client.set("testWriteWithMethodSet", "value");
        this.p.apply(Create.of(KV.of("testWriteWithMethodSet", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SET).withExpireTime(10000L));
        this.p.run();
        Assert.assertEquals("newValue", client.get("testWriteWithMethodSet"));
        Long valueOf = Long.valueOf(client.pttl("testWriteWithMethodSet"));
        Assert.assertTrue(valueOf.toString(), 9000 <= valueOf.longValue() && valueOf.longValue() <= 100000);
        client.del("testWriteWithMethodSet");
    }

    @Test
    public void testWriteWithMethodLPush() {
        client.lpush("testWriteWithMethodLPush", new String[]{"value"});
        this.p.apply(Create.of(KV.of("testWriteWithMethodLPush", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.LPUSH));
        this.p.run();
        Assert.assertEquals("newValuevalue", String.join("", client.lrange("testWriteWithMethodLPush", 0L, -1L)));
    }

    @Test
    public void testWriteWithMethodRPush() {
        client.lpush("testWriteWithMethodRPush", new String[]{"value"});
        this.p.apply(Create.of(KV.of("testWriteWithMethodRPush", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.RPUSH));
        this.p.run();
        Assert.assertEquals("valuenewValue", String.join("", client.lrange("testWriteWithMethodRPush", 0L, -1L)));
    }

    @Test
    public void testWriteWithMethodSAdd() {
        List asList = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
        this.p.apply(Create.of(buildConstantKeyList("testWriteWithMethodSAdd", asList))).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SADD));
        this.p.run();
        Assert.assertEquals(new HashSet(asList), client.smembers("testWriteWithMethodSAdd"));
    }

    @Test
    public void testWriteWithMethodPFAdd() {
        this.p.apply(Create.of(buildConstantKeyList("testWriteWithMethodPFAdd", Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5")))).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.PFADD));
        this.p.run();
        Assert.assertEquals(6L, client.pfcount("testWriteWithMethodPFAdd"));
        Assert.assertEquals(NO_EXPIRATION, Long.valueOf(client.ttl("testWriteWithMethodPFAdd")));
    }

    @Test
    public void testWriteWithMethodPFAddWithExpireTime() {
        this.p.apply(Create.of(buildConstantKeyList("testWriteWithMethodPFAdd", Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5")))).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.PFADD).withExpireTime(10000L));
        this.p.run();
        Assert.assertEquals(6L, client.pfcount("testWriteWithMethodPFAdd"));
        Long valueOf = Long.valueOf(client.pttl("testWriteWithMethodPFAdd"));
        Assert.assertTrue(valueOf.toString(), 9000 <= valueOf.longValue() && valueOf.longValue() <= 100000);
        client.del("testWriteWithMethodPFAdd");
    }

    @Test
    public void testWriteUsingINCRBY() {
        this.p.apply(Create.of(buildConstantKeyList("key_incr", Arrays.asList("0", "1", "2", "-3", "2", "4", "0", "5")))).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.INCRBY));
        this.p.run();
        Assert.assertEquals(11L, Long.parseLong(client.get("key_incr")));
    }

    @Test
    public void testWriteUsingDECRBY() {
        this.p.apply(Create.of(buildConstantKeyList("key_decr", Arrays.asList("-10", "1", "2", "-3", "2", "4", "0", "5")))).apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.DECRBY));
        this.p.run();
        Assert.assertEquals(-1L, Long.parseLong(client.get("key_decr")));
    }

    @Test
    public void testWriteStreams() {
        List list = (List) IntStream.range(0, 10).boxed().map(num -> {
            return UUID.randomUUID().toString();
        }).collect(Collectors.toList());
        Map of = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
        Map of2 = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
        this.p.apply(Create.of((List) list.stream().flatMap(str -> {
            return Stream.of((Object[]) new KV[]{KV.of(str, of), KV.of(str, of2)});
        }).collect(Collectors.toList())).withCoder(KvCoder.of(StringUtf8Coder.of(), MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))).apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
        this.p.run();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            List xrange = client.xrange((String) it.next(), (StreamEntryID) null, (StreamEntryID) null, Integer.MAX_VALUE);
            Assert.assertEquals(2L, xrange.size());
            MatcherAssert.assertThat(Lists.transform(xrange, (v0) -> {
                return v0.getFields();
            }), CoreMatchers.hasItems(new Map[]{of, of2}));
        }
    }

    @Test
    public void testWriteStreamsWithTruncation() {
        List list = (List) IntStream.range(0, 10).boxed().map(num -> {
            return UUID.randomUUID().toString();
        }).collect(Collectors.toList());
        ImmutableMap of = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
        ImmutableMap of2 = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
        this.p.apply(Create.of((List) list.stream().flatMap(str -> {
            return Stream.of((Object[]) new KV[]{KV.of(str, of), KV.of(str, of2)});
        }).collect(Collectors.toList())).withCoder(KvCoder.of(StringUtf8Coder.of(), MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))).apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L).withApproximateTrim(false));
        this.p.run();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(1L, client.xlen((String) it.next()));
        }
    }

    @Test
    public void redisCursorToByteKey() {
        Assert.assertEquals(ByteKey.of(new int[]{0, 0, 0, 0, 0, 0, 0, 10}), RedisCursor.redisCursorToByteKey(RedisCursor.of("80", 200L, true)));
    }

    @Test
    public void redisCursorToByteKeyZeroStart() {
        Assert.assertEquals(RedisCursor.ZERO_KEY, RedisCursor.redisCursorToByteKey(RedisCursor.of("0", 200L, true)));
    }

    @Test
    public void redisCursorToByteKeyZeroEnd() {
        Assert.assertEquals(ByteKey.EMPTY, RedisCursor.redisCursorToByteKey(RedisCursor.of("0", 200L, false)));
    }

    @Test
    public void redisCursorToByteKeyAndBack() {
        RedisCursor of = RedisCursor.of("80", 200L, true);
        Assert.assertEquals(of.getCursor(), RedisCursor.byteKeyToRedisCursor(RedisCursor.redisCursorToByteKey(of), 200L, true).getCursor());
    }

    @Test
    public void redisByteKeyToRedisCursor() {
        Assert.assertEquals("1885267", RedisCursor.byteKeyToRedisCursor(ByteKey.of(new int[]{0, 0, 0, 0, 0, 25, 68, 103}), 1048586L, true).getCursor());
    }

    private static List<KV<String, String>> buildConstantKeyList(String str, List<String> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(KV.of(str, it.next()));
        }
        return arrayList;
    }

    private List<KV<String, String>> buildIncrementalData(String str, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(KV.of(str + i2, String.valueOf(i2)));
        }
        return arrayList;
    }
}
