/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.relational;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
import org.apache.flink.test.recordJobs.util.StringTupleDataOutFormat;
import org.apache.flink.test.recordJobs.util.Tuple;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record-API test job that assembles the dataflow plan named "TPC-H 4".
 *
 * <p>The plan reads an orders input and a line-item input with {@link IntTupleDataInFormat},
 * passes each through its own map-based filter ({@link OFilter}, {@link LiFilter}), joins the two
 * filtered streams on field 0 ({@link JoinLiO}, {@link IntValue} key), groups the join output on
 * field 0 ({@link StringValue} key) to count the records per group ({@link CountAgg}), and writes
 * the result with {@link StringTupleDataOutFormat}.
 */
public class TPCHQuery4 implements Program, ProgramDescription {
    private static final Logger LOG = LoggerFactory.getLogger(TPCHQuery4.class);

    /** Parallelism applied to every source, operator and sink of the plan. */
    private int parallelism = 1;
    private String ordersInputPath;
    private String lineItemInputPath;
    private String outputPath;

    /**
     * Builds the plan from the given arguments.
     *
     * <p>Exactly four arguments are expected: parallelism, orders input path, line-item input
     * path and output path. For {@code null} or any other argument count a warning is logged,
     * the three paths are set to the empty string and the parallelism keeps its current value.
     *
     * @param args {@code [parallelism, orders-input, lineitem-input, output]}
     * @return the plan built around the output sink, named "TPC-H 4"
     * @throws IllegalArgumentException if the parallelism argument cannot be parsed as an integer
     *     (the {@link NumberFormatException} raised by {@link Integer#parseInt(String)})
     */
    public Plan getPlan(String ... args) throws IllegalArgumentException {
        if (args == null || args.length != 4) {
            LOG.warn("number of arguments do not match!");
            this.ordersInputPath = "";
            this.lineItemInputPath = "";
            this.outputPath = "";
        } else {
            this.setArgs(args);
        }

        // Sources and sink.
        FileDataSource orders = new FileDataSource(new IntTupleDataInFormat(), this.ordersInputPath, "Orders");
        orders.setParallelism(this.parallelism);
        FileDataSource lineItems = new FileDataSource(new IntTupleDataInFormat(), this.lineItemInputPath, "LineItems");
        lineItems.setParallelism(this.parallelism);
        FileDataSink result = new FileDataSink(new StringTupleDataOutFormat(), this.outputPath, "Output");
        result.setParallelism(this.parallelism);

        // Operators.
        MapOperator lineFilter = MapOperator.builder(LiFilter.class).name("LineItemFilter").build();
        lineFilter.setParallelism(this.parallelism);
        MapOperator ordersFilter = MapOperator.builder(OFilter.class).name("OrdersFilter").build();
        ordersFilter.setParallelism(this.parallelism);
        JoinOperator join = JoinOperator.builder(JoinLiO.class, IntValue.class, 0, 0).name("OrdersLineitemsJoin").build();
        join.setParallelism(this.parallelism);
        ReduceOperator aggregation = ReduceOperator.builder(CountAgg.class, StringValue.class, 0).name("AggregateGroupBy").build();
        aggregation.setParallelism(this.parallelism);

        // Wiring: sources -> filters -> join (orders first, line items second) -> count -> sink.
        lineFilter.setInput(lineItems);
        ordersFilter.setInput(orders);
        join.setFirstInput(ordersFilter);
        join.setSecondInput(lineFilter);
        aggregation.setInput(join);
        result.setInput(aggregation);
        return new Plan(result, "TPC-H 4");
    }

    /**
     * Copies the four positional arguments into the job's fields.
     *
     * @param args {@code [parallelism, orders-input, lineitem-input, output]}; the caller has
     *     already checked that the length is 4
     * @throws NumberFormatException if {@code args[0]} is not a parsable integer
     */
    private void setArgs(String[] args) {
        this.parallelism = Integer.parseInt(args[0]);
        this.ordersInputPath = args[1];
        this.lineItemInputPath = args[2];
        this.outputPath = args[3];
    }

    /**
     * Returns the usage string listing the expected positional parameters.
     */
    public String getDescription() {
        return "Parameters: [parallelism] [orders-input] [lineitem-input] [output]";
    }

