package org.apache.beam.io.requestresponse;

import com.google.protobuf.ByteString;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.UUID;
import org.apache.beam.io.requestresponse.Call;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.class */
public class ThrottleWithExternalResourceIT {
    private static final String QUOTA_ID = "echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota";
    private static EchoITOptions options;
    private static EchoGRPCCallerWithSetupTeardown client;
    private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
    private static final Quota QUOTA = new Quota(1, Duration.standardSeconds(1));
    private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");
    private static final Integer PORT = 6379;
    private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder();
    private static final Coder<Echo.EchoResponse> RESPONSE_CODER = SerializableCoder.of(TypeDescriptor.of(Echo.EchoResponse.class));

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public GenericContainer<?> redis = new GenericContainer(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(new Integer[]{PORT});

    @BeforeClass
    public static void setUp() throws UserCodeExecutionException {
        options = (EchoITOptions) IOITHelper.readIOTestPipelineOptions(EchoITOptions.class);
        if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
            throw new RuntimeException("--grpcEndpointAddress is missing. See " + EchoITOptions.class + "for details.");
        }
        client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
        ((EchoGRPCCallerWithSetupTeardown) Preconditions.checkStateNotNull(client)).setup();
        try {
            client.call(createRequest());
        } catch (UserCodeExecutionException e) {
            if (!(e instanceof UserCodeQuotaException)) {
                throw e;
            }
            throw new RuntimeException(String.format("The quota: %s is set to refresh on an interval. Unless there are failures in this test, wait for a few seconds before running the test again.", QUOTA_ID), e);
        }
    }

    @AfterClass
    public static void tearDown() throws UserCodeExecutionException {
        ((EchoGRPCCallerWithSetupTeardown) Preconditions.checkStateNotNull(client)).teardown();
    }

    @Test
    public void givenThrottleUsingRedis_preventsQuotaErrors() throws Coder.NonDeterministicException {
        URI create = URI.create(String.format("redis://%s:%d", this.redis.getHost(), this.redis.getFirstMappedPort()));
        this.pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
        Call.Result apply = createRequestStream().apply("throttle", ThrottleWithExternalResource.usingRedis(create, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, REQUEST_CODER));
        PAssert.that(apply.getFailures()).empty();
        PAssert.thatSingleton(apply.getResponses().apply("window throttled", Window.into(FixedWindows.of(Duration.standardSeconds(1L)))).apply("count throttled", Combine.globally(Count.combineFn()).withoutDefaults())).notEqualTo(0L);
        PAssert.that(apply.getResponses().apply("window throttled before extraction", Window.into(FixedWindows.of(Duration.standardSeconds(1L)))).apply("extract request id", MapElements.into(TypeDescriptors.strings()).via(echoRequest -> {
            return ((Echo.EchoRequest) Preconditions.checkStateNotNull(echoRequest)).getId();
        })).apply("distinct", Distinct.create())).containsInAnyOrder(new String[]{QUOTA_ID});
        Call.Result apply2 = apply.getResponses().apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER));
        PAssert.that(apply2.getFailures()).empty();
        PAssert.that(apply2.getResponses().apply("window responses before extraction", Window.into(FixedWindows.of(Duration.standardSeconds(1L)))).apply("extract response id", MapElements.into(TypeDescriptors.strings()).via(echoResponse -> {
            return ((Echo.EchoResponse) Preconditions.checkStateNotNull(echoResponse)).getId();
        }))).containsInAnyOrder(new String[]{QUOTA_ID});
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds(3L));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Echo.EchoRequest createRequest() {
        return Echo.EchoRequest.newBuilder().setId(QUOTA_ID).setPayload(PAYLOAD).build();
    }

    private PCollection<Echo.EchoRequest> createRequestStream() {
        return this.pipeline.apply("impulse", PeriodicImpulse.create().withInterval(Duration.millis(10L))).apply("requests", MapElements.into(TypeDescriptor.of(Echo.EchoRequest.class)).via(instant -> {
            return createRequest();
        }));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1411693370:
                if (implMethodName.equals("lambda$givenThrottleUsingRedis_preventsQuotaErrors$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1411693369:
                if (implMethodName.equals("lambda$givenThrottleUsingRedis_preventsQuotaErrors$43268ee4$2")) {
                    z = true;
                    break;
                }
                break;
            case 295445500:
                if (implMethodName.equals("lambda$createRequestStream$700b9b6c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/testinfra/mockapis/echo/v1/Echo$EchoRequest;")) {
                    return instant -> {
                        return createRequest();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/testinfra/mockapis/echo/v1/Echo$EchoResponse;)Ljava/lang/String;")) {
                    return echoResponse -> {
                        return ((Echo.EchoResponse) Preconditions.checkStateNotNull(echoResponse)).getId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/testinfra/mockapis/echo/v1/Echo$EchoRequest;)Ljava/lang/String;")) {
                    return echoRequest -> {
                        return ((Echo.EchoRequest) Preconditions.checkStateNotNull(echoRequest)).getId();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
