/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class AccumulatorLiveITCase {
    private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
    private static ActorSystem system;
    private static ActorGateway jobManagerGateway;
    private static ActorRef taskManager;
    private static JobID jobID;
    private static JobGraph jobGraph;
    private static String ACCUMULATOR_NAME;
    private static final int NUM_ITERATIONS = 5;
    private static List<String> inputData;
    private static final FiniteDuration TIMEOUT;

    @Before
    public void before() throws Exception {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
        Configuration config = new Configuration();
        config.setInteger("taskmanager.numberOfTaskSlots", 1);
        config.setInteger("local.number-taskmanager", 1);
        config.setString("akka.ask.timeout", TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        TestingCluster testingCluster = new TestingCluster(config, false, true);
        testingCluster.start();
        jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
        taskManager = (ActorRef)testingCluster.getTaskManagersAsJava().get(0);
        for (int i = 0; i < 5; ++i) {
            inputData.add(i, String.valueOf(i + 1));
        }
        NotifyingMapper.finished = false;
    }

    @After
    public void after() throws Exception {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
        inputData.clear();
    }

    @Test
    public void testBatch() throws Exception {
        BatchPlanExtractor env = new BatchPlanExtractor();
        env.setParallelism(1);
        DataSource input = env.fromCollection(inputData);
        input.flatMap((FlatMapFunction)new NotifyingMapper()).output((OutputFormat)new NotifyingOutputFormat());
        env.execute();
        jobGraph = AccumulatorLiveITCase.getOptimizedPlan(env.plan);
        jobID = jobGraph.getJobID();
        AccumulatorLiveITCase.verifyResults();
    }

    @Test
    public void testStreaming() throws Exception {
        DummyStreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource input = env.fromCollection(inputData);
        input.flatMap((FlatMapFunction)new NotifyingMapper()).write((OutputFormat)new NotifyingOutputFormat(), 1000L).disableChaining();
        jobGraph = env.getStreamGraph().getJobGraph();
        jobID = jobGraph.getJobID();
        AccumulatorLiveITCase.verifyResults();
    }

    private static void verifyResults() {
        new JavaTestKit(system){
            {
                int i;
                AkkaActorGateway selfGateway = new AkkaActorGateway(this.getRef(), jobManagerGateway.leaderSessionID());
                jobManagerGateway.tell((Object)new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), (ActorGateway)selfGateway);
                this.expectMsgEquals(TIMEOUT, true);
                jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)selfGateway);
                this.expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
                TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators)this.receiveOne((Duration)TIMEOUT);
                Map flinkAccumulators = msg.flinkAccumulators();
                Map userAccumulators = msg.userAccumulators();
                ExecutionAttemptID mapperTaskID = null;
                for (Map.Entry entry : flinkAccumulators.entrySet()) {
                    if (entry.getValue() == null) continue;
                    mapperTaskID = (ExecutionAttemptID)entry.getKey();
                    break;
                }
                ExecutionAttemptID sinkTaskID = null;
                for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
                    if (key == mapperTaskID) continue;
                    sinkTaskID = key;
                    break;
                }
                if (AccumulatorLiveITCase.checkUserAccumulators(0, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
                    LOG.info("Passed initial check for map task.");
                } else {
                    Assert.fail((String)"Wrong accumulator results when map task begins execution.");
                }
                int expectedAccVal = 0;
                for (i = 1; i <= 5; ++i) {
                    expectedAccVal += i;
                    msg = (TestingJobManagerMessages.UpdatedAccumulators)this.receiveOne((Duration)TIMEOUT);
                    flinkAccumulators = msg.flinkAccumulators();
                    userAccumulators = msg.userAccumulators();
                    LOG.info("{}", (Object)flinkAccumulators);
                    LOG.info("{}", (Object)userAccumulators);
                    if (AccumulatorLiveITCase.checkUserAccumulators(expectedAccVal, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
                        LOG.info("Passed round #" + i);
                        continue;
                    }
                    if (AccumulatorLiveITCase.checkUserAccumulators(expectedAccVal, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
                        ExecutionAttemptID temp = mapperTaskID;
                        mapperTaskID = sinkTaskID;
                        sinkTaskID = temp;
                        LOG.info("Passed round #" + i);
                        continue;
                    }
                    Assert.fail((String)("Failed in round #" + i));
                }
                msg = (TestingJobManagerMessages.UpdatedAccumulators)this.receiveOne((Duration)TIMEOUT);
                flinkAccumulators = msg.flinkAccumulators();
                userAccumulators = msg.userAccumulators();
                if (AccumulatorLiveITCase.checkUserAccumulators(expectedAccVal, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
                    LOG.info("Passed initial check for sink task.");
                } else {
                    Assert.fail((String)"Wrong accumulator results when sink task begins execution.");
                }
                for (i = 1; i <= 5; ++i) {
                    msg = (TestingJobManagerMessages.UpdatedAccumulators)this.receiveOne((Duration)TIMEOUT);
                    flinkAccumulators = msg.flinkAccumulators();
                    userAccumulators = msg.userAccumulators();
                    LOG.info("{}", (Object)flinkAccumulators);
                    LOG.info("{}", (Object)userAccumulators);
                    if (AccumulatorLiveITCase.checkUserAccumulators(expectedAccVal, userAccumulators) && AccumulatorLiveITCase.checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) {
                        LOG.info("Passed round #" + i);
                        continue;
                    }
                    Assert.fail((String)("Failed in round #" + i));
                }
                this.expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
            }
        };
    }

    private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
        LOG.info("checking user accumulators");
        return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
    }

    private static boolean checkFlinkAccumulators(ExecutionAttemptID taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, int expectedBytesOut, Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> accumulatorMap) {
        LOG.info("checking flink accumulators");
        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskMap = accumulatorMap.get(taskKey);
        Assert.assertTrue((accumulatorMap.size() > 0 ? 1 : 0) != 0);
        block6: for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : taskMap.entrySet()) {
            switch (entry.getKey()) {
                case NUM_RECORDS_OUT: {
                    if (((LongCounter)entry.getValue()).getLocalValue() >= (long)expectedRecordsOut) continue block6;
                    return false;
                }
                case NUM_BYTES_OUT: {
                    if (((LongCounter)entry.getValue()).getLocalValue() >= (long)expectedBytesOut) continue block6;
                    return false;
                }
                case NUM_RECORDS_IN: {
                    if (((LongCounter)entry.getValue()).getLocalValue() >= (long)expectedRecordsIn) continue block6;
                    return false;
                }
                case NUM_BYTES_IN: {
                    if (((LongCounter)entry.getValue()).getLocalValue() >= (long)expectedBytesIn) continue block6;
                    return false;
                }
            }
            Assert.fail((String)"Unknown accumulator found.");
        }
        return true;
    }

    public static void notifyTaskManagerOfAccumulatorUpdate() {
        new JavaTestKit(system){
            {
                Timeout timeout = new Timeout(TIMEOUT);
                Future ask = Patterns.ask((ActorRef)taskManager, (Object)new TestingTaskManagerMessages.AccumulatorsChanged(jobID), (Timeout)timeout);
                try {
                    Await.result((Awaitable)ask, (Duration)timeout.duration());
                }
                catch (Exception e) {
                    Assert.fail((String)"Failed to notify task manager of accumulator update.");
                }
            }
        };
    }

    private static JobGraph getOptimizedPlan(Plan plan) {
        Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
        JobGraphGenerator jgg = new JobGraphGenerator();
        OptimizedPlan op = pc.compile(plan);
        return jgg.compileJobGraph(op);
    }

    static {
        ACCUMULATOR_NAME = "test";
        inputData = new ArrayList<String>(5);
        TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    }

    private static class DummyStreamExecutionEnvironment
    extends StreamExecutionEnvironment {
        private DummyStreamExecutionEnvironment() {
        }

        public JobExecutionResult execute() throws Exception {
            return this.execute("default");
        }

        public JobExecutionResult execute(String jobName) throws Exception {
            throw new RuntimeException("This should not be called.");
        }
    }

    private static class BatchPlanExtractor
    extends LocalEnvironment {
        private Plan plan = null;

        private BatchPlanExtractor() {
        }

        public JobExecutionResult execute(String jobName) throws Exception {
            this.plan = this.createProgramPlan();
            return new JobExecutionResult(new JobID(), -1L, null);
        }
    }

    private static class NotifyingOutputFormat
    implements OutputFormat<Integer> {
        private static final long serialVersionUID = 1L;

        private NotifyingOutputFormat() {
        }

        public void configure(Configuration parameters) {
        }

        public void open(int taskNumber, int numTasks) throws IOException {
            while (!NotifyingMapper.finished) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void writeRecord(Integer record) throws IOException {
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void close() throws IOException {
        }
    }

    private static class NotifyingMapper
    extends RichFlatMapFunction<String, Integer> {
        private static final long serialVersionUID = 1L;
        private IntCounter counter = new IntCounter();
        private static boolean finished = false;

        private NotifyingMapper() {
        }

        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, (Accumulator)this.counter);
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void flatMap(String value, Collector<Integer> out) throws Exception {
            int val = Integer.valueOf(value);
            this.counter.add(val);
            out.collect((Object)val);
            LOG.debug("Emitting value {}.", (Object)value);
            AccumulatorLiveITCase.notifyTaskManagerOfAccumulatorUpdate();
        }

        public void close() throws Exception {
            finished = true;
        }
    }
}

