package org.apache.beam.io.requestresponse;

import java.lang.reflect.InvocationTargetException;
import java.util.Objects;
import org.apache.beam.io.requestresponse.Repeater;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/io/requestresponse/RepeaterTest.class */
public class RepeaterTest {

    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    private static final TupleTag<Integer> OUTPUT_TAG = new TupleTag<Integer>() { // from class: org.apache.beam.io.requestresponse.RepeaterTest.1
    };
    private static final TupleTag<String> FAILURE_TAG = new TupleTag<String>() { // from class: org.apache.beam.io.requestresponse.RepeaterTest.2
    };
    private static final int LIMIT = 3;
    private static final FluentBackoff FLUENT_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(LIMIT);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/RepeaterTest$CallerImpl.class */
    public static class CallerImpl implements Caller<Integer, Integer> {
        private int wantNumErrors;
        private final Class<? extends UserCodeExecutionException> wantThrowWith;
        private final String exceptionName;

        private CallerImpl(int i) {
            this(i, (Class<? extends UserCodeExecutionException>) UserCodeExecutionException.class);
        }

        private CallerImpl(int i, Class<? extends UserCodeExecutionException> cls) {
            this.wantNumErrors = i;
            this.wantThrowWith = cls;
            this.exceptionName = RepeaterTest.getRepeatableErrorTypeName(cls);
        }

        public Integer call(Integer num) throws UserCodeExecutionException {
            if (this.wantNumErrors <= 0) {
                return Integer.valueOf(num.intValue() * 2);
            }
            this.wantNumErrors--;
            try {
                throw this.wantThrowWith.getConstructor(String.class).newInstance(this.exceptionName);
            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/RepeaterTest$DoFnWithRepeaters.class */
    private static class DoFnWithRepeaters extends DoFn<Integer, Integer> {
        private final CallerImpl caller;
        private final SetupTeardownImpl setupTeardown;

        private DoFnWithRepeaters(CallerImpl callerImpl, SetupTeardownImpl setupTeardownImpl) {
            this.caller = callerImpl;
            this.setupTeardown = setupTeardownImpl;
        }

        @DoFn.Setup
        public void setup() throws UserCodeExecutionException {
            Repeater.builder().setSleeper(new NoOpSleeper()).setBackOff(RepeaterTest.FLUENT_BACKOFF.backoff()).setThrowableFunction(r3 -> {
                this.setupTeardown.setup();
                return null;
            }).build().apply((Object) null);
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element Integer num, DoFn.MultiOutputReceiver multiOutputReceiver) {
            Repeater.Builder backOff = Repeater.builder().setSleeper(new NoOpSleeper()).setBackOff(RepeaterTest.FLUENT_BACKOFF.backoff());
            CallerImpl callerImpl = this.caller;
            Objects.requireNonNull(callerImpl);
            try {
                multiOutputReceiver.get(RepeaterTest.OUTPUT_TAG).output((Integer) backOff.setThrowableFunction(callerImpl::call).build().apply(num));
            } catch (UserCodeExecutionException e) {
                multiOutputReceiver.get(RepeaterTest.FAILURE_TAG).output(RepeaterTest.getRepeatableErrorTypeName(e.getClass()));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/io/requestresponse/RepeaterTest$NoOpSleeper.class */
    private static class NoOpSleeper implements Sleeper {
        private NoOpSleeper() {
        }

        public void sleep(long j) throws InterruptedException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/io/requestresponse/RepeaterTest$SetupTeardownImpl.class */
    public static class SetupTeardownImpl implements SetupTeardown {
        private int wantNumErrors;
        private final Class<? extends UserCodeExecutionException> wantThrowWith;
        private final String exceptionName;

        private SetupTeardownImpl(int i) {
            this(i, (Class<? extends UserCodeExecutionException>) UserCodeExecutionException.class);
        }

        private SetupTeardownImpl(int i, Class<? extends UserCodeExecutionException> cls) {
            this.wantNumErrors = i;
            this.wantThrowWith = cls;
            this.exceptionName = RepeaterTest.getRepeatableErrorTypeName(cls);
        }

        public void setup() throws UserCodeExecutionException {
            if (this.wantNumErrors > 0) {
                this.wantNumErrors--;
                try {
                    throw this.wantThrowWith.getConstructor(String.class).newInstance(this.exceptionName);
                } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        public void teardown() throws UserCodeExecutionException {
        }
    }

    @Test
    public void givenCallerQuotaErrorsExceedsLimit_emitsIntoFailurePCollection() {
        PCollectionTuple apply = this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(4, UserCodeQuotaException.class), new SetupTeardownImpl(0))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        PAssert.that(apply.get(OUTPUT_TAG)).empty();
        PAssert.that(apply.get(FAILURE_TAG)).containsInAnyOrder(new String[]{UserCodeQuotaException.class.getName()});
        this.pipeline.run();
    }

    @Test
    public void givenSetupQuotaErrorsExceedsLimit_throws() {
        this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(4, UserCodeQuotaException.class))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        TestPipeline testPipeline = this.pipeline;
        Objects.requireNonNull(testPipeline);
        UncheckedExecutionException assertThrows = Assert.assertThrows(UncheckedExecutionException.class, testPipeline::run);
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeException.class)));
        MatcherAssert.assertThat(assertThrows.getCause().getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeQuotaException.class)));
    }

    @Test
    public void givenCallerTimeoutErrorsExceedsLimit_emitsIntoFailurePCollection() {
        PCollectionTuple apply = this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(4, UserCodeTimeoutException.class), new SetupTeardownImpl(0))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        PAssert.that(apply.get(OUTPUT_TAG)).empty();
        PAssert.that(apply.get(FAILURE_TAG)).containsInAnyOrder(new String[]{UserCodeTimeoutException.class.getName()});
        this.pipeline.run();
    }

