package org.apache.beam.io.requestresponse;

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.io.requestresponse.CallTest;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/io/requestresponse/CacheIT.class */
public class CacheIT {
    private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
    private static final Integer PORT = 6379;
    private final EchoITOptions options = (EchoITOptions) IOITHelper.readIOTestPipelineOptions(EchoITOptions.class);

    @Rule
    public TestPipeline writePipeline = TestPipeline.fromOptions(this.options);

    @Rule
    public TestPipeline readPipeline = TestPipeline.fromOptions(this.options);

    @Rule
    public GenericContainer<?> redis = new GenericContainer(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(new Integer[]{PORT});

    @Rule
    public RedisExternalResourcesRule externalClients = new RedisExternalResourcesRule(() -> {
        this.redis.start();
        return URI.create(String.format("redis://%s:%d", this.redis.getHost(), this.redis.getFirstMappedPort()));
    });

    @BeforeClass
    public static void removeIntegrationTestsProperty() {
        System.clearProperty("integrationTestPipelineOptions");
    }

    @Test
    public void givenRequestResponsesCached_writeThenReadYieldsMatches() throws Coder.NonDeterministicException {
        ImmutableList of = ImmutableList.of(KV.of(new CallTest.Request("a"), new CallTest.Response("a")), KV.of(new CallTest.Request("b"), new CallTest.Response("b")), KV.of(new CallTest.Request("c"), new CallTest.Response("c")));
        writeThenReadThenPAssert(of, ImmutableList.of(new CallTest.Request("a"), new CallTest.Request("b"), new CallTest.Request("c")), of);
    }

    @Test
    public void givenNoMatchingRequestResponsePairs_yieldsKVsWithNullValues() throws Coder.NonDeterministicException {
        ImmutableList of = ImmutableList.of(KV.of(new CallTest.Request("a"), new CallTest.Response("a")), KV.of(new CallTest.Request("b"), new CallTest.Response("b")), KV.of(new CallTest.Request("c"), new CallTest.Response("c")));
        ImmutableList of2 = ImmutableList.of(new CallTest.Request("d"), new CallTest.Request("e"), new CallTest.Request("f"));
        writeThenReadThenPAssert(of, of2, (List) of2.stream().map(request -> {
            return KV.of(request, (Object) null);
        }).collect(Collectors.toList()));
    }

    private void writeThenReadThenPAssert(List<KV<CallTest.Request, CallTest.Response>> list, List<CallTest.Request> list2, List<KV<CallTest.Request, CallTest.Response>> list3) throws Coder.NonDeterministicException {
        this.writePipeline.apply(Create.of(list)).apply(Cache.writeUsingRedis(Duration.standardHours(1L), this.externalClients.getActualClient(), CallTest.DETERMINISTIC_REQUEST_CODER, CallTest.DETERMINISTIC_RESPONSE_CODER));
        Result apply = this.readPipeline.apply(Create.of(list2)).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER).apply(Cache.readUsingRedis(this.externalClients.getActualClient(), CallTest.DETERMINISTIC_REQUEST_CODER, CallTest.DETERMINISTIC_RESPONSE_CODER));
        PAssert.that(apply.getFailures()).empty();
        PAssert.that(apply.getResponses()).containsInAnyOrder(list3);
        this.writePipeline.run().waitUntilFinish();
        this.readPipeline.run();
    }
}
