package org.apache.beam.sdk.io.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;

/**
 * Tests for {@link RedisIO} read and write transforms.
 *
 * <p>An embedded Redis server is started once for the whole class on a free local port, and a
 * single shared Jedis client is used to seed data before a pipeline runs and to inspect the
 * server's state afterwards.
 */
@RunWith(JUnit4.class)
public class RedisIOTest {
    private static final String REDIS_HOST = "localhost";

    @Rule
    public TestPipeline p = TestPipeline.create();
    private static RedisServer server;
    private static int port;
    private static Jedis client;

    /**
     * Starts the embedded Redis server on an available local port and opens the shared client.
     *
     * @throws Exception if no port can be obtained or the server cannot be created or started
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        server = new RedisServer(port);
        server.start();
        client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
    }

    /**
     * Closes the shared client and stops the embedded server.
     *
     * <p>Both fields are null-checked because JUnit still runs this method when
     * {@link #beforeClass()} fails part-way, and the server is stopped in a {@code finally} block
     * so that a failure while closing the client cannot leave the server running.
     */
    @AfterClass
    public static void afterClass() {
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (server != null) {
                server.stop();
            }
        }
    }

    /** Reads ten seeded {@code bulkread*} keys with a batch size of 10 and expects all of them. */
    @Test
    public void testRead() {
        List<KV<String, String>> data = buildIncrementalData("bulkread", 10);
        data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));

        PAssert.that(
                this.p.apply(
                    "Read",
                    RedisIO.read()
                        .withEndpoint(REDIS_HOST, port)
                        .withKeyPattern("bulkread*")
                        .withBatchSize(10)))
            .containsInAnyOrder(data);
        this.p.run();
    }

    /**
     * Reads with a matching key pattern and expects every seeded entry, and reads with a pattern
     * that matches nothing and expects a global count of zero.
     */
    @Test
    public void testReadWithKeyPattern() {
        List<KV<String, String>> data = buildIncrementalData("pattern", 10);
        data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));

        PAssert.that(
                this.p.apply(
                    "Read",
                    RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("pattern*")))
            .containsInAnyOrder(data);
        PAssert.thatSingleton(
                this.p
                    .apply(
                        "ReadNotMatch",
                        RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("foobar*"))
                    .apply(Count.globally()))
            .isEqualTo(0L);
        this.p.run();
    }

    /** Writes with the SET method over an existing key and expects the value to be replaced. */
    @Test
    public void testWriteWithMethodSet() {
        String key = "testWriteWithMethodSet";
        client.set(key, "value");

        this.p
            .apply(Create.of(KV.of(key, "newValue")))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SET));
        this.p.run();

        Assert.assertEquals("newValue", client.get(key));
    }

    /**
     * Writes with the LPUSH method to a list that already holds one element and expects the new
     * element to come before the existing one.
     */
    @Test
    public void testWriteWithMethodLPush() {
        String key = "testWriteWithMethodLPush";
        client.lpush(key, "value");

        this.p
            .apply(Create.of(KV.of(key, "newValue")))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.LPUSH));
        this.p.run();

        Assert.assertEquals("newValuevalue", String.join("", client.lrange(key, 0L, -1L)));
    }

    /**
     * Writes with the RPUSH method to a list that already holds one element and expects the new
     * element to come after the existing one.
     */
    @Test
    public void testWriteWithMethodRPush() {
        String key = "testWriteWithMethodRPush";
        client.lpush(key, "value");

        this.p
            .apply(Create.of(KV.of(key, "newValue")))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.RPUSH));
        this.p.run();

        Assert.assertEquals("valuenewValue", String.join("", client.lrange(key, 0L, -1L)));
    }

    /**
     * Writes values containing duplicates with the SADD method and expects the stored set to equal
     * the distinct input values.
     */
    @Test
    public void testWriteWithMethodSAdd() {
        String key = "testWriteWithMethodSAdd";
        List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");

        this.p
            .apply(Create.of(buildConstantKeyList(key, values)))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.SADD));
        this.p.run();

        Assert.assertEquals(new HashSet<>(values), client.smembers(key));
    }

    /**
     * Writes eight values, six of them distinct, with the PFADD method and expects a count of 6.
     */
    @Test
    public void testWriteWithMethodPFAdd() {
        String key = "testWriteWithMethodPFAdd";
        List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");

        this.p
            .apply(Create.of(buildConstantKeyList(key, values)))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.PFADD));
        this.p.run();

        Assert.assertEquals(6L, client.pfcount(key));
    }

    /**
     * Writes a series of increments with the INCRBY method and expects the stored number to be
     * their sum (0 + 1 + 2 - 3 + 2 + 4 + 0 + 5 = 11).
     */
    @Test
    public void testWriteUsingINCRBY() throws Exception {
        String key = "key_incr";
        List<String> values = Arrays.asList("0", "1", "2", "-3", "2", "4", "0", "5");

        this.p
            .apply(Create.of(buildConstantKeyList(key, values)))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.INCRBY));
        this.p.run();

        Assert.assertEquals(11L, Long.parseLong(client.get(key)));
    }

    /**
     * Writes a series of decrements with the DECRBY method and expects the stored number to be the
     * negated sum of the inputs (the inputs sum to 1, so the expected value is -1).
     */
    @Test
    public void testWriteUsingDECRBY() throws Exception {
        String key = "key_decr";
        List<String> values = Arrays.asList("-10", "1", "2", "-3", "2", "4", "0", "5");

        this.p
            .apply(Create.of(buildConstantKeyList(key, values)))
            .apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(RedisIO.Write.Method.DECRBY));
        this.p.run();

        Assert.assertEquals(-1L, Long.parseLong(client.get(key)));
    }

    /**
     * Pairs a single key with each of the given values.
     *
     * @param key the key shared by every returned entry
     * @param values the values to pair with {@code key}, in order
     * @return a new list holding one {@code KV} per value, in the same order as {@code values}
     */
    private static List<KV<String, String>> buildConstantKeyList(String key, List<String> values) {
        List<KV<String, String>> entries = new ArrayList<>(values.size());
        for (String value : values) {
            entries.add(KV.of(key, value));
        }
        return entries;
    }

    /**
     * Builds {@code count} entries whose key is {@code prefix} followed by the index and whose
     * value is the index as a string, for indices {@code 0} to {@code count - 1}.
     *
     * @param prefix the prefix of every generated key
     * @param count the number of entries to generate; a non-positive count yields an empty list
     * @return a new list of the generated entries in ascending index order
     */
    private static List<KV<String, String>> buildIncrementalData(String prefix, int count) {
        List<KV<String, String>> entries = new ArrayList<>();
        for (int index = 0; index < count; index++) {
            entries.add(KV.of(prefix + index, String.valueOf(index)));
        }
        return entries;
    }
}
