package org.apache.beam.runners.flink.examples.streaming;

import java.util.Iterator;
import java.util.Locale;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.examples.streaming.WindowedWordCount;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/JoinExamples.class */
/**
 * Streaming example that joins two text streams read from local sockets.
 *
 * <p>Each incoming line is keyed by its first whitespace-delimited token (lower-cased). The two
 * keyed streams are windowed identically, co-grouped by key, and every pairing of the first
 * stream's value with a second-stream value is formatted as a line of text, printed to standard
 * output, and written through {@code TextIO}.
 */
public class JoinExamples {

    /**
     * Turns a raw text line into a {@code KV} whose key is the first whitespace-delimited token of
     * the lower-cased line and whose value is the whole lower-cased line.
     */
    public static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
        private static final long serialVersionUID = 0L;

        ExtractEventDataFn() {
        }

        /**
         * Emits {@code KV(firstToken, lowerCasedLine)} for the current element.
         *
         * @param processContext context supplying the input line and receiving the keyed output
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext) {
            // Locale.ROOT keeps the join key identical regardless of the JVM's default locale.
            String line = processContext.element().toLowerCase(Locale.ROOT);
            // A limit of 2 keeps trailing empty strings, so the array is never empty; with the
            // default limit a whitespace-only line (e.g. " " or "\r") yields a zero-length array
            // and indexing [0] would throw ArrayIndexOutOfBoundsException.
            String key = line.split("\\s", 2)[0];
            processContext.output(KV.of(key, line));
        }
    }

    /** Pipeline options for this example; adds nothing to the inherited streaming options. */
    private interface Options extends WindowedWordCount.StreamingWordCountOptions {
    }

    /**
     * Joins two collections of text lines on the first token of each line.
     *
     * <p>For every key, the single value from the first collection (or {@code "NO_VALUE"} when the
     * key has none there) is paired with each value from the second collection. Keys that have no
     * value in the second collection therefore produce no output.
     *
     * @param pCollection lines of the first stream ("Value A" side)
     * @param pCollection2 lines of the second stream ("Value B" side)
     * @return formatted lines of the form {@code key -> Value A: a - Value B: b}
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     *     directly
     */
    static PCollection<String> joinEvents(PCollection<String> pCollection, PCollection<String> pCollection2) throws Exception {
        final TupleTag<String> firstTag = new TupleTag<>();
        final TupleTag<String> secondTag = new TupleTag<>();

        PCollection<KV<String, String>> firstKeyed = pCollection.apply(ParDo.of(new ExtractEventDataFn()));
        PCollection<KV<String, String>> secondKeyed = pCollection2.apply(ParDo.of(new ExtractEventDataFn()));

        PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
                .of(firstTag, firstKeyed)
                .and(secondTag, secondKeyed)
                .apply(CoGroupByKey.<String>create());

        PCollection<KV<String, String>> joined = grouped.apply("Process", ParDo.of(
                new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                    private static final long serialVersionUID = 0L;

                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        KV<String, CoGbkResult> element = processContext.element();
                        String key = element.getKey();
                        CoGbkResult result = element.getValue();
                        // NOTE(review): getOnly presumably rejects keys carrying more than one
                        // first-stream value in the same window - confirm against CoGbkResult docs.
                        String firstValue = result.getOnly(firstTag, "NO_VALUE");
                        for (String secondValue : result.getAll(secondTag)) {
                            processContext.output(
                                    KV.of(key, "Value A: " + firstValue + " - Value B: " + secondValue));
                        }
                    }
                }));

        return joined.apply("Format", ParDo.of(
                new DoFn<KV<String, String>, String>() {
                    private static final long serialVersionUID = 0L;

                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        KV<String, String> element = processContext.element();
                        String formatted = element.getKey() + " -> " + element.getValue();
                        // Echo each joined record to stdout in addition to emitting it downstream.
                        System.out.println(formatted);
                        processContext.output(formatted);
                    }
                }));
    }

    /**
     * Reads newline-delimited text from a socket on {@code localhost} and assigns it to windows.
     * The window fires once when the watermark passes the end of the window, with zero allowed
     * lateness and discarding fired panes.
     */
    private static PCollection<String> readWindowedSocketStream(
            Pipeline pipeline, String stepName, int port, FixedWindows windowFn) {
        return pipeline
                .apply(stepName, Read.from(new UnboundedSocketSource<>("localhost", port, '\n', 3L)))
                .apply(Window.<String>into(windowFn)
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes());
    }

    /**
     * Builds and runs the join pipeline on the Flink runner in streaming mode.
     *
     * <p>Reads the first stream from {@code localhost:9999} and the second from
     * {@code localhost:9998}, joins them, and writes the result to {@code ./outputJoin.txt}.
     *
     * @param strArr command-line arguments parsed into pipeline options
     * @throws Exception if building the join fails
     */
    public static void main(String[] strArr) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(strArr).withValidation().as(Options.class);
        options.setStreaming(true);
        // NOTE(review): interval and retry delay are presumably milliseconds - confirm in the
        // Flink pipeline options documentation.
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkRunner.class);

        // Both inputs share one window definition so their elements land in matching windows.
        FixedWindows windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));

        Pipeline pipeline = Pipeline.create(options);
        PCollection<String> firstStream = readWindowedSocketStream(pipeline, "FirstStream", 9999, windowFn);
        PCollection<String> secondStream = readWindowedSocketStream(pipeline, "SecondStream", 9998, windowFn);

        joinEvents(firstStream, secondStream).apply(TextIO.Write.to("./outputJoin.txt"));
        pipeline.run();
    }
}
