package org.apache.flink.test.accumulators;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase.class */
public class AccumulatorErrorITCase extends TestLogger {
    private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
    private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
    private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).setNumberSlotsPerTaskManager(3).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$CustomException.class */
    public static class CustomException extends RuntimeException {
        private static final long serialVersionUID = 42;

        private CustomException() {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$FaultyAccumulatorUsingMapper.class */
    private static class FaultyAccumulatorUsingMapper extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42;

        private FaultyAccumulatorUsingMapper() {
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(AccumulatorErrorITCase.FAULTY_CLONE_ACCUMULATOR, new FaultyCloneAccumulator());
        }

        public Long map(Long l) throws Exception {
            return -1L;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$FaultyCloneAccumulator.class */
    private static class FaultyCloneAccumulator extends LongCounter {
        private static final long serialVersionUID = 42;

        private FaultyCloneAccumulator() {
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public LongCounter m728clone() {
            throw new CustomException();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$FaultyMergeAccumulator.class */
    private static class FaultyMergeAccumulator extends LongCounter {
        private static final long serialVersionUID = 42;

        private FaultyMergeAccumulator() {
        }

        public void merge(Accumulator<Long, Long> accumulator) {
            throw new CustomException();
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public LongCounter m730clone() {
            return new FaultyMergeAccumulator();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$FaultyMergeAccumulatorUsingMapper.class */
    private static class FaultyMergeAccumulatorUsingMapper extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42;

        private FaultyMergeAccumulatorUsingMapper() {
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(AccumulatorErrorITCase.FAULTY_MERGE_ACCUMULATOR, new FaultyMergeAccumulator());
        }

        public Long map(Long l) throws Exception {
            return -1L;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$IncompatibleAccumulatorTypesMapper.class */
    private static class IncompatibleAccumulatorTypesMapper extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42;

        private IncompatibleAccumulatorTypesMapper() {
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(AccumulatorErrorITCase.INCOMPATIBLE_ACCUMULATORS_NAME, new LongCounter());
        }

        public Long map(Long l) throws Exception {
            return -1L;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorErrorITCase$IncompatibleAccumulatorTypesMapper2.class */
    private static class IncompatibleAccumulatorTypesMapper2 extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42;

        private IncompatibleAccumulatorTypesMapper2() {
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(AccumulatorErrorITCase.INCOMPATIBLE_ACCUMULATORS_NAME, new DoubleCounter());
        }

        public Long map(Long l) throws Exception {
            return -1L;
        }
    }

    public static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
        return configuration;
    }

    @Test
    public void testFaultyAccumulator() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.generateSequence(0L, 10000L).map(new FaultyAccumulatorUsingMapper()).output(new DiscardingOutputFormat());
        assertAccumulatorsShouldFail(executionEnvironment.execute());
    }

    @Test
    public void testInvalidTypeAccumulator() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.generateSequence(0L, 10000L).map(new IncompatibleAccumulatorTypesMapper()).map(new IncompatibleAccumulatorTypesMapper2()).output(new DiscardingOutputFormat());
        try {
            executionEnvironment.execute();
            Assert.fail("Should have failed.");
        } catch (JobExecutionException e) {
            Assert.assertTrue("Root cause should be:", e.getCause() instanceof Exception);
            Assert.assertTrue("Root cause should be:", e.getCause().getCause() instanceof UnsupportedOperationException);
        }
    }

    @Test
    public void testFaultyMergeAccumulator() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.generateSequence(0L, 10000L).map(new FaultyMergeAccumulatorUsingMapper()).output(new DiscardingOutputFormat());
        assertAccumulatorsShouldFail(executionEnvironment.execute());
    }

    private static void assertAccumulatorsShouldFail(JobExecutionResult jobExecutionResult) {
        try {
            jobExecutionResult.getAllAccumulatorResults();
            Assert.fail("Should have failed");
        } catch (Exception e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, CustomException.class).isPresent());
        }
    }
}
