package org.apache.beam.io.requestresponse;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.List;
import org.apache.beam.io.requestresponse.AutoValue_RequestResponseIOTest_Request;
import org.apache.beam.io.requestresponse.AutoValue_RequestResponseIOTest_Response;
import org.apache.beam.io.requestresponse.Call;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* Tests for RequestResponseIO. Decompiled from org/apache/beam/io/requestresponse/RequestResponseIOTest.class. */
public class RequestResponseIOTest {

    /** Pipeline that every test in this class applies its transforms to and runs. */
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    /** Type descriptor of {@link Response}, used for the schema lookups below. */
    private static final TypeDescriptor<Response> RESPONSE_TYPE = TypeDescriptor.of(Response.class);

    /** Provider queried for the schema and the row conversion functions of {@link Response}. */
    private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema();

    /**
     * Coder for {@link Response}, assembled from the schema, to-row function and from-row function that
     * {@link #SCHEMA_PROVIDER} reports for {@link #RESPONSE_TYPE}. Each lookup is passed through
     * {@code checkStateNotNull} before use.
     *
     * <p>No raw-type casts are applied to the lookups, so the compiler checks the element type of the
     * conversion functions against {@link Response} instead of falling back to an unchecked conversion.
     */
    private static final Coder<Response> RESPONSE_CODER =
            SchemaCoder.of(
                    Preconditions.checkStateNotNull(SCHEMA_PROVIDER.schemaFor(RESPONSE_TYPE)),
                    RESPONSE_TYPE,
                    Preconditions.checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(RESPONSE_TYPE)),
                    Preconditions.checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(RESPONSE_TYPE)));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/RequestResponseIOTest$CallerImpl.class */
    public static class CallerImpl implements Caller<Request, Response> {
        private int numErrors;

        private CallerImpl() {
            this.numErrors = 0;
        }

        private CallerImpl(int i) {
            this.numErrors = 0;
            this.numErrors = i;
        }

        public Response call(Request request) throws UserCodeExecutionException {
            if (this.numErrors <= 0) {
                return Response.builder().setAString(request.getAString()).setALong(request.getALong()).build();
            }
            this.numErrors--;
            throw new UserCodeQuotaException("");
        }
    }

    /**
     * Test implementation of both {@link Caller} and {@link SetupTeardown}: calls are forwarded to a
     * never-failing {@link CallerImpl}, while setup and teardown do nothing.
     */
    private static class CallerSetupTeardownImpl implements Caller<Request, Response>, SetupTeardown {
        /** Delegate that produces the responses. */
        private final CallerImpl caller = new CallerImpl();

        private CallerSetupTeardownImpl() {
        }

        /**
         * Forwards the request to the wrapped {@link CallerImpl}.
         *
         * @param request the request to answer
         * @return the delegate's response
         * @throws UserCodeExecutionException if the delegate throws
         */
        public Response call(Request request) throws UserCodeExecutionException {
            Response response = caller.call(request);
            return response;
        }

        /** No-op setup hook. */
        public void setup() throws UserCodeExecutionException {
            // Nothing to set up for this test double.
        }

        /** No-op teardown hook. */
        public void teardown() throws UserCodeExecutionException {
            // Nothing to tear down for this test double.
        }
    }

    /**
     * Supplier of a {@link BackOff} whose {@code nextBackOffMillis()} increments a counter named
     * {@code "custom_counter"} in this class's namespace and always returns zero.
     */
    private static class CustomBackOffSupplier implements SerializableSupplier<BackOff> {
        /** Counter incremented each time the supplied back-off is asked for its next delay. */
        private final Counter counter;

        private CustomBackOffSupplier() {
            this.counter = Metrics.counter(CustomBackOffSupplier.class, "custom_counter");
        }

        /**
         * Supplies a fresh counting {@link BackOff}.
         *
         * <p>The method is named {@code get} so that it implements the supplier interface; the decompiler
         * had emitted it under a synthetic name ({@code m18get}), which left the interface method
         * unimplemented.
         *
         * @return a back-off that counts every {@code nextBackOffMillis()} call and reports a 0 ms delay
         */
        public BackOff get() {
            return new BackOff() {
                public void reset() throws IOException {
                    // No state to reset.
                }

                public long nextBackOffMillis() throws IOException {
                    CustomBackOffSupplier.this.counter.inc();
                    return 0L;
                }
            };
        }

        /**
         * @return the metric name of the counter this supplier increments
         */
        MetricName getCounterName() {
            return this.counter.getName();
        }
    }

    /**
     * {@link CallShouldBackoff} test double that ignores every update, always answers {@code false},
     * and counts how often it is asked via a counter named {@code "custom_counter"} in this class's
     * namespace.
     *
     * @param <ResponseT> response type accepted by {@link #update(Object)}
     */
    private static class CustomCallShouldBackoff<ResponseT> implements CallShouldBackoff<ResponseT> {
        /** Counter incremented on every {@link #isTrue()} invocation. */
        private final Counter counter = Metrics.counter(CustomCallShouldBackoff.class, "custom_counter");

        private CustomCallShouldBackoff() {
        }

        /** Ignores the reported exception. */
        public void update(UserCodeExecutionException exception) {
            // Deliberately empty.
        }

        /** Ignores the reported response. */
        public void update(ResponseT response) {
            // Deliberately empty.
        }

        /**
         * Records the query and declines to back off.
         *
         * @return always {@code false}
         */
        public boolean isTrue() {
            counter.inc();
            return false;
        }

        /**
         * @return the metric name of the counter incremented by {@link #isTrue()}
         */
        MetricName getCounterName() {
            MetricName name = counter.getName();
            return name;
        }
    }

    /**
     * Supplier of a {@link Sleeper} that does not sleep; each sleep request only increments a counter
     * named {@code "custom_counter"} in this class's namespace.
     */
    private static class CustomSleeperSupplier implements SerializableSupplier<Sleeper> {
        /** Counter incremented each time the supplied sleeper is invoked. */
        private final Counter counter;

        private CustomSleeperSupplier() {
            this.counter = Metrics.counter(CustomSleeperSupplier.class, "custom_counter");
        }

        /**
         * Supplies the counting {@link Sleeper}.
         *
         * <p>The method is named {@code get} so that it implements the supplier interface; the decompiler
         * had emitted it under a synthetic name ({@code m19get}), which left the interface method
         * unimplemented.
         *
         * @return a sleeper that ignores the requested duration and increments the counter instead
         */
        public Sleeper get() {
            return millis -> {
                this.counter.inc();
            };
        }

        /**
         * @return the metric name of the counter this supplier increments
         */
        MetricName getCounterName() {
            return this.counter.getName();
        }
    }

    /**
     * Request value type used by the tests: one string and one long, generated by AutoValue and
     * annotated with {@link AutoValueSchema} as its default schema.
     *
     * <p>NOTE(review): the decompiler reported that this class, its builder and its getters were
     * originally package-private.
     */
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract static class Request {

        /**
         * @return a new builder backed by the AutoValue-generated implementation
         */
        static Builder builder() {
            return new AutoValue_RequestResponseIOTest_Request.Builder();
        }

        /** @return the string property */
        public abstract String getAString();

        /** @return the long property */
        public abstract Long getALong();

        /** Builder for {@link Request}. */
        @AutoValue.Builder
        public abstract static class Builder {
            /** Sets the string property. */
            abstract Builder setAString(String str);

            /** Sets the long property. */
            abstract Builder setALong(Long l);

            /** @return the built {@link Request} */
            abstract Request build();
        }
    }

    /**
     * Response value type used by the tests: one string and one long, generated by AutoValue.
     *
     * <p>NOTE(review): the decompiler reported that this class, its builder and its getters were
     * originally package-private.
     */
    @AutoValue
    public abstract static class Response {

        /**
         * @return a new builder backed by the AutoValue-generated implementation
         */
        static Builder builder() {
            return new AutoValue_RequestResponseIOTest_Response.Builder();
        }

        /** @return the string property */
        public abstract String getAString();

        /** @return the long property */
        public abstract Long getALong();

        /** Builder for {@link Response}. */
        @AutoValue.Builder
        public abstract static class Builder {
            /** Sets the string property. */
            abstract Builder setAString(String str);

            /** Sets the long property. */
            abstract Builder setALong(Long l);

            /** @return the built {@link Response} */
            abstract Response build();
        }
    }

    /**
     * Builds the transform from a never-failing caller with the
     * {@code withEverythingCountedExceptedCaching()} monitoring configuration, then checks that there
     * are no failures, the expected response is produced, the request/response/call/setup counters are
     * positive, and the failure, should-backoff, sleeper and backoff counters are zero.
     */
    @Test
    public void givenCallerOnly_thenProcessesRequestsWithDefaultFeatures() {
        CallerImpl caller = new CallerImpl();
        Monitoring monitoring = Monitoring.builder().build().withEverythingCountedExceptedCaching();
        RequestResponseIO<Request, Response> transform =
                RequestResponseIO.of(caller, RESPONSE_CODER).withMonitoringConfiguration(monitoring);

        Result<Response> result = requests().apply("rrio", transform);
        PAssert.that(result.getFailures()).empty();
        PAssert.that(result.getResponses()).containsInAnyOrder(responses());

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        // Counters expected to have been incremented at least once.
        Long requestCount = getCounterResult(metrics, Call.class, "requests");
        MatcherAssert.assertThat(requestCount, Matchers.greaterThan(0L));

        Long responseCount = getCounterResult(metrics, Call.class, "responses");
        MatcherAssert.assertThat(responseCount, Matchers.greaterThan(0L));

        Long callCount = getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(caller));
        MatcherAssert.assertThat(callCount, Matchers.greaterThan(0L));

        Long setupCount =
                getCounterResult(metrics, Call.class, Monitoring.setupCounterNameOf(new Call.NoopSetupTeardown()));
        MatcherAssert.assertThat(setupCount, Matchers.greaterThan(0L));

        // Counters expected to stay at zero because no call fails.
        Long failureCount = getCounterResult(metrics, Call.class, "failures");
        MatcherAssert.assertThat(failureCount, Matchers.equalTo(0L));

        Long shouldBackoffCount =
                getCounterResult(
                        metrics,
                        Call.class,
                        Monitoring.shouldBackoffCounterName(transform.getCallConfiguration().getCallShouldBackoff()));
        MatcherAssert.assertThat(shouldBackoffCount, Matchers.equalTo(0L));

        Long sleeperCount =
                getCounterResult(
                        metrics,
                        Call.class,
                        Monitoring.sleeperCounterNameOf(
                                (Sleeper) transform.getCallConfiguration().getSleeperSupplier().get()));
        MatcherAssert.assertThat(sleeperCount, Matchers.equalTo(0L));

        Long backoffCount =
                getCounterResult(
                        metrics,
                        Call.class,
                        Monitoring.backoffCounterNameOf(
                                (BackOff) transform.getCallConfiguration().getBackOffSupplier().get()));
        MatcherAssert.assertThat(backoffCount, Matchers.equalTo(0L));
    }

    /**
     * Builds the transform from a combined caller/setup-teardown implementation with call and setup
     * counting enabled, then checks that there are no failures, the expected response is produced, and
     * both the call counter and the setup counter are positive.
     */
    @Test
    public void givenCallerAndSetupTeardown_thenCallerInvokesSetupTeardown() {
        PCollection<Request> input = requests();
        Monitoring monitoring = Monitoring.builder().setCountCalls(true).setCountSetup(true).build();
        RequestResponseIO<Request, Response> transform =
                RequestResponseIO.ofCallerAndSetupTeardown(new CallerSetupTeardownImpl(), RESPONSE_CODER)
                        .withMonitoringConfiguration(monitoring);

        Result<Response> result = input.apply("rrio", transform);
        PAssert.that(result.getFailures()).empty();
        PAssert.that(result.getResponses()).containsInAnyOrder(responses());

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        Long callCount =
                getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(new CallerSetupTeardownImpl()));
        MatcherAssert.assertThat(callCount, Matchers.greaterThan(0L));

        Long setupCount =
                getCounterResult(metrics, Call.class, Monitoring.setupCounterNameOf(new CallerSetupTeardownImpl()));
        MatcherAssert.assertThat(setupCount, Matchers.greaterThan(0L));
    }

    /**
     * Uses a caller that rejects its first call and enables call counting, then checks that there are
     * no failures, the expected response is produced, and the call counter equals 2.
     */
    @Test
    public void givenDefaultConfiguration_shouldRepeatFailedRequests() {
        PCollection<Request> input = requests();
        Monitoring monitoring = Monitoring.builder().setCountCalls(true).build();
        RequestResponseIO<Request, Response> transform =
                RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER).withMonitoringConfiguration(monitoring);

        Result<Response> result = input.apply("rrio", transform);
        PAssert.that(result.getFailures()).empty();
        PAssert.that(result.getResponses()).containsInAnyOrder(responses());

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        Long callCount = getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(new CallerImpl()));
        MatcherAssert.assertThat(callCount, Matchers.equalTo(2L));
    }

    /**
     * Uses a caller that rejects its first call and enables backoff counting, then checks that the
     * counter named after a {@link DefaultSerializableBackoffSupplier}-supplied back-off is positive.
     */
    @Test
    public void givenDefaultConfiguration_usesDefaultBackoffSupplier() {
        PCollection<Request> input = requests();
        Monitoring monitoring = Monitoring.builder().setCountBackoffs(true).build();
        input.apply(
                "rrio",
                RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER).withMonitoringConfiguration(monitoring));

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        Long backoffCount =
                getCounterResult(
                        metrics,
                        Call.class,
                        Monitoring.backoffCounterNameOf(new DefaultSerializableBackoffSupplier().get()));
        MatcherAssert.assertThat(backoffCount, Matchers.greaterThan(0L));
    }

    /**
     * Uses a caller that rejects its first call and enables sleep counting, then checks that the
     * counter named after {@code Sleeper.DEFAULT} is positive.
     */
    @Test
    public void givenDefaultConfiguration_usesDefaultSleeper() {
        PCollection<Request> input = requests();
        Monitoring monitoring = Monitoring.builder().setCountSleeps(true).build();
        input.apply(
                "rrio",
                RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER).withMonitoringConfiguration(monitoring));

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        Long sleepCount = getCounterResult(metrics, Call.class, Monitoring.sleeperCounterNameOf(Sleeper.DEFAULT));
        MatcherAssert.assertThat(sleepCount, Matchers.greaterThan(0L));
    }

    /**
     * Takes the should-backoff instance from a freshly built transform's call configuration, sets its
     * threshold to 0 and feeds it one {@link UserCodeExecutionException}, then runs the transform with
     * should-backoff counting enabled and checks that the corresponding counter is positive.
     */
    @Test
    public void givenDefaultConfiguration_usesDefaultCallShouldBackoff() {
        RequestResponseIO<Request, Response> transform = RequestResponseIO.of(new CallerImpl(), RESPONSE_CODER);

        // The threshold setter lives on the concrete class, so the value read from the configuration is
        // narrowed explicitly. NOTE(review): the declared return type of getCallShouldBackoff() is not
        // visible in this file; the cast is redundant but harmless if it already returns the concrete type.
        CallShouldBackoffBasedOnRejectionProbability<Response> callShouldBackoff =
                (CallShouldBackoffBasedOnRejectionProbability<Response>)
                        transform.getCallConfiguration().getCallShouldBackoff();
        callShouldBackoff.setThreshold(0.0d);
        callShouldBackoff.update(new UserCodeExecutionException(""));

        Monitoring monitoring = Monitoring.builder().setCountShouldBackoff(true).build();
        requests()
                .apply(
                        "rrio",
                        transform.withCallShouldBackoff(callShouldBackoff).withMonitoringConfiguration(monitoring));

        PipelineResult run = this.pipeline.run();
        MetricResults metrics = run.metrics();
        run.waitUntilFinish();

        MatcherAssert.assertThat(
                getCounterResult(metrics, Call.class, Monitoring.shouldBackoffCounterName(callShouldBackoff)),
                Matchers.greaterThan(0L));
    }

    /**
     * Uses a caller that rejects its first call, disables the repeater and enables call and failure
     * counting, then checks that no responses are produced and that both the call counter and the
     * failure counter are positive.
     */
    @Test
    public void givenWithoutRepeater_shouldNotRepeatRequests() {
        PCollection<Request> input = requests();
        Monitoring monitoring = Monitoring.builder().setCountCalls(true).setCountFailures(true).build();
        RequestResponseIO<Request, Response> transform =
                RequestResponseIO.of(new CallerImpl(1), RESPONSE_CODER)
                        .withoutRepeater()
                        .withMonitoringConfiguration(monitoring);

        Result<Response> result = input.apply("rrio", transform);
        PAssert.that(result.getResponses()).empty();

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        Long callCount = getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(new CallerImpl()));
        MatcherAssert.assertThat(callCount, Matchers.greaterThan(0L));

        Long failureCount = getCounterResult(metrics, Call.class, "failures");
        MatcherAssert.assertThat(failureCount, Matchers.greaterThan(0L));
    }

    /**
     * Installs a {@link CustomCallShouldBackoff} on the transform and checks that the custom counter it
     * increments is positive after the pipeline runs.
     */
    @Test
    public void givenCustomCallShouldBackoff_thenComputeUsingCustom() {
        CustomCallShouldBackoff<Response> shouldBackoff = new CustomCallShouldBackoff<>();
        PCollection<Request> input = requests();
        input.apply(
                "rrio",
                RequestResponseIO.of(new CallerImpl(), RESPONSE_CODER).withCallShouldBackoff(shouldBackoff));

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        String counterName = shouldBackoff.getCounterName().getName();
        Long customCount = getCounterResult(metrics, shouldBackoff.getClass(), counterName);
        MatcherAssert.assertThat(customCount, Matchers.greaterThan(0L));
    }

    /**
     * Installs a {@link CustomSleeperSupplier} on a transform whose caller is configured to reject 100
     * calls, and checks that the supplier's custom counter is positive after the pipeline runs.
     */
    @Test
    public void givenCustomSleeper_thenSleepBehaviorCustom() {
        CustomSleeperSupplier sleeperSupplier = new CustomSleeperSupplier();
        PCollection<Request> input = requests();
        input.apply(
                "rrio",
                RequestResponseIO.of(new CallerImpl(100), RESPONSE_CODER).withSleeperSupplier(sleeperSupplier));

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        String counterName = sleeperSupplier.getCounterName().getName();
        Long customCount = getCounterResult(metrics, sleeperSupplier.getClass(), counterName);
        MatcherAssert.assertThat(customCount, Matchers.greaterThan(0L));
    }

    /**
     * Installs a {@link CustomBackOffSupplier} on a transform whose caller is configured to reject 100
     * calls, and checks that the supplier's custom counter is positive after the pipeline runs.
     */
    @Test
    public void givenCustomBackoff_thenBackoffBehaviorCustom() {
        CustomBackOffSupplier backOffSupplier = new CustomBackOffSupplier();
        PCollection<Request> input = requests();
        input.apply(
                "rrio",
                RequestResponseIO.of(new CallerImpl(100), RESPONSE_CODER).withBackOffSupplier(backOffSupplier));

        PipelineResult pipelineResult = pipeline.run();
        MetricResults metrics = pipelineResult.metrics();
        pipelineResult.waitUntilFinish();

        String counterName = backOffSupplier.getCounterName().getName();
        Long customCount = getCounterResult(metrics, backOffSupplier.getClass(), counterName);
        MatcherAssert.assertThat(customCount, Matchers.greaterThan(0L));
    }

    /**
     * Placeholder for a caching test. The method is marked {@code @Ignore} and has an empty body, so
     * it currently verifies nothing.
     */
    @Test
    @Ignore
    public void givenWithCache_thenRequestsResponsesCachedUsingCustom() {
        // NOTE(review): no reason is given on @Ignore and no body exists; presumably pending an
        // implementation of the cache scenario named by the method. Confirm before enabling.
    }

    /**
     * Adds a {@code Create} step named {@code "create requests"} to the test pipeline.
     *
     * @return a collection holding the single request {@code (aLong = 1, aString = "a")}
     */
    private PCollection<Request> requests() {
        Request request = Request.builder().setALong(1L).setAString("a").build();
        return pipeline.apply("create requests", Create.of(request));
    }

    /**
     * @return an immutable single-element list holding the response {@code (aString = "a", aLong = 1)}
     */
    private List<Response> responses() {
        Response expected = Response.builder().setAString("a").setALong(1L).build();
        return ImmutableList.of(expected);
    }

    /**
     * Queries the metric results for counters matching the given namespace class and name, then
     * delegates to the {@code Iterable}-based overload to extract the value.
     *
     * @param metricResults metrics of a pipeline run
     * @param cls class used as the counter's namespace
     * @param str counter name
     * @return the value chosen by the {@code Iterable}-based overload
     */
    private static Long getCounterResult(MetricResults metricResults, Class<?> cls, String str) {
        MetricsFilter filter = MetricsFilter.builder().addNameFilter(MetricNameFilter.named(cls, str)).build();
        Iterable<MetricResult<Long>> counters = metricResults.queryMetrics(filter).getCounters();
        return getCounterResult(counters, cls, str);
    }

    /**
     * Scans the counters for entries whose namespace equals {@code cls.getName()} and whose name equals
     * {@code str}.
     *
     * @param iterable counter results to scan
     * @param cls class whose name is the expected namespace
     * @param str expected counter name
     * @return the committed value of the last matching entry, or {@code 0} when nothing matches
     */
    private static Long getCounterResult(Iterable<MetricResult<Long>> iterable, Class<?> cls, String str) {
        Long committed = 0L;
        for (MetricResult<Long> counter : iterable) {
            MetricName metricName = counter.getName();
            boolean sameNamespace = metricName.getNamespace().equals(cls.getName());
            if (!sameNamespace || !metricName.getName().equals(str)) {
                continue;
            }
            // A later match replaces an earlier one; values are not summed.
            committed = counter.getCommitted();
        }
        return committed;
    }
}