    @Test
    public void givenSetupTimeoutErrorsExceedsLimit_throws() {
        this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(4, UserCodeTimeoutException.class))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        TestPipeline testPipeline = this.pipeline;
        Objects.requireNonNull(testPipeline);
        UncheckedExecutionException assertThrows = Assert.assertThrows(UncheckedExecutionException.class, testPipeline::run);
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeException.class)));
        MatcherAssert.assertThat(assertThrows.getCause().getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeTimeoutException.class)));
    }

    @Test
    public void givenCallerRemoteSystemExceptionExceedsLimit_emitsIntoFailurePCollection() {
        PCollectionTuple apply = this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(4, UserCodeRemoteSystemException.class), new SetupTeardownImpl(0))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        PAssert.that(apply.get(OUTPUT_TAG)).empty();
        PAssert.that(apply.get(FAILURE_TAG)).containsInAnyOrder(new String[]{UserCodeRemoteSystemException.class.getName()});
        this.pipeline.run();
    }

    @Test
    public void givenSetupRemoteSystemErrorsExceedsLimit_throws() {
        this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(4, UserCodeRemoteSystemException.class))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        TestPipeline testPipeline = this.pipeline;
        Objects.requireNonNull(testPipeline);
        UncheckedExecutionException assertThrows = Assert.assertThrows(UncheckedExecutionException.class, testPipeline::run);
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeException.class)));
        MatcherAssert.assertThat(assertThrows.getCause().getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeRemoteSystemException.class)));
    }

    @Test
    public void givenCallerNonRepeatableError_emitsIntoFailurePCollection() {
        PCollectionTuple apply = this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(1, UserCodeExecutionException.class), new SetupTeardownImpl(0))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        PAssert.that(apply.get(OUTPUT_TAG)).empty();
        PAssert.that(apply.get(FAILURE_TAG)).containsInAnyOrder(new String[]{UserCodeExecutionException.class.getName()});
        this.pipeline.run();
    }

    @Test
    public void givenSetupNonRepeatableError_throws() {
        this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(0), new SetupTeardownImpl(1, UserCodeExecutionException.class))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        TestPipeline testPipeline = this.pipeline;
        Objects.requireNonNull(testPipeline);
        UncheckedExecutionException assertThrows = Assert.assertThrows(UncheckedExecutionException.class, testPipeline::run);
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeException.class)));
        MatcherAssert.assertThat(assertThrows.getCause().getCause(), Matchers.allOf(Matchers.notNullValue(), Matchers.instanceOf(UserCodeExecutionException.class)));
    }

    @Test
    public void givenRepeatableErrorBelowLimit_emitsIntoOutputPCollection() {
        PCollectionTuple apply = this.pipeline.apply(Create.of(1, new Integer[0])).apply(ParDo.of(new DoFnWithRepeaters(new CallerImpl(2, UserCodeQuotaException.class), new SetupTeardownImpl(0))).withOutputTags(OUTPUT_TAG, TupleTagList.of(FAILURE_TAG)));
        PAssert.that(apply.get(OUTPUT_TAG)).containsInAnyOrder(new Integer[]{2});
        PAssert.that(apply.get(FAILURE_TAG)).empty();
        this.pipeline.run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getRepeatableErrorTypeName(Class<? extends UserCodeExecutionException> cls) {
        for (Class cls2 : Repeater.REPEATABLE_ERROR_TYPES) {
            if (cls2.equals(cls)) {
                return cls2.getName();
            }
        }
        return cls.getName();
    }
}
