package org.apache.beam.sdk.io.redis;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;

/* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIOTest.class */
public class RedisIOTest {
    private static final String REDIS_HOST = "::1";

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private EmbeddedRedis embeddedRedis;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIOTest$EmbeddedRedis.class */
    public static class EmbeddedRedis implements AutoCloseable {
        private final int port;
        private final RedisServer redisServer;

        public EmbeddedRedis() throws IOException {
            ServerSocket serverSocket = new ServerSocket(0);
            Throwable th = null;
            try {
                this.port = serverSocket.getLocalPort();
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
                this.redisServer = new RedisServer(Integer.valueOf(this.port));
                this.redisServer.start();
            } catch (Throwable th3) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
                throw th3;
            }
        }

        public int getPort() {
            return this.port;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.redisServer.stop();
        }
    }

    @Before
    public void before() throws Exception {
        this.embeddedRedis = new EmbeddedRedis();
    }

    @After
    public void after() throws Exception {
        this.embeddedRedis.close();
    }

    private ArrayList<KV<String, String>> ingestData(String str, int i) {
        ArrayList<KV<String, String>> arrayList = new ArrayList<>();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(KV.of(str + "-key " + i2, "value " + i2));
        }
        this.writePipeline.apply(Create.of(arrayList)).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()));
        this.writePipeline.run();
        return arrayList;
    }

    @Test
    public void testBulkRead() throws Exception {
        PAssert.that(this.readPipeline.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withKeyPattern("bulkread*").withBatchSize(10))).containsInAnyOrder(ingestData("bulkread", 100));
        this.readPipeline.run();
    }

    @Test
    public void testWriteReadUsingDefaultAppendMethod() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 8000; i++) {
            arrayList.add(KV.of("key " + i, "value " + i));
        }
        this.writePipeline.apply(Create.of(arrayList)).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()));
        this.writePipeline.run();
        PAssert.that(this.readPipeline.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withKeyPattern("key*"))).containsInAnyOrder(arrayList);
        PAssert.thatSingleton(this.readPipeline.apply("ReadNotMatch", RedisIO.read().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withKeyPattern("foobar*")).apply(Count.globally())).isEqualTo(0L);
        this.readPipeline.run();
    }

    @Test
    public void testConfiguration() {
        RedisIO.Write withEndpoint = RedisIO.write().withEndpoint("test", 111);
        Assert.assertEquals(111L, withEndpoint.connectionConfiguration().port());
        Assert.assertEquals("test", withEndpoint.connectionConfiguration().host());
    }

    @Test
    public void testWriteReadUsingSetMethod() throws Exception {
        RedisConnectionConfiguration.create(REDIS_HOST, this.embeddedRedis.getPort()).connect().set("key", "value");
        this.writePipeline.apply(Create.of(KV.of("key", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withMethod(RedisIO.Write.Method.SET));
        this.writePipeline.run();
        PAssert.that(this.readPipeline.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withKeyPattern("key"))).containsInAnyOrder(Collections.singletonList(KV.of("key", "newValue")));
        this.readPipeline.run();
    }

    @Test
    public void testWriteReadUsingLpushMethod() throws Exception {
        Jedis connect = RedisConnectionConfiguration.create(REDIS_HOST, this.embeddedRedis.getPort()).connect();
        connect.lpush("key", new String[]{"value"});
        this.writePipeline.apply(Create.of(KV.of("key", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withMethod(RedisIO.Write.Method.LPUSH));
        this.writePipeline.run();
        Assert.assertEquals("newValuevalue", String.join("", connect.lrange("key", 0L, -1L)));
    }

    @Test
    public void testWriteReadUsingRpushMethod() throws Exception {
        Jedis connect = RedisConnectionConfiguration.create(REDIS_HOST, this.embeddedRedis.getPort()).connect();
        connect.lpush("key", new String[]{"value"});
        this.writePipeline.apply(Create.of(KV.of("key", "newValue"), new KV[0])).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withMethod(RedisIO.Write.Method.RPUSH));
        this.writePipeline.run();
        Assert.assertEquals("valuenewValue", String.join("", connect.lrange("key", 0L, -1L)));
    }

    @Test
    public void testWriteUsingHLLMethod() throws Exception {
        Jedis connect = RedisConnectionConfiguration.create(REDIS_HOST, this.embeddedRedis.getPort()).connect();
        this.writePipeline.apply(Create.of(KV.of("key", "0"), new KV[]{KV.of("key", "1"), KV.of("key", "2"), KV.of("key", "3"), KV.of("key", "2"), KV.of("key", "4"), KV.of("key", "0"), KV.of("key", "5")})).apply(RedisIO.write().withEndpoint(REDIS_HOST, this.embeddedRedis.getPort()).withMethod(RedisIO.Write.Method.PFADD));
        this.writePipeline.run();
        Assert.assertEquals(6L, connect.pfcount("key"));
    }

    @Test
    public void testReadBuildsCorrectly() {
        RedisIO.Read withTimeout = RedisIO.read().withEndpoint("test", 111).withAuth("pass").withTimeout(5);
        Assert.assertEquals("test", withTimeout.connectionConfiguration().host());
        Assert.assertEquals(111L, withTimeout.connectionConfiguration().port());
        Assert.assertEquals("pass", withTimeout.connectionConfiguration().auth());
        Assert.assertEquals(5L, withTimeout.connectionConfiguration().timeout());
    }

    @Test
    public void testWriteBuildsCorrectly() {
        RedisIO.Write withTimeout = RedisIO.write().withEndpoint("test", 111).withAuth("pass").withTimeout(5);
        Assert.assertEquals("test", withTimeout.connectionConfiguration().host());
        Assert.assertEquals(111L, withTimeout.connectionConfiguration().port());
        Assert.assertEquals("pass", withTimeout.connectionConfiguration().auth());
        Assert.assertEquals(5L, withTimeout.connectionConfiguration().timeout());
        Assert.assertEquals(RedisIO.Write.Method.APPEND, withTimeout.method());
    }
}
