package org.apache.beam.runners.flink.examples;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/examples/TFIDF.class */
public class TFIDF {

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/TFIDF$ComputeTfIdf.class */
    /**
     * Composite transform that turns (document URI, text) pairs into
     * (word, (document URI, TF-IDF score)) pairs.
     *
     * <p>For each word {@code w} and document {@code d}:
     * <ul>
     *   <li>term frequency = occurrences of {@code w} in {@code d} / total words in {@code d};</li>
     *   <li>document frequency = documents containing {@code w} / total documents;</li>
     *   <li>score = term frequency * ln(1 / document frequency).</li>
     * </ul>
     *
     * <p>Words are the non-empty tokens obtained by splitting the text on {@code \W+},
     * lower-cased with {@link Locale#ROOT} so that the resulting keys do not depend on
     * the default locale of the JVM that happens to process an element.
     */
    public static class ComputeTfIdf extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
        private static final long serialVersionUID = 0;
        private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);

        /**
         * Builds the TF-IDF sub-pipeline on top of the given input.
         *
         * @param pCollection (document URI, text) pairs; a URI may appear many times
         * @return one (word, (document URI, score)) pair per distinct word/document combination
         */
        public PCollection<KV<String, KV<URI, Double>>> apply(PCollection<KV<URI, String>> pCollection) {
            // Number of distinct documents, made available to a later ParDo as a side input.
            final PCollectionView<Long> totalDocuments = pCollection
                    .apply("GetURIs", Keys.<URI>create())
                    .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
                    .apply(Count.<URI>globally())
                    .apply(View.<Long>asSingleton());

            // One (URI, word) pair per word occurrence.
            PCollection<KV<URI, String>> uriToWords = pCollection.apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
                private static final long serialVersionUID = 0;

                public void processElement(DoFn<KV<URI, String>, KV<URI, String>>.ProcessContext c) {
                    URI uri = c.element().getKey();
                    for (String token : c.element().getValue().split("\\W+")) {
                        // Locale-independent normalisation, computed once per token.
                        String word = token.toLowerCase(Locale.ROOT);
                        if (word.equals("love")) {
                            LOG.info("Found {}", word);
                        }
                        // split() yields an empty leading token when the text starts with a non-word character.
                        if (!word.isEmpty()) {
                            c.output(KV.of(uri, word));
                        }
                    }
                }
            }));

            // word -> number of documents that contain it.
            PCollection<KV<String, Long>> wordToDocCount = uriToWords
                    .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
                    .apply(Values.<String>create())
                    .apply("CountDocs", Count.<String>perElement());

            // URI -> total number of word occurrences in that document.
            PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
                    .apply("GetURIs2", Keys.<URI>create())
                    .apply("CountWords", Count.<URI>perElement());

            // URI -> (word, occurrences of the word in that document).
            PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriToWords
                    .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement())
                    .apply("ShiftKeys", ParDo.of(new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                        private static final long serialVersionUID = 0;

                        public void processElement(DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>.ProcessContext c) {
                            KV<URI, String> uriAndWord = c.element().getKey();
                            c.output(KV.of(uriAndWord.getKey(), KV.of(uriAndWord.getValue(), c.element().getValue())));
                        }
                    }));

            // Join per-document totals with per-document word counts to obtain term frequencies.
            final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
            final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
            PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = KeyedPCollectionTuple
                    .of(wordTotalsTag, uriToWordTotal)
                    .and(wordCountsTag, uriToWordAndCount)
                    .apply("CoGroupByUri", CoGroupByKey.<URI>create())
                    .apply("ComputeTermFrequencies", ParDo.of(new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                        private static final long serialVersionUID = 0;

                        public void processElement(DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>.ProcessContext c) {
                            URI uri = c.element().getKey();
                            Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
                            for (KV<String, Long> wordAndCount : c.element().getValue().getAll(wordCountsTag)) {
                                double termFrequency = wordAndCount.getValue().doubleValue() / wordTotal.doubleValue();
                                c.output(KV.of(wordAndCount.getKey(), KV.of(uri, termFrequency)));
                            }
                        }
                    }));

            // word -> fraction of all documents that contain it.
            PCollection<KV<String, Double>> wordToDf = wordToDocCount.apply("ComputeDocFrequencies", ParDo.withSideInputs(totalDocuments).of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                private static final long serialVersionUID = 0;

                public void processElement(DoFn<KV<String, Long>, KV<String, Double>>.ProcessContext c) {
                    Long documentCount = c.element().getValue();
                    Long documentTotal = c.sideInput(totalDocuments);
                    c.output(KV.of(c.element().getKey(), documentCount.doubleValue() / documentTotal.doubleValue()));
                }
            }));

            // Join term frequencies with document frequencies, keyed by word, and combine them.
            final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
            final TupleTag<Double> dfTag = new TupleTag<Double>();
            return KeyedPCollectionTuple
                    .of(tfTag, wordToUriAndTf)
                    .and(dfTag, wordToDf)
                    .apply(CoGroupByKey.<String>create())
                    .apply("ComputeTfIdf", ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                        private static final long serialVersionUID = 0;

                        public void processElement(DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>.ProcessContext c) {
                            String word = c.element().getKey();
                            Double documentFrequency = c.element().getValue().getOnly(dfTag);
                            for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
                                double tfIdf = uriAndTf.getValue().doubleValue() * Math.log(1.0d / documentFrequency.doubleValue());
                                c.output(KV.of(word, KV.of(uriAndTf.getKey(), tfIdf)));
                            }
                        }
                    }));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/examples/TFIDF$Options.class */
    /**
     * Pipeline options for the TF-IDF example: where to read the documents from
     * and where to write the results.
     *
     * <p>{@code main} builds an instance from the command-line arguments with
     * validation enabled.
     */
    public interface Options extends PipelineOptions, FlinkPipelineOptions {
        /**
         * Directory or GCS prefix containing the input documents.
         * Defaults to {@code gs://dataflow-samples/shakespeare/}.
         */
        @Default.String("gs://dataflow-samples/shakespeare/")
        @Description("Path to the directory or GCS prefix containing files to read from")
        String getInput();

        /** Sets the input directory or GCS prefix. */
        void setInput(String str);

        /**
         * Prefix of the output location. Marked as required, so it has no default.
         */
        @Description("Prefix of output URI to write to")
        @Validation.Required
        String getOutput();

        /** Sets the output prefix. */
        void setOutput(String str);
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/TFIDF$ReadDocuments.class */
    /**
     * Reads a set of documents and keys every element read from a document with
     * that document's URI.
     *
     * <p>One {@code TextIO.Read} is applied per URI; the per-document collections
     * are then flattened into a single collection.
     */
    public static class ReadDocuments extends PTransform<PInput, PCollection<KV<URI, String>>> {
        private static final long serialVersionUID = 0;
        private Iterable<URI> uris;

        /**
         * @param iterable URIs of the documents to read
         */
        public ReadDocuments(Iterable<URI> iterable) {
            this.uris = iterable;
        }

        /**
         * @return a KV coder that encodes the URI key through its string form and the
         *     text value as UTF-8
         */
        public Coder<?> getDefaultOutputCoder() {
            return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
        }

        /**
         * Expands this transform into one read per document followed by a flatten.
         *
         * <p>Restored to its original name (the decompiler had renamed it to
         * {@code m1apply}), so it overrides {@code PTransform}'s {@code apply} like the
         * other transforms in this file.
         *
         * @param pInput any pipeline input; only its pipeline is used
         * @return (document URI, text) pairs for all documents
         */
        public PCollection<KV<URI, String>> apply(PInput pInput) {
            Pipeline pipeline = pInput.getPipeline();
            PCollectionList<KV<URI, String>> urisToLines = PCollectionList.empty(pipeline);
            for (URI uri : this.uris) {
                // Local files are handed over as plain paths, everything else as the URI
                // string. Constant-first equals keeps a scheme-less URI from causing an NPE.
                String path = "file".equals(uri.getScheme()) ? new File(uri).getPath() : uri.toString();
                PCollection<KV<URI, String>> oneUriToLines = pipeline
                        .apply("TextIO.Read(" + path + ")", TextIO.Read.from(path))
                        .apply("WithKeys(" + path + ")", WithKeys.<URI, String>of(uri));
                urisToLines = urisToLines.and(oneUriToLines);
            }
            return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
        }

        /**
         * Decompiler-generated alias kept only for source compatibility.
         *
         * @deprecated use {@link #apply(PInput)}
         */
        @Deprecated
        public PCollection<KV<URI, String>> m1apply(PInput pInput) {
            return apply(pInput);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/TFIDF$WriteTfIdf.class */
    /**
     * Formats each (word, (document URI, score)) pair as one text record and writes
     * the records to files with a {@code .csv} suffix under the configured prefix.
     */
    public static class WriteTfIdf extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
        private static final long serialVersionUID = 0;
        private String output;

        /**
         * @param str prefix of the output location
         */
        public WriteTfIdf(String str) {
            this.output = str;
        }

        /**
         * Appends the formatting and writing steps to the given collection.
         *
         * @param pCollection (word, (document URI, score)) pairs
         * @return the result of the terminal write
         */
        public PDone apply(PCollection<KV<String, KV<URI, Double>>> pCollection) {
            return pCollection.apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
                private static final long serialVersionUID = 0;

                public void processElement(DoFn<KV<String, KV<URI, Double>>, String>.ProcessContext c) {
                    KV<URI, Double> uriAndScore = c.element().getValue();
                    // Locale.ROOT pins '.' as the decimal separator; a default locale that
                    // uses ',' would add a spurious field separator to the record.
                    c.output(String.format(Locale.ROOT, "%s,\t%s,\t%f", c.element().getKey(), uriAndScore.getKey(), uriAndScore.getValue()));
                }
            })).apply(TextIO.Write.to(this.output).withSuffix(".csv"));
        }
    }

    /**
     * Lists the documents found under the configured input location.
     *
     * <p>An input without a scheme is treated as a local {@code file} location. For
     * {@code file} inputs every entry of the directory is returned; for {@code gs}
     * inputs the input path is extended with {@code *} and expanded through the GCS
     * utility. Any other scheme yields an empty set.
     *
     * @param options pipeline options providing the input location
     * @return URIs of the documents found, possibly empty
     * @throws URISyntaxException if the input (or a URI derived from it) is not a valid URI
     * @throws IOException if a local input cannot be listed as a directory, or if the
     *     GCS expansion fails
     */
    public static Set<URI> listInputDocuments(Options options) throws URISyntaxException, IOException {
        URI baseUri = new URI(options.getInput());

        URI absoluteUri = baseUri;
        if (baseUri.getScheme() == null) {
            absoluteUri = new URI("file", baseUri.getAuthority(), baseUri.getPath(), baseUri.getQuery(), baseUri.getFragment());
        }

        Set<URI> documents = new HashSet<URI>();
        String scheme = absoluteUri.getScheme();
        if ("file".equals(scheme)) {
            File directory = new File(absoluteUri);
            // File.list() returns null (rather than throwing) when the path is not a
            // directory or cannot be read; report that instead of failing with an NPE.
            String[] entries = directory.list();
            if (entries == null) {
                throw new IOException("Input is not a readable directory: " + directory);
            }
            for (String entry : entries) {
                documents.add(new File(directory, entry).toURI());
            }
        } else if ("gs".equals(scheme)) {
            URI gcsGlob = new URI(absoluteUri.getScheme(), absoluteUri.getAuthority(), absoluteUri.getPath() + "*", absoluteUri.getQuery(), absoluteUri.getFragment());
            for (GcsPath match : options.as(GcsOptions.class).getGcsUtil().expand(GcsPath.fromUri(gcsGlob))) {
                documents.add(match.toUri());
            }
        }
        return documents;
    }

    /**
     * Entry point: parses and validates the options, selects the Flink runner,
     * then builds and runs the read / compute / write pipeline.
     *
     * @param strArr command-line arguments used to populate {@link Options}
     * @throws Exception if listing the input documents or running the pipeline fails
     */
    public static void main(String[] strArr) throws Exception {
        Options tfidfOptions = PipelineOptionsFactory.fromArgs(strArr).withValidation().as(Options.class);
        tfidfOptions.setRunner(FlinkRunner.class);

        Pipeline pipeline = Pipeline.create(tfidfOptions);
        // URIs are used as keys throughout the pipeline, so they need a registered coder.
        pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

        Set<URI> documents = listInputDocuments(tfidfOptions);
        pipeline
                .apply(new ReadDocuments(documents))
                .apply(new ComputeTfIdf())
                .apply(new WriteTfIdf(tfidfOptions.getOutput()));

        pipeline.run();
    }
}