    /**
     * Reduce function that counts the records of a group.
     */
    public static class CountAgg extends ReduceFunction {
        /**
         * Drains {@code records} while counting them, then replaces field 1 of the last record
         * seen with a new {@link Tuple} whose only added attribute is the count (as a decimal
         * string) and emits that record. Nothing is emitted when the iterator is empty.
         *
         * @param records the records of the group
         * @param out collector receiving at most one record
         */
        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            long count = 0L;
            Record last = null;
            while (records.hasNext()) {
                last = records.next();
                ++count;
            }
            if (last == null) {
                // Empty group: there is no record to carry the count, and collecting null
                // would hand a null record to whatever consumes this collector.
                return;
            }
            Tuple countTuple = new Tuple();
            countTuple.addAttribute(String.valueOf(count));
            last.setField(1, countTuple);
            out.collect(last);
        }
    }

    /**
     * Join function for an order record and a line-item record; only the order is forwarded.
     */
    public static class JoinLiO extends JoinFunction {
        /**
         * Reads the order's tuple from field 1, applies {@code project(32)} to it, stores the
         * projected tuple's value at position 0 into field 0 of the order as a
         * {@link StringValue}, and emits the order. The line-item record is not read.
         *
         * <p>NOTE(review): 32 is presumably a column bitmask (bit 5) selecting the order
         * priority, which would make it the new grouping key -- confirm against
         * {@code Tuple.project}.
         *
         * @param order record from the first join input
         * @param line record from the second join input (unused)
         * @param out collector receiving the re-keyed order
         */
        public void join(Record order, Record line, Collector<Record> out) throws Exception {
            Tuple orderTuple = order.getField(1, Tuple.class);
            orderTuple.project(32);
            String newOrderKey = orderTuple.getStringValueAt(0);
            order.setField(0, new StringValue(newOrderKey));
            out.collect(order);
        }
    }

    /**
     * Map function that forwards a line-item record only when its commit date lies strictly
     * before its receipt date.
     */
    public static class LiFilter extends MapFunction {
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        /**
         * Parses tuple positions 11 (commit date) and 12 (receipt date) of the tuple in field 1
         * as {@code yyyy-MM-dd} dates and emits the record if commit is before receipt.
         *
         * @throws RuntimeException wrapping the {@link ParseException} if either date string
         *     cannot be parsed
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple tuple = record.getField(1, Tuple.class);
            String commitString = tuple.getStringValueAt(11);
            String receiptString = tuple.getStringValueAt(12);
            // Parse both before comparing, in the same order as before (commit, then receipt).
            Date commitDate = this.parseDate(commitString);
            Date receiptDate = this.parseDate(receiptString);
            if (commitDate.before(receiptDate)) {
                out.collect(record);
            }
        }

        /** Parses a {@code yyyy-MM-dd} string, rethrowing parse failures unchecked. */
        private Date parseDate(String text) {
            try {
                return this.sdf.parse(text);
            }
            catch (ParseException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Map function that forwards an order record only when its order date lies strictly between
     * 1995-01-01 and that date plus three months.
     */
    public static class OFilter extends MapFunction {
        private final String dateParamString = "1995-01-01";
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        private final GregorianCalendar gregCal = new GregorianCalendar();
        /** Lower bound of the accepted date range; set in {@link #open(Configuration)}. */
        private Date paramDate;
        /** Upper bound of the accepted date range; set in {@link #open(Configuration)}. */
        private Date plusThreeMonths;

        /**
         * Computes the two date bounds used by {@link #map(Record, Collector)}.
         *
         * @param parameters not read by this function
         * @throws RuntimeException wrapping the {@link ParseException} if the built-in date
         *     string cannot be parsed
         */
        public void open(Configuration parameters) {
            this.paramDate = this.parseDate(this.dateParamString);
            this.plusThreeMonths = this.getPlusThreeMonths(this.paramDate);
        }

        /**
         * Parses tuple position 4 of the tuple in field 1 as a {@code yyyy-MM-dd} order date and
         * emits the record if that date is after the lower bound and before the upper bound.
         *
         * <p>NOTE(review): both bounds are exclusive here, so an order dated exactly on the
         * lower bound is dropped. The TPC-H Q4 definition uses an inclusive lower bound
         * ({@code o_orderdate >= DATE}) -- confirm whether the strict comparison is intended.
         *
         * @throws RuntimeException wrapping the {@link ParseException} if the order date cannot
         *     be parsed
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple tuple = record.getField(1, Tuple.class);
            String orderStringDate = tuple.getStringValueAt(4);
            Date orderDate = this.parseDate(orderStringDate);
            if (this.paramDate.before(orderDate) && this.plusThreeMonths.after(orderDate)) {
                out.collect(record);
            }
        }

        /** Parses a {@code yyyy-MM-dd} string, rethrowing parse failures unchecked. */
        private Date parseDate(String text) {
            try {
                return this.sdf.parse(text);
            }
            catch (ParseException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the given date shifted forward by three calendar months, using the
         * function's own {@link GregorianCalendar} instance as scratch space.
         */
        private Date getPlusThreeMonths(Date paramDate) {
            this.gregCal.setTime(paramDate);
            this.gregCal.add(Calendar.MONTH, 3);
            return this.gregCal.getTime();
        }
    }
}

