/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.graph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

public class EnumTrianglesRdfFoaf
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    /**
     * Assembles the "Enumerate Triangles" plan.
     *
     * <p>Data flow: the edge source feeds both the "Build Triads" reducer and the
     * second input of the "Close Triads" join; the reducer feeds the join's first
     * input; the join feeds a CSV sink that writes three string fields per record.
     *
     * @param args optional positional arguments: {@code args[0]} the default
     *             parallelism (parsed as an int, defaults to 1), {@code args[1]} the
     *             edge input path (defaults to ""), {@code args[2]} the output path
     *             (defaults to "")
     * @return the plan rooted at the CSV sink
     * @throws NumberFormatException if {@code args[0]} is present but not an integer
     */
    public Plan getPlan(String ... args) {
        // Positional arguments, each falling back to its default when absent.
        int parallelism = 1;
        if (args.length > 0) {
            parallelism = Integer.parseInt(args[0]);
        }
        String edgePath = "";
        if (args.length > 1) {
            edgePath = args[1];
        }
        String resultPath = "";
        if (args.length > 2) {
            resultPath = args[2];
        }

        // Operators.
        FileDataSource edgeSource = new FileDataSource((FileInputFormat)new EdgeInFormat(), edgePath, "BTC Edges");

        ReduceOperator triadBuilder = ReduceOperator
            .builder((ReduceFunction)new BuildTriads(), StringValue.class, 0)
            .name("Build Triads")
            .build();

        // Key positions exactly as handed to the builder: (1, 0) and (2, 1).
        JoinOperator triadCloser = JoinOperator
            .builder((JoinFunction)new CloseTriads(), StringValue.class, 1, 0)
            .keyField(StringValue.class, 2, 1)
            .name("Close Triads")
            .build();
        triadCloser.setParameter("INPUT_LEFT_SHIP_STRATEGY", "SHIP_REPARTITION_HASH");
        triadCloser.setParameter("INPUT_RIGHT_SHIP_STRATEGY", "SHIP_REPARTITION_HASH");
        triadCloser.setParameter("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");

        // Sink: newline-separated records, space-separated fields, three string columns.
        FileDataSink triangleSink = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), resultPath, "Output");
        CsvOutputFormat.ConfigBuilder csvConfig = (CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)triangleSink).recordDelimiter('\n');
        csvConfig = (CsvOutputFormat.ConfigBuilder)csvConfig.fieldDelimiter(' ');
        csvConfig = (CsvOutputFormat.ConfigBuilder)csvConfig.field(StringValue.class, 0);
        csvConfig = (CsvOutputFormat.ConfigBuilder)csvConfig.field(StringValue.class, 1);
        csvConfig.field(StringValue.class, 2);

        // Wiring, listed from source to sink.
        triadBuilder.setInput((Operator)edgeSource);
        triadCloser.setFirstInput((Operator)triadBuilder);
        triadCloser.setSecondInput((Operator)edgeSource);
        triangleSink.setInput((Operator)triadCloser);

        Plan trianglePlan = new Plan((GenericDataSinkBase)triangleSink, "Enumerate Triangles");
        trianglePlan.setDefaultParallelism(parallelism);
        return trianglePlan;
    }

    /**
     * Describes the positional arguments accepted by {@code getPlan}.
     *
     * @return a one-line usage string naming the three optional parameters
     */
    public String getDescription() {
        // The first parameter name was previously misspelled "numSubStasks".
        return "Parameters: [numSubTasks] [inputRDFTriples] [outputTriangles]";
    }

    /**
     * Join function that forwards the first-input record for every joined pair.
     *
     * <p>The second-input record is never read or modified here; it only has to
     * exist for the join to invoke this function.
     */
    @FunctionAnnotation.ConstantFieldsFirstExcept(value={})
    public static class CloseTriads
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Emits {@code triad} unchanged.
         *
         * @param triad       record from the first join input; emitted as-is
         * @param missingEdge record from the second join input; unused
         * @param out         collector receiving the emitted record
         */
        public void join(Record triad, Record missingEdge, Collector<Record> out) throws Exception {
            // No cast: the collector is typed for Record, so an Object argument would not type-check.
            out.collect(triad);
        }
    }

    /**
     * Reduce function that pairs up the field-1 vertices of all records in a group.
     *
     * <p>For every pair of records in the group it emits one three-field record:
     * field 0 comes from the group's records, and fields 1 and 2 hold the two
     * field-1 vertices of the pair, ordered so that field 1 is not greater than
     * field 2 according to {@code StringValue.compareTo}.
     *
     * <p>The instance reuses its value objects and its output record across calls.
     * NOTE(review): this presumes the collector does not retain references to
     * emitted records — confirm against the runtime.
     */
    @FunctionAnnotation.ConstantFields(value={0})
    public static class BuildTriads
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        /** Reusable holders for the field-1 vertices seen so far in the current group. */
        private final ArrayList<StringValue> otherVertices = new ArrayList<StringValue>(32);
        /** Reusable holder for field 0 of the group's first record. */
        private final StringValue matchVertex = new StringValue();
        /** Reusable output record for pairs whose earlier vertex sorts first. */
        private final Record result = new Record();

        public BuildTriads() {
            // Slot 0 must exist up front because reduce() fills it unconditionally.
            this.otherVertices.add(new StringValue());
        }

        /**
         * Emits one record per pair of records in the group.
         *
         * @param records the records of one group; must contain at least one element
         * @param out     collector receiving the emitted records
         */
        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            // The first record seeds the shared vertex and the first "other" vertex.
            Record rec = records.next();
            rec.getFieldInto(0, this.matchVertex);
            rec.getFieldInto(1, this.otherVertices.get(0));
            this.result.setField(0, this.matchVertex);

            int numEdges = 1;
            while (records.hasNext()) {
                Record next = records.next();

                // Reuse a holder from an earlier call when available, else grow the list.
                StringValue myVertex;
                if (numEdges >= this.otherVertices.size()) {
                    myVertex = new StringValue();
                    this.otherVertices.add(myVertex);
                } else {
                    myVertex = this.otherVertices.get(numEdges);
                }
                next.getFieldInto(1, myVertex);

                // Pair the new vertex with every vertex read before it.
                for (int i = 0; i < numEdges; ++i) {
                    StringValue otherVertex = this.otherVertices.get(i);
                    if (otherVertex.compareTo(myVertex) < 0) {
                        // Earlier vertex is smaller: emit (match, other, mine) via the shared record.
                        this.result.setField(1, otherVertex);
                        this.result.setField(2, myVertex);
                        out.collect(this.result);
                        continue;
                    }
                    // Otherwise the incoming record already has "mine" in field 1;
                    // append the earlier vertex as field 2 and emit it directly.
                    next.setField(2, otherVertex);
                    out.collect(next);
                }
                ++numEdges;
            }
        }
    }

    /**
     * Input format that turns lines of three space-separated fields (subject,
     * predicate, object) into two-field edge records.
     *
     * <p>Only lines whose predicate equals {@code <http://xmlns.com/foaf/0.1/knows>}
     * produce a record. The two endpoints are stored so that field 0 is not greater
     * than field 1 according to {@code StringValue.compareTo}.
     */
    public static class EdgeInFormat
    extends DelimitedInputFormat {
        private static final long serialVersionUID = 1L;
        /** Only triples carrying exactly this predicate are turned into edges. */
        private static final String KNOWS_PREDICATE = "<http://xmlns.com/foaf/0.1/knows>";
        private final StringValue rdfSubj = new StringValue();
        private final StringValue rdfPred = new StringValue();
        private final StringValue rdfObj = new StringValue();

        /**
         * Parses one line into {@code target}.
         *
         * @param target   record to fill
         * @param bytes    buffer holding the line
         * @param offset   index of the line's first byte
         * @param numBytes length of the line in bytes
         * @return {@code target} with fields 0 and 1 set, or {@code null} when a field
         *         cannot be parsed or the predicate does not match
         */
        public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
            int limit = offset + numBytes;

            int startPos = this.parseVarLengthEncapsulatedStringField(bytes, offset, limit, ' ', this.rdfSubj, '\"');
            if (startPos < 0) {
                return null;
            }
            startPos = this.parseVarLengthEncapsulatedStringField(bytes, startPos, limit, ' ', this.rdfPred, '\"');
            if (startPos < 0 || !KNOWS_PREDICATE.equals(this.rdfPred.getValue())) {
                return null;
            }
            startPos = this.parseVarLengthEncapsulatedStringField(bytes, startPos, limit, ' ', this.rdfObj, '\"');
            if (startPos < 0) {
                return null;
            }

            // Store the endpoints in a fixed order so both directions of an edge look alike.
            if (this.rdfSubj.compareTo(this.rdfObj) <= 0) {
                target.setField(0, this.rdfSubj);
                target.setField(1, this.rdfObj);
            } else {
                target.setField(0, this.rdfObj);
                target.setField(1, this.rdfSubj);
            }
            return target;
        }

        /**
         * Reads one field starting at {@code startPos} into {@code field}.
         *
         * <p>A field that starts with {@code encaps} runs up to and including the next
         * {@code encaps} that is followed by {@code delim} or that is the last byte
         * before {@code limit}; the enclosing characters are part of the stored value.
         * Any other field runs up to the next {@code delim} or, failing that, up to
         * {@code limit}. No byte at or beyond {@code limit} is ever inspected.
         *
         * @param bytes    buffer holding the record
         * @param startPos index of the field's first byte
         * @param limit    index one past the record's last byte
         * @param delim    field delimiter
         * @param field    value object receiving the field's ASCII content
         * @param encaps   encapsulation (quote) character
         * @return the index just past the delimiter that ended the field
         *         ({@code limit + 1} when the field ended at the record end), or
         *         {@code -1} when {@code startPos} lies beyond {@code limit} or an
         *         encapsulated field is never closed
         */
        private int parseVarLengthEncapsulatedStringField(byte[] bytes, int startPos, int limit, char delim, StringValue field, char encaps) {
            // A previous field that ended at the record end hands us limit + 1: nothing left to read.
            if (startPos > limit) {
                return -1;
            }

            // Only look at bytes[startPos] when it belongs to the record.
            boolean isEncaps = startPos < limit && bytes[startPos] == encaps;
            if (isEncaps) {
                // Begin behind the opening character so it is never mistaken for the closing one.
                for (int i = startPos + 1; i < limit; ++i) {
                    if (bytes[i] != encaps) {
                        continue;
                    }
                    boolean atRecordEnd = i + 1 == limit;
                    // Short-circuit keeps bytes[i + 1] inside the record.
                    if (atRecordEnd || bytes[i + 1] == delim) {
                        field.setValueAscii(bytes, startPos, i - startPos + 1);
                        return i + 2;
                    }
                }
                return -1;
            }

            for (int i = startPos; i < limit; ++i) {
                if (bytes[i] == delim) {
                    field.setValueAscii(bytes, startPos, i - startPos);
                    return i + 1;
                }
            }
            // No delimiter found: the field extends to the end of the record.
            field.setValueAscii(bytes, startPos, limit - startPos);
            return limit + 1;
        }
    }
}

