package org.apache.flink.streaming.examples.async;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/examples/async/AsyncIOExample.class */
public class AsyncIOExample {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
    private static final String EXACTLY_ONCE_MODE = "exactly_once";
    private static final String EVENT_TIME = "EventTime";
    private static final String INGESTION_TIME = "IngestionTime";
    private static final String ORDERED = "ordered";

    /* loaded from: input_file:org/apache/flink/streaming/examples/async/AsyncIOExample$SampleAsyncFunction.class */
    private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
        private static final long serialVersionUID = 2098635244857937717L;
        private transient ExecutorService executorService;
        private final long sleepFactor;
        private final float failRatio;
        private final long shutdownWaitTS;

        SampleAsyncFunction(long j, float f, long j2) {
            this.sleepFactor = j;
            this.failRatio = f;
            this.shutdownWaitTS = j2;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.executorService = Executors.newFixedThreadPool(30);
        }

        public void close() throws Exception {
            super.close();
            ExecutorUtils.gracefulShutdown(this.shutdownWaitTS, TimeUnit.MILLISECONDS, new ExecutorService[]{this.executorService});
        }

        public void asyncInvoke(Integer num, ResultFuture<String> resultFuture) {
            this.executorService.submit(() -> {
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextFloat() * ((float) this.sleepFactor));
                    if (ThreadLocalRandom.current().nextFloat() < this.failRatio) {
                        resultFuture.completeExceptionally(new Exception("wahahahaha..."));
                    } else {
                        resultFuture.complete(Collections.singletonList("key-" + (num.intValue() % 10)));
                    }
                } catch (InterruptedException e) {
                    resultFuture.complete(new ArrayList(0));
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<String>) resultFuture);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/examples/async/AsyncIOExample$SimpleSource.class */
    private static class SimpleSource implements SourceFunction<Integer>, CheckpointedFunction {
        private static final long serialVersionUID = 1;
        private int counter;
        private ListState<Integer> state;
        private volatile boolean isRunning = true;
        private int start = 0;

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.state = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("state", IntSerializer.INSTANCE));
            Iterator it = ((Iterable) this.state.get()).iterator();
            while (it.hasNext()) {
                this.start = ((Integer) it.next()).intValue();
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.add(Integer.valueOf(this.start));
        }

        public SimpleSource(int i) {
            this.counter = 0;
            this.counter = i;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (true) {
                if ((this.start >= this.counter && this.counter != -1) || !this.isRunning) {
                    return;
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(this.start));
                    this.start++;
                    if (this.start == Integer.MAX_VALUE) {
                        this.start = 0;
                    }
                }
                Thread.sleep(10L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    private static void printUsage() {
        System.out.println("To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] [--checkpointMode <exactly_once or at_least_once>] [--maxCount <max number of input from source, -1 for infinite input>] [--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] [--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] [--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>][--timeout <Timeout for the asynchronous operations>]");
    }

    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        try {
            String str = fromArgs.get("fsStatePath", (String) null);
            String str2 = fromArgs.get("checkpointMode", EXACTLY_ONCE_MODE);
            int i = fromArgs.getInt("maxCount", 100000);
            long j = fromArgs.getLong("sleepFactor", 100L);
            float f = fromArgs.getFloat("failRatio", 0.001f);
            String str3 = fromArgs.get("waitMode", ORDERED);
            int i2 = fromArgs.getInt("waitOperatorParallelism", 1);
            String str4 = fromArgs.get("eventType", EVENT_TIME);
            long j2 = fromArgs.getLong("shutdownWaitTS", 20000L);
            long j3 = fromArgs.getLong("timeout", 10000L);
            StringBuilder sb = new StringBuilder();
            String property = System.getProperty("line.separator");
            sb.append("Job configuration").append(property).append("FS state path=").append(str).append(property).append("Checkpoint mode=").append(str2).append(property).append("Max count of input from source=").append(i).append(property).append("Sleep factor=").append(j).append(property).append("Fail ratio=").append(f).append(property).append("Waiting mode=").append(str3).append(property).append("Parallelism for async wait operator=").append(i2).append(property).append("Event type=").append(str4).append(property).append("Shutdown wait timestamp=").append(j2);
            LOG.info(sb.toString());
            if (str != null) {
                executionEnvironment.setStateBackend(new FsStateBackend(str));
            }
            if (EXACTLY_ONCE_MODE.equals(str2)) {
                executionEnvironment.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
            } else {
                executionEnvironment.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
            }
            if (EVENT_TIME.equals(str4)) {
                executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            } else if (INGESTION_TIME.equals(str4)) {
                executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
            }
            DataStreamSource addSource = executionEnvironment.addSource(new SimpleSource(i));
            SampleAsyncFunction sampleAsyncFunction = new SampleAsyncFunction(j, f, j2);
            (ORDERED.equals(str3) ? AsyncDataStream.orderedWait(addSource, sampleAsyncFunction, j3, TimeUnit.MILLISECONDS, 20).setParallelism(i2) : AsyncDataStream.unorderedWait(addSource, sampleAsyncFunction, j3, TimeUnit.MILLISECONDS, 20).setParallelism(i2)).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.examples.async.AsyncIOExample.1
                private static final long serialVersionUID = -938116068682344455L;

                public void flatMap(String str5, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    collector.collect(new Tuple2(str5, 1));
                }

                public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                    flatMap((String) obj, (Collector<Tuple2<String, Integer>>) collector);
                }
            }).keyBy(new int[]{0}).sum(1).print();
            executionEnvironment.execute("Async IO Example");
        } catch (Exception e) {
            printUsage();
            throw e;
        }
    }
}
