package org.apache.beam.runners.flink.examples.streaming;

import java.io.IOException;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.examples.WordCount;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.class */
public class WindowedWordCount {
    private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
    static final long WINDOW_SIZE = 10;
    static final long SLIDE_SIZE = 5;

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/WindowedWordCount$ExtractWordsFn.class */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn());

        ExtractWordsFn() {
        }

        public void processElement(DoFn<String, String>.ProcessContext processContext) {
            if (((String) processContext.element()).trim().isEmpty()) {
                this.emptyLines.addValue(1L);
            }
            for (String str : ((String) processContext.element()).split("[^a-zA-Z']+")) {
                if (!str.isEmpty()) {
                    processContext.output(str);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/WindowedWordCount$FormatAsStringFn.class */
    static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
        FormatAsStringFn() {
        }

        public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext) {
            processContext.output(((String) ((KV) processContext.element()).getKey()) + " - " + ((KV) processContext.element()).getValue() + " @ " + processContext.timestamp().toString());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/WindowedWordCount$StreamingWordCountOptions.class */
    public interface StreamingWordCountOptions extends WordCount.Options {
        @Description("Sliding window duration, in seconds")
        @Default.Long(WindowedWordCount.WINDOW_SIZE)
        Long getWindowSize();

        void setWindowSize(Long l);

        @Description("Window slide, in seconds")
        @Default.Long(WindowedWordCount.SLIDE_SIZE)
        Long getSlide();

        void setSlide(Long l);
    }

    public static void main(String[] strArr) throws IOException {
        StreamingWordCountOptions streamingWordCountOptions = (StreamingWordCountOptions) PipelineOptionsFactory.fromArgs(strArr).withValidation().as(StreamingWordCountOptions.class);
        streamingWordCountOptions.setStreaming(true);
        streamingWordCountOptions.setWindowSize(Long.valueOf(WINDOW_SIZE));
        streamingWordCountOptions.setSlide(Long.valueOf(SLIDE_SIZE));
        streamingWordCountOptions.setCheckpointingInterval(1000L);
        streamingWordCountOptions.setNumberOfExecutionRetries(5);
        streamingWordCountOptions.setExecutionRetryDelay(3000L);
        streamingWordCountOptions.setRunner(FlinkRunner.class);
        LOG.info("Windpwed WordCount with Sliding Windows of " + streamingWordCountOptions.getWindowSize() + " sec. and a slide of " + streamingWordCountOptions.getSlide());
        Pipeline create = Pipeline.create(streamingWordCountOptions);
        create.apply("StreamingWordCount", Read.from(new UnboundedSocketSource("localhost", 9999, '\n', 3L))).apply(ParDo.of(new ExtractWordsFn())).apply(Window.into(SlidingWindows.of(Duration.standardSeconds(streamingWordCountOptions.getWindowSize().longValue())).every(Duration.standardSeconds(streamingWordCountOptions.getSlide().longValue()))).triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).discardingFiredPanes()).apply(Count.perElement()).apply(ParDo.of(new FormatAsStringFn())).apply(TextIO.Write.to("./outputWordCount.txt"));
        create.run();
    }
}
