package org.apache.flink.test.accumulators;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase.class */
public class AccumulatorLiveITCase {
    private static ActorSystem system;
    private static ActorGateway jobManagerGateway;
    private static ActorRef taskManager;
    private static JobID jobID;
    private static JobGraph jobGraph;
    private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
    private static String ACCUMULATOR_NAME = "test";
    private static final int NUM_ITERATIONS = 5;
    private static List<String> inputData = new ArrayList(NUM_ITERATIONS);
    private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.test.accumulators.AccumulatorLiveITCase$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric = new int[AccumulatorRegistry.Metric.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric[AccumulatorRegistry.Metric.NUM_RECORDS_OUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric[AccumulatorRegistry.Metric.NUM_BYTES_OUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric[AccumulatorRegistry.Metric.NUM_RECORDS_IN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric[AccumulatorRegistry.Metric.NUM_BYTES_IN.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase$BatchPlanExtractor.class */
    private static class BatchPlanExtractor extends LocalEnvironment {
        private Plan plan;

        private BatchPlanExtractor() {
            this.plan = null;
        }

        public JobExecutionResult execute(String str) throws Exception {
            this.plan = createProgramPlan();
            return new JobExecutionResult(new JobID(), -1L, (Map) null);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase$DummyStreamExecutionEnvironment.class */
    private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
        private DummyStreamExecutionEnvironment() {
        }

        public JobExecutionResult execute() throws Exception {
            return execute("default");
        }

        public JobExecutionResult execute(String str) throws Exception {
            throw new RuntimeException("This should not be called.");
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase$NotifyingMapper.class */
    private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
        private static final long serialVersionUID = 1;
        private IntCounter counter;
        private static boolean finished = false;

        private NotifyingMapper() {
            this.counter = new IntCounter();
        }

        public void open(Configuration configuration) throws Exception {
            getRuntimeContext().addAccumulator(AccumulatorLiveITCase.ACCUMULATOR_NAME, this.counter);
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void flatMap(String str, Collector<Integer> collector) throws Exception {
            int intValue = Integer.valueOf(str).intValue();
            this.counter.add(intValue);
            collector.collect(Integer.valueOf(intValue));
            AccumulatorLiveITCase.LOG.debug("Emitting value {}.", str);
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void close() throws Exception {
            finished = true;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorLiveITCase$NotifyingOutputFormat.class */
    private static class NotifyingOutputFormat implements OutputFormat<Integer> {
        private static final long serialVersionUID = 1;

        private NotifyingOutputFormat() {
        }

        public void configure(Configuration configuration) {
        }

        public void open(int i, int i2) throws IOException {
            while (!NotifyingMapper.finished) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void writeRecord(Integer num) throws IOException {
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void close() throws IOException {
        }
    }

    @Before
    public void before() throws Exception {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
        Configuration configuration = new Configuration();
        configuration.setInteger("taskmanager.numberOfTaskSlots", 1);
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setString("akka.ask.timeout", TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        TestingCluster testingCluster = new TestingCluster(configuration, false, true);
        testingCluster.start();
        jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
        taskManager = (ActorRef) testingCluster.getTaskManagersAsJava().get(0);
        for (int i = 0; i < NUM_ITERATIONS; i++) {
            inputData.add(i, String.valueOf(i + 1));
        }
        boolean unused = NotifyingMapper.finished = false;
    }

    @After
    public void after() throws Exception {
        JavaTestKit.shutdownActorSystem(system);
        inputData.clear();
    }

    @Test
    public void testBatch() throws Exception {
        BatchPlanExtractor batchPlanExtractor = new BatchPlanExtractor();
        batchPlanExtractor.setParallelism(1);
        batchPlanExtractor.fromCollection(inputData).flatMap(new NotifyingMapper()).output(new NotifyingOutputFormat());
        batchPlanExtractor.execute();
        jobGraph = getOptimizedPlan(batchPlanExtractor.plan);
        jobID = jobGraph.getJobID();
        verifyResults();
    }

    @Test
    public void testStreaming() throws Exception {
        DummyStreamExecutionEnvironment dummyStreamExecutionEnvironment = new DummyStreamExecutionEnvironment();
        dummyStreamExecutionEnvironment.setParallelism(1);
        dummyStreamExecutionEnvironment.fromCollection(inputData).flatMap(new NotifyingMapper()).writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
        jobGraph = dummyStreamExecutionEnvironment.getStreamGraph().getJobGraph();
        jobID = jobGraph.getJobID();
        verifyResults();
    }

    private static void verifyResults() {
        new JavaTestKit(system) { // from class: org.apache.flink.test.accumulators.AccumulatorLiveITCase.1
            {
                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(getRef(), AccumulatorLiveITCase.jobManagerGateway.leaderSessionID());
                AccumulatorLiveITCase.jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(AccumulatorLiveITCase.jobID), akkaActorGateway);
                expectMsgEquals(AccumulatorLiveITCase.TIMEOUT, true);
                AccumulatorLiveITCase.jobManagerGateway.tell(new JobManagerMessages.SubmitJob(AccumulatorLiveITCase.jobGraph, ListeningBehaviour.EXECUTION_RESULT), akkaActorGateway);
                expectMsgClass(AccumulatorLiveITCase.TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
                TestingJobManagerMessages.UpdatedAccumulators updatedAccumulators = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(AccumulatorLiveITCase.TIMEOUT);
                Map flinkAccumulators = updatedAccumulators.flinkAccumulators();
                Map userAccumulators = updatedAccumulators.userAccumulators();
                ExecutionAttemptID executionAttemptID = null;
                Iterator it = flinkAccumulators.entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry entry = (Map.Entry) it.next();
                    if (entry.getValue() != null) {
                        executionAttemptID = (ExecutionAttemptID) entry.getKey();
                        break;
                    }
                }
                ExecutionAttemptID executionAttemptID2 = null;
                Iterator it2 = flinkAccumulators.keySet().iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    ExecutionAttemptID executionAttemptID3 = (ExecutionAttemptID) it2.next();
                    if (executionAttemptID3 != executionAttemptID) {
                        executionAttemptID2 = executionAttemptID3;
                        break;
                    }
                }
                if (AccumulatorLiveITCase.checkUserAccumulators(0, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(executionAttemptID, 0, 0, 0, 0, flinkAccumulators)) {
                    AccumulatorLiveITCase.LOG.info("Passed initial check for map task.");
                } else {
                    Assert.fail("Wrong accumulator results when map task begins execution.");
                }
                int i = 0;
                for (int i2 = 1; i2 <= AccumulatorLiveITCase.NUM_ITERATIONS; i2++) {
                    i += i2;
                    TestingJobManagerMessages.UpdatedAccumulators updatedAccumulators2 = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(AccumulatorLiveITCase.TIMEOUT);
                    Map flinkAccumulators2 = updatedAccumulators2.flinkAccumulators();
                    Map userAccumulators2 = updatedAccumulators2.userAccumulators();
                    AccumulatorLiveITCase.LOG.info("{}", flinkAccumulators2);
                    AccumulatorLiveITCase.LOG.info("{}", userAccumulators2);
                    if (AccumulatorLiveITCase.checkUserAccumulators(i, userAccumulators2) && AccumulatorLiveITCase.checkFlinkAccumulators(executionAttemptID, 0, i2, 0, i2 * 4, flinkAccumulators2)) {
                        AccumulatorLiveITCase.LOG.info("Passed round #" + i2);
                    } else if (AccumulatorLiveITCase.checkUserAccumulators(i, userAccumulators2) && AccumulatorLiveITCase.checkFlinkAccumulators(executionAttemptID2, 0, i2, 0, i2 * 4, flinkAccumulators2)) {
                        ExecutionAttemptID executionAttemptID4 = executionAttemptID;
                        executionAttemptID = executionAttemptID2;
                        executionAttemptID2 = executionAttemptID4;
                        AccumulatorLiveITCase.LOG.info("Passed round #" + i2);
                    } else {
                        Assert.fail("Failed in round #" + i2);
                    }
                }
                TestingJobManagerMessages.UpdatedAccumulators updatedAccumulators3 = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(AccumulatorLiveITCase.TIMEOUT);
                Map flinkAccumulators3 = updatedAccumulators3.flinkAccumulators();
                if (AccumulatorLiveITCase.checkUserAccumulators(i, updatedAccumulators3.userAccumulators()) && AccumulatorLiveITCase.checkFlinkAccumulators(executionAttemptID2, 0, 0, 0, 0, flinkAccumulators3)) {
                    AccumulatorLiveITCase.LOG.info("Passed initial check for sink task.");
                } else {
                    Assert.fail("Wrong accumulator results when sink task begins execution.");
                }
                for (int i3 = 1; i3 <= AccumulatorLiveITCase.NUM_ITERATIONS; i3++) {
                    TestingJobManagerMessages.UpdatedAccumulators updatedAccumulators4 = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(AccumulatorLiveITCase.TIMEOUT);
                    Map flinkAccumulators4 = updatedAccumulators4.flinkAccumulators();
                    Map userAccumulators3 = updatedAccumulators4.userAccumulators();
                    AccumulatorLiveITCase.LOG.info("{}", flinkAccumulators4);
                    AccumulatorLiveITCase.LOG.info("{}", userAccumulators3);
                    if (AccumulatorLiveITCase.checkUserAccumulators(i, userAccumulators3) && AccumulatorLiveITCase.checkFlinkAccumulators(executionAttemptID2, i3, 0, i3 * 4, 0, flinkAccumulators4)) {
                        AccumulatorLiveITCase.LOG.info("Passed round #" + i3);
                    } else {
                        Assert.fail("Failed in round #" + i3);
                    }
                }
                expectMsgClass(AccumulatorLiveITCase.TIMEOUT, JobManagerMessages.JobResultSuccess.class);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean checkUserAccumulators(int i, Map<String, Accumulator<?, ?>> map) {
        LOG.info("checking user accumulators");
        return map.containsKey(ACCUMULATOR_NAME) && i == map.get(ACCUMULATOR_NAME).getLocalValue().intValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean checkFlinkAccumulators(ExecutionAttemptID executionAttemptID, int i, int i2, int i3, int i4, Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> map) {
        LOG.info("checking flink accumulators");
        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> map2 = map.get(executionAttemptID);
        Assert.assertTrue(map.size() > 0);
        for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : map2.entrySet()) {
            switch (AnonymousClass3.$SwitchMap$org$apache$flink$runtime$accumulators$AccumulatorRegistry$Metric[entry.getKey().ordinal()]) {
                case 1:
                    if (entry.getValue().getLocalValue().longValue() < i2) {
                        return false;
                    }
                    break;
                case 2:
                    if (entry.getValue().getLocalValue().longValue() < i4) {
                        return false;
                    }
                    break;
                case 3:
                    if (entry.getValue().getLocalValue().longValue() < i) {
                        return false;
                    }
                    break;
                case 4:
                    if (entry.getValue().getLocalValue().longValue() < i3) {
                        return false;
                    }
                    break;
                default:
                    Assert.fail("Unknown accumulator found.");
                    break;
            }
        }
        return true;
    }

    public static void notifyTaskManagerOfAccumulatorUpdate() {
        new JavaTestKit(system) { // from class: org.apache.flink.test.accumulators.AccumulatorLiveITCase.2
            {
                Timeout timeout = new Timeout(AccumulatorLiveITCase.TIMEOUT);
                try {
                    Await.result(Patterns.ask(AccumulatorLiveITCase.taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(AccumulatorLiveITCase.jobID), timeout), timeout.duration());
                } catch (Exception e) {
                    Assert.fail("Failed to notify task manager of accumulator update.");
                }
            }
        };
    }

    private static JobGraph getOptimizedPlan(Plan plan) {
        return new JobGraphGenerator().compileJobGraph(new Optimizer(new DataStatistics(), new Configuration()).compile(plan));
    }
}
