/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.examples.flink;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.pulsar.client.api.ProducerConfiguration;

public class PulsarConsumerSourceWordCount {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs((String[])args);
        if (parameterTool.getNumberOfParameters() < 2) {
            System.out.println("Missing parameters!");
            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)4, (long)10000L));
        env.enableCheckpointing(5000L);
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        String serviceUrl = parameterTool.getRequired("service-url");
        String inputTopic = parameterTool.getRequired("input-topic");
        String subscription = parameterTool.get("subscription", "flink-examples");
        String outputTopic = parameterTool.get("output-topic", null);
        int parallelism = parameterTool.getInt("parallelism", 1);
        System.out.println("Parameters:");
        System.out.println("\tServiceUrl:\t" + serviceUrl);
        System.out.println("\tInputTopic:\t" + inputTopic);
        System.out.println("\tSubscription:\t" + subscription);
        System.out.println("\tOutputTopic:\t" + outputTopic);
        System.out.println("\tParallelism:\t" + parallelism);
        PulsarSourceBuilder builder = PulsarSourceBuilder.builder((DeserializationSchema)new SimpleStringSchema()).serviceUrl(serviceUrl).topic(inputTopic).subscriptionName(subscription);
        SourceFunction src = builder.build();
        DataStreamSource input = env.addSource(src);
        SingleOutputStreamOperator wc = input.flatMap((FlatMapFunction & Serializable)(line, collector) -> {
            for (String word : line.split("\\s")) {
                collector.collect((Object)new WordWithCount(word, 1L));
            }
        }).returns(WordWithCount.class).keyBy(new String[]{"word"}).timeWindow(Time.seconds((long)5L)).reduce((ReduceFunction & Serializable)(c1, c2) -> new WordWithCount(c1.word, c1.count + c2.count));
        if (null != outputTopic) {
            wc.addSink((SinkFunction)new FlinkPulsarProducer(serviceUrl, outputTopic, (SerializationSchema & Serializable)wordWithCount -> wordWithCount.toString().getBytes(StandardCharsets.UTF_8), new ProducerConfiguration(), (PulsarKeyExtractor & Serializable)wordWithCount -> wordWithCount.word)).setParallelism(parallelism);
        } else {
            wc.print().setParallelism(1);
        }
        env.execute("Pulsar Stream WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public WordWithCount() {
        }

        public String toString() {
            return "PulsarConsumerSourceWordCount.WordWithCount(word=" + this.word + ", count=" + this.count + ")";
        }
    }
}

