package org.apache.flink.streaming.examples.kafka;

import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

/* loaded from: input_file:org/apache/flink/streaming/examples/kafka/Kafka010Example.class */
public class Kafka010Example {

    /* loaded from: input_file:org/apache/flink/streaming/examples/kafka/Kafka010Example$CustomWatermarkExtractor.class */
    /**
     * Periodic watermark assigner for {@link KafkaEvent} records.
     *
     * <p>The timestamp carried by each event is used as its event time. The watermark reported on
     * request is one less than the timestamp of the most recently extracted event. The tracked
     * timestamp is overwritten by every event, so it follows the latest event seen rather than
     * the maximum seen so far.
     */
    private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {
        private static final long serialVersionUID = -742759155861320823L;

        /** Timestamp of the last extracted event; {@code Long.MIN_VALUE} until an event was seen. */
        private long currentTimestamp = Long.MIN_VALUE;

        private CustomWatermarkExtractor() {
        }

        /**
         * Records the event's timestamp as the latest one seen and returns it as the event time.
         *
         * @param event the record whose timestamp is extracted
         * @param previousElementTimestamp timestamp handed in by the caller; not used here
         * @return the timestamp reported by {@code event}
         */
        public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
            currentTimestamp = event.getTimestamp();
            return event.getTimestamp();
        }

        /**
         * Builds the current watermark from the last recorded timestamp.
         *
         * @return a watermark at {@code Long.MIN_VALUE} while no event has been recorded, otherwise
         *     a watermark at the last recorded timestamp minus one
         */
        @Nullable
        public Watermark getCurrentWatermark() {
            // Guard the sentinel: subtracting one from Long.MIN_VALUE would wrap around.
            if (currentTimestamp == Long.MIN_VALUE) {
                return new Watermark(Long.MIN_VALUE);
            }
            return new Watermark(currentTimestamp - 1);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/examples/kafka/Kafka010Example$RollingAdditionMapper.class */
    /**
     * Stateful mapper that keeps a running total of event frequencies.
     *
     * <p>For every incoming {@link KafkaEvent} the event's frequency is added to the total held in
     * the {@code currentTotalCount} value state, the state is updated, and a new event carrying
     * the same word and timestamp but the updated total as its frequency is emitted.
     */
    private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {
        private static final long serialVersionUID = 1180234853172462378L;

        /** Running total; obtained from the runtime context in {@link #open(Configuration)}. */
        private transient ValueState<Integer> currentTotalCount;

        private RollingAdditionMapper() {
        }

        /**
         * Adds the event's frequency to the stored total and emits the event with the new total.
         *
         * @param kafkaEvent the incoming event
         * @return a new event with the word and timestamp of {@code kafkaEvent} and the updated
         *     running total as its frequency
         * @throws Exception if reading or updating the state fails
         */
        public KafkaEvent map(KafkaEvent kafkaEvent) throws Exception {
            final Integer storedTotal = currentTotalCount.value();

            // The state yields null when nothing has been stored yet; start counting from zero.
            int runningTotal = 0;
            if (storedTotal != null) {
                runningTotal = storedTotal;
            }
            runningTotal += kafkaEvent.getFrequency();

            currentTotalCount.update(runningTotal);
            return new KafkaEvent(kafkaEvent.getWord(), runningTotal, kafkaEvent.getTimestamp());
        }

        /**
         * Looks up the {@code currentTotalCount} value state from the runtime context.
         *
         * @param configuration configuration passed in by the caller; not used here
         * @throws Exception declared by the overridden method's contract
         */
        public void open(Configuration configuration) throws Exception {
            // Diamond instead of a raw descriptor keeps the state handle type-checked as Integer.
            currentTotalCount = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("currentTotalCount", Integer.class));
        }
    }

    /**
     * Builds and runs the streaming job: read {@link KafkaEvent}s from the input topic, key them
     * by the {@code word} field, replace each event's frequency with a running total, and write
     * the result to the output topic.
     *
     * <p>If fewer than five parameters are supplied, a usage message is printed to standard
     * output and the method returns without starting a job. All parsed parameters are also passed
     * on as the Kafka client properties for both the consumer and the producer.
     *
     * @param strArr command-line arguments in {@code --key value} form; {@code input-topic} and
     *     {@code output-topic} are required
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool parameterTool = ParameterTool.fromArgs(strArr);
        if (parameterTool.getNumberOfParameters() < 5) {
            System.out.println("Missing parameters!\nUsage: Kafka --input-topic <topic> --output-topic <topic> --bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        // Fixed-delay restart strategy configured with the values 4 and 10000.
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000L));
        env.enableCheckpointing(5000L);
        // Expose the parsed parameters as the job's global parameters.
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Diamond constructors keep the whole pipeline typed as KafkaEvent instead of raw.
        env.addSource(
                        new FlinkKafkaConsumer010<>(
                                        parameterTool.getRequired("input-topic"),
                                        new KafkaEventSchema(),
                                        parameterTool.getProperties())
                                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                .keyBy("word")
                .map(new RollingAdditionMapper())
                .addSink(
                        new FlinkKafkaProducer010<>(
                                parameterTool.getRequired("output-topic"),
                                new KafkaEventSchema(),
                                parameterTool.getProperties()));

        env.execute("Kafka 0.10 Example");
    }
}
