/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.async;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncIOExample {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
    private static final String EXACTLY_ONCE_MODE = "exactly_once";
    private static final String EVENT_TIME = "EventTime";
    private static final String INGESTION_TIME = "IngestionTime";
    private static final String ORDERED = "ordered";

    private static void printUsage() {
        System.out.println("To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] [--checkpointMode <exactly_once or at_least_once>] [--maxCount <max number of input from source, -1 for infinite input>] [--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] [--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] [--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>][--timeout <Timeout for the asynchronous operations>]");
    }

    public static void main(String[] args) throws Exception {
        long timeout;
        long shutdownWaitTS;
        String timeType;
        int taskNum;
        String mode;
        float failRatio;
        long sleepFactor;
        int maxCount;
        String cpMode;
        String statePath;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        try {
            statePath = params.get("fsStatePath", null);
            cpMode = params.get("checkpointMode", EXACTLY_ONCE_MODE);
            maxCount = params.getInt("maxCount", 100000);
            sleepFactor = params.getLong("sleepFactor", 100L);
            failRatio = params.getFloat("failRatio", 0.001f);
            mode = params.get("waitMode", ORDERED);
            taskNum = params.getInt("waitOperatorParallelism", 1);
            timeType = params.get("eventType", EVENT_TIME);
            shutdownWaitTS = params.getLong("shutdownWaitTS", 20000L);
            timeout = params.getLong("timeout", 10000L);
        }
        catch (Exception e) {
            AsyncIOExample.printUsage();
            throw e;
        }
        StringBuilder configStringBuilder = new StringBuilder();
        String lineSeparator = System.getProperty("line.separator");
        configStringBuilder.append("Job configuration").append(lineSeparator).append("FS state path=").append(statePath).append(lineSeparator).append("Checkpoint mode=").append(cpMode).append(lineSeparator).append("Max count of input from source=").append(maxCount).append(lineSeparator).append("Sleep factor=").append(sleepFactor).append(lineSeparator).append("Fail ratio=").append(failRatio).append(lineSeparator).append("Waiting mode=").append(mode).append(lineSeparator).append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator).append("Event type=").append(timeType).append(lineSeparator).append("Shutdown wait timestamp=").append(shutdownWaitTS);
        LOG.info(configStringBuilder.toString());
        if (statePath != null) {
            env.setStateBackend((AbstractStateBackend)new FsStateBackend(statePath));
        }
        if (EXACTLY_ONCE_MODE.equals(cpMode)) {
            env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
        } else {
            env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        }
        if (EVENT_TIME.equals(timeType)) {
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        } else if (INGESTION_TIME.equals(timeType)) {
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        }
        DataStreamSource inputStream = env.addSource((SourceFunction)new SimpleSource(maxCount));
        SampleAsyncFunction function = new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
        SingleOutputStreamOperator result = ORDERED.equals(mode) ? AsyncDataStream.orderedWait((DataStream)inputStream, (AsyncFunction)function, (long)timeout, (TimeUnit)TimeUnit.MILLISECONDS, (int)20).setParallelism(taskNum) : AsyncDataStream.unorderedWait((DataStream)inputStream, (AsyncFunction)function, (long)timeout, (TimeUnit)TimeUnit.MILLISECONDS, (int)20).setParallelism(taskNum);
        result.flatMap((FlatMapFunction)new FlatMapFunction<String, Tuple2<String, Integer>>(){
            private static final long serialVersionUID = -938116068682344455L;

            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect((Object)new Tuple2((Object)value, (Object)1));
            }
        }).keyBy(new int[]{0}).sum(1).print();
        env.execute("Async IO Example");
    }

    private static class SampleAsyncFunction
    extends RichAsyncFunction<Integer, String> {
        private static final long serialVersionUID = 2098635244857937717L;
        private static ExecutorService executorService;
        private static Random random;
        private int counter;
        private final long sleepFactor;
        private final float failRatio;
        private final long shutdownWaitTS;

        SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
            this.sleepFactor = sleepFactor;
            this.failRatio = failRatio;
            this.shutdownWaitTS = shutdownWaitTS;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class<SampleAsyncFunction> clazz = SampleAsyncFunction.class;
            synchronized (SampleAsyncFunction.class) {
                if (this.counter == 0) {
                    executorService = Executors.newFixedThreadPool(30);
                    random = new Random();
                }
                ++this.counter;
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() throws Exception {
            super.close();
            Class<SampleAsyncFunction> clazz = SampleAsyncFunction.class;
            synchronized (SampleAsyncFunction.class) {
                --this.counter;
                if (this.counter == 0) {
                    executorService.shutdown();
                    try {
                        if (!executorService.awaitTermination(this.shutdownWaitTS, TimeUnit.MILLISECONDS)) {
                            executorService.shutdownNow();
                        }
                    }
                    catch (InterruptedException e) {
                        executorService.shutdownNow();
                    }
                }
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
        }

        public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    long sleep = (long)(random.nextFloat() * (float)SampleAsyncFunction.this.sleepFactor);
                    try {
                        Thread.sleep(sleep);
                        if (random.nextFloat() < SampleAsyncFunction.this.failRatio) {
                            collector.collect((Throwable)new Exception("wahahahaha..."));
                        } else {
                            collector.collect(Collections.singletonList("key-" + input % 10));
                        }
                    }
                    catch (InterruptedException e) {
                        collector.collect(new ArrayList(0));
                    }
                }
            });
        }
    }

    private static class SimpleSource
    implements SourceFunction<Integer>,
    ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning = true;
        private int counter = 0;
        private int start = 0;

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.start);
        }

        public void restoreState(List<Integer> state) throws Exception {
            for (Integer i : state) {
                this.start = i;
            }
        }

        public SimpleSource(int maxNum) {
            this.counter = maxNum;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while ((this.start < this.counter || this.counter == -1) && this.isRunning) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.start);
                    ++this.start;
                    if (this.start == Integer.MAX_VALUE) {
                        this.start = 0;
                    }
                }
                Thread.sleep(10L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

