/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.relational;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;

public class WebLogAnalysis
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String docsInput = args.length > 1 ? args[1] : "";
        String ranksInput = args.length > 2 ? args[2] : "";
        String visitsInput = args.length > 3 ? args[3] : "";
        String output = args.length > 4 ? args[4] : "";
        CsvInputFormat docsFormat = new CsvInputFormat('|', new Class[]{StringValue.class, StringValue.class});
        FileDataSource docs = new FileDataSource((FileInputFormat)docsFormat, docsInput, "Docs Input");
        FileDataSource ranks = new FileDataSource((FileInputFormat)new CsvInputFormat(), ranksInput, "Ranks input");
        ((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)CsvInputFormat.configureRecordFormat((FileDataSource)ranks).recordDelimiter('\n')).fieldDelimiter('|')).field(StringValue.class, 1)).field(IntValue.class, 0)).field(IntValue.class, 2);
        CsvInputFormat visitsFormat = new CsvInputFormat('|', new Class[]{null, StringValue.class, StringValue.class});
        FileDataSource visits = new FileDataSource((FileInputFormat)visitsFormat, visitsInput, "Visits input:q");
        MapOperator filterDocs = MapOperator.builder((MapFunction)new FilterDocs()).input((Operator)docs).name("Filter Docs").build();
        filterDocs.getCompilerHints().setFilterFactor(0.15f);
        MapOperator filterRanks = MapOperator.builder((MapFunction)new FilterRanks()).input((Operator)ranks).name("Filter Ranks").build();
        filterRanks.getCompilerHints().setFilterFactor(0.25f);
        MapOperator filterVisits = MapOperator.builder((MapFunction)new FilterVisits()).input((Operator)visits).name("Filter Visits").build();
        filterVisits.getCompilerHints().setFilterFactor(0.2f);
        JoinOperator joinDocsRanks = JoinOperator.builder((JoinFunction)new JoinDocRanks(), StringValue.class, (int)0, (int)0).input1((Operator)filterDocs).input2((Operator)filterRanks).name("Join Docs Ranks").build();
        CoGroupOperator antiJoinVisits = CoGroupOperator.builder((CoGroupFunction)new AntiJoinVisits(), StringValue.class, (int)0, (int)0).input1((Operator)joinDocsRanks).input2((Operator)filterVisits).name("Antijoin DocsVisits").build();
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)antiJoinVisits, "Result");
        result.setParallelism(numSubTasks);
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)result).recordDelimiter('\n')).fieldDelimiter('|')).lenient(true)).field(IntValue.class, 1)).field(StringValue.class, 0)).field(IntValue.class, 2);
        Plan p = new Plan((GenericDataSinkBase)result, "Weblog Analysis");
        p.setDefaultParallelism(numSubTasks);
        return p;
    }

    public String getDescription() {
        return "Parameters: [numSubTasks], [docs], [ranks], [visits], [output]";
    }

    @FunctionAnnotation.ConstantFieldsFirstExcept(value={})
    public static class AntiJoinVisits
    extends CoGroupFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Record> ranks, Iterator<Record> visits, Collector<Record> out) {
            if (!visits.hasNext()) {
                while (ranks.hasNext()) {
                    out.collect((Object)ranks.next());
                }
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsSecondExcept(value={})
    public static class JoinDocRanks
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void join(Record document, Record rank, Collector<Record> out) throws Exception {
            out.collect((Object)rank);
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={1})
    public static class FilterVisits
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final int YEARFILTER = 2010;

        public void map(Record record, Collector<Record> out) throws Exception {
            String dateString = ((StringValue)record.getField(1, StringValue.class)).getValue();
            int year = Integer.parseInt(dateString.substring(0, 4));
            if (year == 2010) {
                record.setNull(1);
                out.collect((Object)record);
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={})
    public static class FilterRanks
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final int RANKFILTER = 50;

        public void map(Record record, Collector<Record> out) throws Exception {
            if (((IntValue)record.getField(1, IntValue.class)).getValue() > 50) {
                out.collect((Object)record);
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={1})
    public static class FilterDocs
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final String[] KEYWORDS = new String[]{" editors ", " oscillations ", " convection "};

        public void map(Record record, Collector<Record> out) throws Exception {
            String docText = ((StringValue)record.getField(1, StringValue.class)).toString();
            boolean allContained = true;
            for (String kw : KEYWORDS) {
                if (docText.contains(kw)) continue;
                allContained = false;
                break;
            }
            if (allContained) {
                record.setNull(1);
                out.collect((Object)record);
            }
        }
    }
}

