/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.joda.time.Duration;

public class BeamTableFunctionScanRel
extends TableFunctionScan
implements BeamRelNode {
    public BeamTableFunctionScanRel(RelOptCluster cluster, RelTraitSet traitSet, List<RelNode> inputs, RexNode rexCall, Type elementType, RelDataType rowType, Set<RelColumnMapping> columnMappings) {
        super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings);
    }

    public TableFunctionScan copy(RelTraitSet traitSet, List<RelNode> list, RexNode rexNode, Type type, RelDataType relDataType, Set<RelColumnMapping> set) {
        return new BeamTableFunctionScanRel(this.getCluster(), traitSet, list, rexNode, type, relDataType, set);
    }

    @Override
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    private Duration durationParameter(RexNode node) {
        return Duration.millis((long)this.longValue(node));
    }

    private long longValue(RexNode operand) {
        if (operand instanceof RexLiteral) {
            return ((Number)((Object)RexLiteral.value((RexNode)operand))).longValue();
        }
        throw new IllegalArgumentException(String.format("[%s] is not valid.", operand));
    }

    @Override
    public NodeStats estimateNodeStats(RelMetadataQuery mq) {
        return BeamSqlRelUtils.getNodeStats(this.getInput(0), mq);
    }

    @Override
    public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(this.getInput(0), mq);
        double rowSize = this.getRowType().getFieldCount();
        double cpu = inputEstimates.getRowCount() * rowSize;
        double cpuRate = inputEstimates.getRate() * inputEstimates.getWindow() * rowSize;
        return BeamCostModel.FACTORY.makeCost(cpu, cpuRate);
    }

    private static class FixedWindowDoFn
    extends DoFn<Row, Row> {
        private final int windowFieldIndex;
        private final FixedWindows windowFn;
        private final Schema outputSchema;

        public FixedWindowDoFn(FixedWindows windowFn, int windowFieldIndex, Schema schema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.outputSchema = schema;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            Row row = (Row)c.element();
            IntervalWindow window = this.windowFn.assignWindow(row.getDateTime(this.windowFieldIndex).toInstant());
            Row.Builder builder = Row.withSchema((Schema)this.outputSchema);
            builder.addValues(row.getValues());
            builder.addValue((Object)window.start());
            builder.addValue((Object)window.end());
            c.output((Object)builder.build());
        }
    }

    private class Transform
    extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private Transform() {
        }

        public PCollection<Row> expand(PCollectionList<Row> input) {
            Preconditions.checkArgument((input.size() == 1 ? 1 : 0) != 0, (String)"Wrong number of inputs for %s, expected 1 input but received: %s", (Object[])new Object[]{BeamTableFunctionScanRel.class.getSimpleName(), input});
            String operatorName = ((RexCall)BeamTableFunctionScanRel.this.getCall()).getOperator().getName();
            Preconditions.checkArgument((boolean)operatorName.equals("TUMBLE"), (String)"Only support TUMBLE table-valued function. Current operator: %s", (Object[])new Object[]{operatorName});
            RexCall call = (RexCall)BeamTableFunctionScanRel.this.getCall();
            RexInputRef wmCol = (RexInputRef)call.getOperands().get(1);
            PCollection upstream = input.get(0);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            return ((PCollection)upstream.apply((PTransform)ParDo.of((DoFn)new FixedWindowDoFn(FixedWindows.of((Duration)BeamTableFunctionScanRel.this.durationParameter((RexNode)call.getOperands().get(2))), wmCol.getIndex(), outputSchema)))).setRowSchema(outputSchema);
        }
    }
}

