/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.graph;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
import org.apache.flink.api.java.record.io.FileOutputFormat;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

public class PairwiseSP
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    /**
     * Builds the plan for one step of the pairwise shortest paths job.
     *
     * <p>All arguments are positional and optional:
     * <ol>
     *   <li>{@code args[0]} - parallelism set on every operator (default {@code 1})</li>
     *   <li>{@code args[1]} - input path (default empty string)</li>
     *   <li>{@code args[2]} - output path (default empty string)</li>
     *   <li>{@code args[3]} - when it parses to {@code true}, the input is read with
     *       {@link RDFTripleInFormat} instead of {@link PathInFormat}</li>
     * </ol>
     *
     * <p>The source is joined with itself through {@link ConcatPaths}; the source and the join
     * result are then co-grouped by {@link FindShortestPath} and written with
     * {@link PathOutFormat}.
     *
     * @param args the positional arguments described above
     * @return the assembled plan, named "Pairwise Shortest Paths"
     * @throws NumberFormatException if {@code args[0]} is present but not a valid integer
     */
    public Plan getPlan(String ... args) {
        // Positional arguments, each falling back to its default when missing.
        int parallelism = 1;
        if (args.length > 0) {
            parallelism = Integer.parseInt(args[0]);
        }
        String inputPath = "";
        if (args.length > 1) {
            inputPath = args[1];
        }
        String outputPath = "";
        if (args.length > 2) {
            outputPath = args[2];
        }
        boolean readRdfTriples = args.length > 3 && Boolean.parseBoolean(args[3]);

        // Source: either RDF triples or already materialised path records.
        FileDataSource source;
        if (readRdfTriples) {
            source = new FileDataSource((FileInputFormat)new RDFTripleInFormat(), inputPath, "RDF Triples");
        } else {
            source = new FileDataSource((FileInputFormat)new PathInFormat(), inputPath, "Paths");
        }
        source.setParallelism(parallelism);

        // Self-join of the source: both join inputs are the same data set.
        JoinOperator pathJoin = JoinOperator.builder((JoinFunction)new ConcatPaths(), StringValue.class, 0, 1)
            .name("Concat Paths")
            .build();
        pathJoin.setParallelism(parallelism);
        pathJoin.setFirstInput((Operator)source);
        pathJoin.setSecondInput((Operator)source);

        // Co-group of the original paths (first input) with the concatenated ones (second input).
        CoGroupOperator shortestPathGroup = CoGroupOperator.builder((CoGroupFunction)new FindShortestPath(), StringValue.class, 0, 0)
            .keyField(StringValue.class, 1, 1)
            .name("Find Shortest Paths")
            .build();
        shortestPathGroup.setParallelism(parallelism);
        shortestPathGroup.setFirstInput((Operator)source);
        shortestPathGroup.setSecondInput((Operator)pathJoin);

        FileDataSink sink = new FileDataSink((org.apache.flink.api.common.io.FileOutputFormat)new PathOutFormat(), outputPath, "New Paths");
        sink.setParallelism(parallelism);
        sink.setInput((Operator)shortestPathGroup);

        return new Plan((GenericDataSinkBase)sink, "Pairwise Shortest Paths");
    }

    /**
     * Describes the positional parameters this program accepts.
     *
     * @return a usage string listing the four optional parameters in the order they are read
     */
    public String getDescription() {
        return "Parameters: [numSubTasks], [inputPaths], [outputPaths], [RDFInputFlag]";
    }

    /**
     * Co-group function that reduces the candidate path records of one call to the shortest ones.
     *
     * <p>Record layout read and written here: field 0 = from node, field 1 = to node,
     * field 2 = path length, field 3 = hop count, field 4 = hop list. Among all records of both
     * inputs, only those with the minimal length are kept, and each distinct hop list is emitted
     * once.
     *
     * <p>Fields 0 and 1 are taken from the first record only and reused for every output record;
     * this relies on all records of a call sharing them (the operator is keyed on fields 0 and 1
     * where the plan is built).
     */
    @FunctionAnnotation.ConstantFieldsFirst(value={0, 1})
    @FunctionAnnotation.ConstantFieldsSecond(value={0, 1})
    public static class FindShortestPath
    extends CoGroupFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        /** Reused output record, refilled for every emitted path. */
        private final Record outputRecord = new Record();
        /** Distinct hop lists (field 4) of the paths that currently have the minimal length. */
        private final Set<StringValue> shortestPaths = new HashSet<StringValue>();
        /** Hop count (field 3) recorded for each entry of {@code shortestPaths}. */
        private final Map<StringValue, IntValue> hopCnts = new HashMap<StringValue, IntValue>();
        /** Smallest path length (field 2) seen so far during the current call. */
        private final IntValue minLength = new IntValue();

        /**
         * Emits one record per distinct shortest path found in the two inputs.
         *
         * @param inputRecords  candidate paths from the first input
         * @param concatRecords candidate paths from the second input
         * @param out           collector receiving the shortest path records
         * @throws java.util.NoSuchElementException if both iterators are empty
         */
        public void coGroup(Iterator<Record> inputRecords, Iterator<Record> concatRecords, Collector<Record> out) {
            // Seed the minimum with the first available record, preferring the first input.
            Record first = inputRecords.hasNext() ? inputRecords.next() : concatRecords.next();
            StringValue fromNode = first.getField(0, StringValue.class);
            StringValue toNode = first.getField(1, StringValue.class);
            this.minLength.setValue(first.getField(2, IntValue.class).getValue());
            // Both collections are empty here because every call clears them before returning.
            this.rememberPath(first);

            while (inputRecords.hasNext()) {
                this.considerPath(inputRecords.next());
            }
            while (concatRecords.hasNext()) {
                this.considerPath(concatRecords.next());
            }

            this.outputRecord.setField(0, fromNode);
            this.outputRecord.setField(1, toNode);
            this.outputRecord.setField(2, this.minLength);
            for (StringValue shortestPath : this.shortestPaths) {
                this.outputRecord.setField(3, this.hopCnts.get(shortestPath));
                this.outputRecord.setField(4, shortestPath);
                out.collect(this.outputRecord);
            }

            // Reset the per-call state so the next invocation starts from empty collections.
            this.hopCnts.clear();
            this.shortestPaths.clear();
        }

        /**
         * Folds one candidate into the running result: longer paths are ignored, a strictly
         * shorter path discards everything collected so far, and a path of equal length is added.
         */
        private void considerPath(Record candidate) {
            int candidateLength = candidate.getField(2, IntValue.class).getValue();
            int currentMinimum = this.minLength.getValue();
            if (candidateLength > currentMinimum) {
                return;
            }
            if (candidateLength < currentMinimum) {
                this.minLength.setValue(candidateLength);
                this.hopCnts.clear();
                this.shortestPaths.clear();
            }
            this.rememberPath(candidate);
        }

        /**
         * Stores the hop list and hop count of a record unless that hop list is already known.
         * Both values are copied into fresh objects rather than stored by reference
         * (presumably because the record's field objects may be reused - confirm).
         */
        private void rememberPath(Record pathRecord) {
            StringValue hopListCopy = new StringValue(pathRecord.getField(4, StringValue.class));
            if (this.shortestPaths.add(hopListCopy)) {
                this.hopCnts.put(hopListCopy, new IntValue(pathRecord.getField(3, IntValue.class).getValue()));
            }
        }
    }

    /**
     * Join function that concatenates two path records into one longer path.
     *
     * <p>Record layout: field 0 = from node, field 1 = to node, field 2 = path length,
     * field 3 = hop count, field 4 = hop list. The result runs from {@code rec2}'s from node to
     * {@code rec1}'s to node, with {@code rec1}'s from node inserted as an additional hop between
     * the two hop lists. The join keys (field 0 of the first input, field 1 of the second) are
     * configured where the operator is built.
     */
    @FunctionAnnotation.ConstantFieldsFirst(value={1})
    @FunctionAnnotation.ConstantFieldsSecond(value={0})
    public static class ConcatPaths
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        /** Reused output record. */
        private final Record outputRecord = new Record();
        /** Reused holder for the combined path length (field 2). */
        private final IntValue length = new IntValue();
        /** Reused holder for the combined hop count (field 3). */
        private final IntValue hopCnt = new IntValue();
        /** Reused holder for the combined hop list (field 4). */
        private final StringValue hopList = new StringValue();

        /**
         * Emits the concatenation of the two paths, or nothing when the resulting path would
         * start and end at the same node.
         *
         * @param rec1 record from the first join input; supplies the to node and the joining hop
         * @param rec2 record from the second join input; supplies the from node
         * @param out  collector receiving the concatenated path
         */
        public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
            StringValue fromNode = rec2.getField(0, StringValue.class);
            StringValue toNode = rec1.getField(1, StringValue.class);
            if (fromNode.equals(toNode)) {
                // The combined path would lead back to its own start node; drop it.
                return;
            }
            this.outputRecord.setField(0, fromNode);
            this.outputRecord.setField(1, toNode);

            int totalLength = rec1.getField(2, IntValue.class).getValue()
                + rec2.getField(2, IntValue.class).getValue();
            this.length.setValue(totalLength);
            this.outputRecord.setField(2, this.length);

            // The node shared by both paths counts as one extra hop.
            int totalHops = rec1.getField(3, IntValue.class).getValue()
                + 1
                + rec2.getField(3, IntValue.class).getValue();
            this.hopCnt.setValue(totalHops);
            this.outputRecord.setField(3, this.hopCnt);

            // "<hops of rec2> <joining node> <hops of rec1>"; trim() removes the padding left
            // over when one of the hop lists consists only of whitespace.
            StringBuilder hops = new StringBuilder();
            hops.append(rec2.getField(4, StringValue.class).getValue());
            hops.append(" ");
            hops.append(rec1.getField(0, StringValue.class).getValue());
            hops.append(" ");
            hops.append(rec1.getField(4, StringValue.class).getValue());
            this.hopList.setValue(hops.toString().trim());
            this.outputRecord.setField(4, this.hopList);

            out.collect(this.outputRecord);
        }
    }

    /**
     * Output format that writes each path record as one text line of the form
     * {@code from|to|length|hopCount|hopList|} followed by a newline.
     */
    public static class PathOutFormat
    extends FileOutputFormat {
        private static final long serialVersionUID = 1L;
        /**
         * Charset used to encode the output lines. Fixed explicitly so the bytes written do not
         * depend on the JVM's platform default charset.
         */
        private static final Charset OUTPUT_CHARSET = Charset.forName("UTF-8");

        /**
         * Serialises fields 0-4 of the record, each followed by {@code |}, and writes the line
         * to the output stream.
         *
         * @param record the path record to write
         * @throws IOException if writing to the underlying stream fails
         */
        public void writeRecord(Record record) throws IOException {
            StringBuilder line = new StringBuilder();
            line.append(record.getField(0, StringValue.class).toString()).append("|");
            line.append(record.getField(1, StringValue.class).toString()).append("|");
            line.append(record.getField(2, IntValue.class).toString()).append("|");
            line.append(record.getField(3, IntValue.class).toString()).append("|");
            line.append(record.getField(4, StringValue.class).toString()).append("|");
            line.append("\n");
            this.stream.write(line.toString().getBytes(OUTPUT_CHARSET));
        }
    }

    /**
     * Input format that parses text lines of the form {@code from|to|length|hopCount|hopList}
     * into path records (fields 0-4 in that order).
     */
    public static class PathInFormat
    extends DelimitedInputFormat {
        private static final long serialVersionUID = 1L;
        /**
         * Charset used to decode input lines. Fixed explicitly so parsing does not depend on the
         * JVM's platform default charset.
         */
        private static final Charset INPUT_CHARSET = Charset.forName("UTF-8");
        // Reused value holders; each call overwrites them before placing them into the target.
        private final StringValue fromNode = new StringValue();
        private final StringValue toNode = new StringValue();
        private final IntValue length = new IntValue();
        private final IntValue hopCnt = new IntValue();
        private final StringValue hopList = new StringValue();

        /**
         * Parses one line into {@code target}.
         *
         * @param target   record to fill
         * @param bytes    buffer containing the line
         * @param offset   start of the line within {@code bytes}
         * @param numBytes length of the line in bytes
         * @return {@code target} with fields 0-4 set, or {@code null} when the line does not
         *         consist of exactly five non-empty {@code |}-separated tokens or when its length
         *         or hop count is not a valid integer
         */
        public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
            String lineStr = new String(bytes, offset, numBytes, INPUT_CHARSET);
            // StringTokenizer never returns empty tokens, so a trailing '|' is tolerated.
            StringTokenizer st = new StringTokenizer(lineStr, "|");
            if (st.countTokens() != 5) {
                return null;
            }
            String from = st.nextToken();
            String to = st.nextToken();
            int parsedLength;
            int parsedHopCnt;
            try {
                parsedLength = Integer.parseInt(st.nextToken());
                parsedHopCnt = Integer.parseInt(st.nextToken());
            } catch (NumberFormatException ignored) {
                // Non-numeric length or hop count: reject the line the same way as a line with
                // the wrong number of tokens instead of failing the whole read.
                return null;
            }
            String hops = st.nextToken();

            this.fromNode.setValue(from);
            this.toNode.setValue(to);
            this.length.setValue(parsedLength);
            this.hopCnt.setValue(parsedHopCnt);
            this.hopList.setValue(hops);
            target.setField(0, this.fromNode);
            target.setField(1, this.toNode);
            target.setField(2, this.length);
            target.setField(3, this.hopCnt);
            target.setField(4, this.hopList);
            return target;
        }
    }

    /**
     * Input format that turns whitespace-separated RDF triples whose predicate is
     * {@code <http://xmlns.com/foaf/0.1/knows>} into initial path records: subject as from node
     * (field 0), object as to node (field 1), length 1 (field 2), hop count 0 (field 3) and a
     * single-space hop list (field 4).
     */
    public static class RDFTripleInFormat
    extends DelimitedInputFormat {
        private static final long serialVersionUID = 1L;
        /**
         * Charset used to decode input lines. Fixed explicitly so parsing does not depend on the
         * JVM's platform default charset.
         */
        private static final Charset INPUT_CHARSET = Charset.forName("UTF-8");
        /** Compiled once instead of on every record, as {@code String.replaceAll} would do. */
        private static final Pattern WHITESPACE_RUN = Pattern.compile("\\s+");
        // Reused value holders; the last three never change after construction.
        private final StringValue fromNode = new StringValue();
        private final StringValue toNode = new StringValue();
        private final IntValue pathLength = new IntValue(1);
        private final IntValue hopCnt = new IntValue(0);
        private final StringValue hopList = new StringValue(" ");

        /**
         * Parses one triple line into {@code target}.
         *
         * @param target   record to fill
         * @param bytes    buffer containing the line
         * @param offset   start of the line within {@code bytes}
         * @param numBytes length of the line in bytes
         * @return {@code target} with fields 0-4 set, or {@code null} when the line has fewer
         *         than three tokens or its predicate is not the FOAF "knows" predicate
         */
        public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
            String lineStr = new String(bytes, offset, numBytes, INPUT_CHARSET);
            // Collapse every whitespace run to one space so a plain space tokenizer suffices.
            lineStr = WHITESPACE_RUN.matcher(lineStr).replaceAll(" ").trim();
            StringTokenizer st = new StringTokenizer(lineStr, " ");
            if (st.countTokens() < 3) {
                return null;
            }
            String rdfSubj = st.nextToken();
            String rdfPred = st.nextToken();
            String rdfObj = st.nextToken();
            if (!rdfPred.equals("<http://xmlns.com/foaf/0.1/knows>")) {
                return null;
            }
            this.fromNode.setValue(rdfSubj);
            this.toNode.setValue(rdfObj);
            target.setField(0, this.fromNode);
            target.setField(1, this.toNode);
            target.setField(2, this.pathLength);
            target.setField(3, this.hopCnt);
            target.setField(4, this.hopList);
            return target;
        }
    }
}

