package org.apache.beam.sdk.io.redis;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import redis.embedded.RedisServer;

/* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIOTest.class */
public class RedisIOTest {

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private EmbeddedRedis embeddedRedis;

    /* loaded from: input_file:org/apache/beam/sdk/io/redis/RedisIOTest$EmbeddedRedis.class */
    private static class EmbeddedRedis implements AutoCloseable {
        private final int port;
        private final RedisServer redisServer;

        public EmbeddedRedis() throws IOException {
            ServerSocket serverSocket = new ServerSocket(0);
            Throwable th = null;
            try {
                this.port = serverSocket.getLocalPort();
                if (serverSocket != null) {
                    if (0 != 0) {
                        try {
                            serverSocket.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        serverSocket.close();
                    }
                }
                this.redisServer = new RedisServer(Integer.valueOf(this.port));
                this.redisServer.start();
            } catch (Throwable th3) {
                if (serverSocket != null) {
                    if (0 != 0) {
                        try {
                            serverSocket.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        serverSocket.close();
                    }
                }
                throw th3;
            }
        }

        public int getPort() {
            return this.port;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.redisServer.stop();
        }
    }

    @Before
    public void before() throws Exception {
        this.embeddedRedis = new EmbeddedRedis();
    }

    @After
    public void after() throws Exception {
        this.embeddedRedis.close();
    }

    @Test
    public void testWriteRead() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(KV.of("key " + i, "value " + i));
        }
        this.writePipeline.apply(Create.of(arrayList)).apply(RedisIO.write().withEndpoint("::1", this.embeddedRedis.getPort()));
        this.writePipeline.run();
        PAssert.that(this.readPipeline.apply("Read", RedisIO.read().withEndpoint("::1", this.embeddedRedis.getPort()).withKeyPattern("key*"))).containsInAnyOrder(arrayList);
        PAssert.thatSingleton(this.readPipeline.apply("ReadNotMatch", RedisIO.read().withEndpoint("::1", this.embeddedRedis.getPort()).withKeyPattern("foobar*")).apply(Count.globally())).isEqualTo(0L);
        this.readPipeline.run();
    }
}
