/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AccumulatorErrorITCase {
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void startCluster() {
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", 3);
            config.setInteger("taskmanager.memory.size", 12);
            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to start test cluster: " + e.getMessage()));
        }
    }

    @AfterClass
    public static void shutdownCluster() {
        try {
            cluster.shutdown();
            cluster = null;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Failed to stop test cluster: " + e.getMessage()));
        }
    }

    @Test
    public void testFaultyAccumulator() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
        env.getConfig().disableSysoutLogging();
        DataSource input = env.generateSequence(0L, 10000L);
        MapOperator map = input.map((MapFunction)new FaultyAccumulatorUsingMapper());
        map.output((OutputFormat)new DiscardingOutputFormat());
        try {
            env.execute();
            Assert.fail((String)"Should have failed.");
        }
        catch (ProgramInvocationException e) {
            Assert.assertTrue((String)"Exception should be passed:", (boolean)(e.getCause() instanceof JobExecutionException));
            Assert.assertTrue((String)"Root cause should be:", (boolean)(e.getCause().getCause() instanceof CustomException));
        }
    }

    @Test
    public void testInvalidTypeAccumulator() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getLeaderRPCPort(), (String[])new String[0]);
        env.getConfig().disableSysoutLogging();
        DataSource input = env.generateSequence(0L, 10000L);
        MapOperator mappers = input.map((MapFunction)new IncompatibleAccumulatorTypesMapper()).map((MapFunction)new IncompatibleAccumulatorTypesMapper2());
        mappers.output((OutputFormat)new DiscardingOutputFormat());
        try {
            env.execute();
            Assert.fail((String)"Should have failed.");
        }
        catch (ProgramInvocationException e) {
            Assert.assertTrue((String)"Exception should be passed:", (boolean)(e.getCause() instanceof JobExecutionException));
            Assert.assertTrue((String)"Root cause should be:", (boolean)(e.getCause().getCause() instanceof Exception));
            Assert.assertTrue((String)"Root cause should be:", (boolean)(e.getCause().getCause().getCause() instanceof UnsupportedOperationException));
        }
    }

    private static class IncompatibleAccumulatorTypesMapper2
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private IncompatibleAccumulatorTypesMapper2() {
        }

        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator("test", (Accumulator)new DoubleCounter());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }

    private static class IncompatibleAccumulatorTypesMapper
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private IncompatibleAccumulatorTypesMapper() {
        }

        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator("test", (Accumulator)new LongCounter());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }

    private static class CustomException
    extends RuntimeException {
        private static final long serialVersionUID = 42L;

        private CustomException() {
        }
    }

    private static class FaultyAccumulator
    extends LongCounter {
        private static final long serialVersionUID = 42L;

        private FaultyAccumulator() {
        }

        public LongCounter clone() {
            throw new CustomException();
        }
    }

    private static class FaultyAccumulatorUsingMapper
    extends RichMapFunction<Long, Long> {
        private static final long serialVersionUID = 42L;

        private FaultyAccumulatorUsingMapper() {
        }

        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator("test", (Accumulator)new FaultyAccumulator());
        }

        public Long map(Long value) throws Exception {
            return -1L;
        }
    }
}

