package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.impl.TVFSlidingWindowFn;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.class */
public class BeamTableFunctionScanRel extends TableFunctionScan implements BeamRelNode {

    /**
     * Extends every input row with the bounds of the fixed window its timestamp field falls into.
     *
     * <p>The output row contains all values of the input row, followed by the start and then the
     * end of the {@link IntervalWindow} that the configured {@link FixedWindows} assigns to the
     * instant read from the field at {@code windowFieldIndex}.
     */
    private static class FixedWindowDoFn extends DoFn<Row, Row> {
        private final int windowFieldIndex;
        private final FixedWindows windowFn;
        private final Schema outputSchema;

        /**
         * @param windowFn fixed windowing strategy used to look up the window of each row
         * @param windowFieldIndex index of the date-time field the window is derived from
         * @param outputSchema schema used to build the extended output rows
         */
        public FixedWindowDoFn(FixedWindows windowFn, int windowFieldIndex, Schema outputSchema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.outputSchema = outputSchema;
        }

        /** Emits one extended row per input row. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Row, Row>.ProcessContext context) {
            Row input = context.element();
            IntervalWindow window =
                    windowFn.assignWindow(input.getDateTime(windowFieldIndex).toInstant());
            Row extended =
                    Row.withSchema(outputSchema)
                            .addValues(input.getValues())
                            .addValue(window.start())
                            .addValue(window.end())
                            .build();
            context.output(extended);
        }
    }

    /**
     * Pairs every input row with a key row built from a fixed selection of its fields.
     *
     * <p>The key is assembled with {@code keySchema} from the values found at the positions listed
     * in {@code keyIndex}, in list order; the value of the emitted {@link KV} is the unchanged
     * input row.
     */
    private static class SessionKeyDoFn extends DoFn<Row, KV<Row, Row>> {
        private final Schema keySchema;
        private final List<Integer> keyIndex;

        /**
         * @param keySchema schema of the key rows to build
         * @param keyIndex positions of the input fields that make up the key
         */
        public SessionKeyDoFn(Schema keySchema, List<Integer> keyIndex) {
            this.keySchema = keySchema;
            this.keyIndex = keyIndex;
        }

        /** Emits {@code KV(key row, input row)} for the current element. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Row, KV<Row, Row>>.ProcessContext context) {
            Row input = context.element();
            Row.Builder key = Row.withSchema(keySchema);
            for (int fieldIndex : keyIndex) {
                key.addValue(input.getValue(fieldIndex));
            }
            context.output(KV.of(key.build(), input));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel$SessionWindowDoFn.class */
    private static class SessionWindowDoFn extends DoFn<KV<Row, Iterable<Row>>, Row> {
        private final Schema outputSchema;

        public SessionWindowDoFn(Schema schema) {
            this.outputSchema = schema;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<Row, Iterable<Row>> kv, BoundedWindow boundedWindow, DoFn.OutputReceiver<Row> outputReceiver) {
            IntervalWindow intervalWindow = (IntervalWindow) boundedWindow;
            Iterator it = ((Iterable) kv.getValue()).iterator();
            while (it.hasNext()) {
                outputReceiver.output(Row.withSchema(this.outputSchema).addValues(((Row) it.next()).getValues()).addValue(intervalWindow.start()).addValue(intervalWindow.end()).build());
            }
        }
    }

    /**
     * Emits one copy of every input row per sliding window its timestamp field belongs to.
     *
     * <p>Each copy contains all values of the input row, followed by the start and then the end of
     * one of the {@link IntervalWindow}s that the configured {@link SlidingWindows} assigns to the
     * instant read from the field at {@code windowFieldIndex}.
     */
    private static class SlidingWindowDoFn extends DoFn<Row, Row> {
        private final int windowFieldIndex;
        private final SlidingWindows windowFn;
        private final Schema outputSchema;

        /**
         * @param windowFn sliding windowing strategy used to look up the windows of each row
         * @param windowFieldIndex index of the date-time field the windows are derived from
         * @param outputSchema schema used to build the extended output rows
         */
        public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema outputSchema) {
            this.windowFn = windowFn;
            this.windowFieldIndex = windowFieldIndex;
            this.outputSchema = outputSchema;
        }

        /** Emits the input row once for each window returned by {@code assignWindows}. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Row, Row>.ProcessContext context) {
            Row input = context.element();
            for (IntervalWindow window :
                    windowFn.assignWindows(input.getDateTime(windowFieldIndex).toInstant())) {
                Row extended =
                        Row.withSchema(outputSchema)
                                .addValues(input.getValues())
                                .addValue(window.start())
                                .addValue(window.end())
                                .build();
                context.output(extended);
            }
        }
    }

    /**
     * Strategy that applies one table-valued function to an input collection.
     *
     * <p>Implementations are supplied as lambdas or method references, hence the
     * {@link FunctionalInterface} marker: adding a second abstract method would be a compile
     * error instead of silently breaking those call sites.
     */
    @FunctionalInterface
    public interface TVFToPTransform {
        /**
         * Applies the table-valued function described by {@code rexCall} to {@code pCollection}.
         *
         * @param rexCall the function call, including its operands
         * @param pCollection the rows the function is applied to
         * @return the rows produced by the function
         */
        PCollection<Row> toPTransform(RexCall rexCall, PCollection<Row> pCollection);
    }

    /**
     * Expands the enclosing rel node into the Beam transform of its windowing table-valued
     * function.
     *
     * <p>The operator name of the enclosing node's call selects one of three implementations
     * (fixed, sliding or session windows) from {@link #tvfToPTransformMap}. In all three, operand 1
     * of the call is a reference to the input column holding the event timestamp.
     */
    private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
        private final TVFToPTransform tumbleToPTransform;
        private final TVFToPTransform hopToPTransform;
        private final TVFToPTransform sessionToPTransform;
        /** Maps the operator name of a supported table-valued function to its implementation. */
        private final ImmutableMap<String, TVFToPTransform> tvfToPTransformMap;

        private Transform() {
            this.tumbleToPTransform = this::applyFixedWindows;
            this.hopToPTransform = this::applySlidingWindows;
            this.sessionToPTransform = this::applySessionWindows;
            this.tvfToPTransformMap =
                    ImmutableMap.of(
                            TVFStreamingUtils.FIXED_WINDOW_TVF, this.tumbleToPTransform,
                            TVFStreamingUtils.SLIDING_WINDOW_TVF, this.hopToPTransform,
                            TVFStreamingUtils.SESSION_WINDOW_TVF, this.sessionToPTransform);
        }

        /**
         * Applies the table-valued function of the enclosing rel node to the single input.
         *
         * @param pCollectionList the inputs of the node; must contain exactly one collection
         * @return the rows produced by the table-valued function
         * @throws IllegalArgumentException if the number of inputs is not one or the operator is
         *     not one of the supported table-valued functions
         */
        @Override
        public PCollection<Row> expand(PCollectionList<Row> pCollectionList) {
            Preconditions.checkArgument(
                    pCollectionList.size() == 1,
                    "Wrong number of inputs for %s, expected 1 input but received: %s",
                    BeamTableFunctionScanRel.class.getSimpleName(),
                    pCollectionList);
            // getCall() is declared to return RexNode; the operator is only available on RexCall.
            RexCall call = (RexCall) BeamTableFunctionScanRel.this.getCall();
            String operatorName = call.getOperator().getName();
            Preconditions.checkArgument(
                    this.tvfToPTransformMap.containsKey(operatorName),
                    "Only support %s table-valued functions. Current operator: %s",
                    this.tvfToPTransformMap.keySet(),
                    operatorName);
            return this.tvfToPTransformMap.get(operatorName).toPTransform(call, pCollectionList.get(0));
        }

        /**
         * Fixed windows: operand 2 is the window size in milliseconds. Every row is extended with
         * the bounds of its window and then timestamped and windowed accordingly.
         */
        private PCollection<Row> applyFixedWindows(RexCall call, PCollection<Row> upstream) {
            RexInputRef timestampColumn = (RexInputRef) call.getOperands().get(1);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            FixedWindows windowFn =
                    FixedWindows.of(
                            BeamTableFunctionScanRel.this.durationParameter(call.getOperands().get(2)));
            PCollection<Row> withWindowBounds =
                    upstream
                            .apply(ParDo.of(new FixedWindowDoFn(windowFn, timestampColumn.getIndex(), outputSchema)))
                            .setRowSchema(outputSchema);
            // The window bounds are appended at the end of the row, so the timestamp column keeps
            // its original index in the extended rows.
            return assignTimestampsAndWindow(withWindowBounds, timestampColumn.getIndex(), windowFn);
        }

        /**
         * Sliding windows: operand 2 is the slide period and operand 3 the window size, both in
         * milliseconds. Every row is emitted once per window it belongs to, extended with that
         * window's bounds, and then timestamped and windowed with {@link TVFSlidingWindowFn}.
         */
        private PCollection<Row> applySlidingWindows(RexCall call, PCollection<Row> upstream) {
            RexInputRef timestampColumn = (RexInputRef) call.getOperands().get(1);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            Duration period = BeamTableFunctionScanRel.this.durationParameter(call.getOperands().get(2));
            Duration size = BeamTableFunctionScanRel.this.durationParameter(call.getOperands().get(3));
            SlidingWindows slidingWindows = SlidingWindows.of(size).every(period);
            PCollection<Row> withWindowBounds =
                    upstream
                            .apply(ParDo.of(new SlidingWindowDoFn(slidingWindows, timestampColumn.getIndex(), outputSchema)))
                            .setRowSchema(outputSchema);
            return assignTimestampsAndWindow(
                    withWindowBounds, timestampColumn.getIndex(), TVFSlidingWindowFn.of(size, period));
        }

        /**
         * Session windows: operand 2 is the gap duration in milliseconds and all remaining operands
         * are references to the key columns. Rows are windowed into sessions, grouped by key,
         * extended with the bounds of their session and finally re-windowed into the global window.
         */
        private PCollection<Row> applySessionWindows(RexCall call, PCollection<Row> upstream) {
            List<RexNode> operands = call.getOperands();
            RexInputRef timestampColumn = (RexInputRef) operands.get(1);
            Duration gap = BeamTableFunctionScanRel.this.durationParameter(operands.get(2));
            List<Integer> keyIndex = new ArrayList<>();
            for (RexNode keyOperand : operands.subList(3, operands.size())) {
                // Only input references carry a column index; RexNode itself has no getIndex().
                keyIndex.add(((RexInputRef) keyOperand).getIndex());
            }
            PCollection<Row> sessionWindowed =
                    assignTimestampsAndWindow(
                            upstream, timestampColumn.getIndex(), Sessions.withGapDuration(gap));
            Schema keySchema = getKeySchema(upstream.getSchema(), keyIndex);
            Schema outputSchema = CalciteUtils.toSchema(BeamTableFunctionScanRel.this.getRowType());
            return sessionWindowed
                    .apply("assign_session_key", ParDo.of(new SessionKeyDoFn(keySchema, keyIndex)))
                    .setCoder(KvCoder.of(RowCoder.of(keySchema), upstream.getCoder()))
                    .apply(GroupByKey.<Row, Row>create())
                    .apply(ParDo.of(new SessionWindowDoFn(outputSchema)))
                    .setRowSchema(outputSchema)
                    .apply("reWindowIntoGlobalWindow", Window.<Row>into(new GlobalWindows()));
        }

        /** Builds the schema consisting of the fields of {@code schema} at the given positions. */
        private Schema getKeySchema(Schema schema, List<Integer> list) {
            List<Schema.Field> keyFields = new ArrayList<>();
            for (int fieldIndex : list) {
                keyFields.add(schema.getField(fieldIndex));
            }
            return Schema.builder().addFields(keyFields).build();
        }

        /**
         * Re-timestamps every row with the instant stored in field {@code i} and windows the result
         * with {@code windowFn}.
         *
         * <p>The allowed timestamp skew is set to {@code Long.MAX_VALUE} milliseconds, i.e. it is
         * effectively unbounded. The timestamp function captures only the field index, not this
         * transform.
         *
         * @param pCollection rows to timestamp and window; its coder is reused for the result
         * @param i index of the date-time field holding the event timestamp
         * @param windowFn windowing strategy to apply; the Beam window functions used here are
         *     declared over {@code Object}, hence the lower-bounded wildcard
         */
        private PCollection<Row> assignTimestampsAndWindow(PCollection<Row> pCollection, int i, WindowFn<? super Row, IntervalWindow> windowFn) {
            return pCollection
                    .apply(
                            "assignEventTimestamp",
                            WithTimestamps.<Row>of(row -> row.getDateTime(i).toInstant())
                                    .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
                    .setCoder(pCollection.getCoder())
                    .apply(Window.<Row>into(windowFn));
        }
    }

    /**
     * Creates the rel node by forwarding every argument, unchanged and in the same order, to the
     * {@link TableFunctionScan} constructor.
     */
    public BeamTableFunctionScanRel(RelOptCluster relOptCluster, RelTraitSet relTraitSet, List<RelNode> list, RexNode rexNode, Type type, RelDataType relDataType, Set<RelColumnMapping> set) {
        super(relOptCluster, relTraitSet, list, rexNode, type, relDataType, set);
    }

    /**
     * Creates a new {@link BeamTableFunctionScanRel} from the given arguments.
     *
     * <p>The cluster is taken from this node; all other properties come from the arguments.
     *
     * @return a new node; this node is not modified
     */
    @Override
    public TableFunctionScan copy(RelTraitSet relTraitSet, List<RelNode> list, RexNode rexNode, Type type, RelDataType relDataType, Set<RelColumnMapping> set) {
        return new BeamTableFunctionScanRel(getCluster(), relTraitSet, list, rexNode, type, relDataType, set);
    }

    /** Returns a new {@code Transform} bound to this rel node. */
    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
        return new Transform();
    }

    /**
     * Interprets the numeric value of {@code rexNode} as a number of milliseconds.
     *
     * @param rexNode the operand to convert; it is handed to {@code longValue}, which rejects
     *     anything that is not a literal
     * @return a {@link Duration} of that many milliseconds
     */
    public Duration durationParameter(RexNode rexNode) {
        return Duration.millis(longValue(rexNode));
    }

    /**
     * Extracts the value of a numeric literal as a {@code long}.
     *
     * @param rexNode the node to read
     * @return the literal's value converted with {@link Number#longValue()}
     * @throws IllegalArgumentException if {@code rexNode} is not a literal, or if its value is
     *     null or not a number
     */
    private long longValue(RexNode rexNode) {
        if (rexNode instanceof RexLiteral) {
            Object literalValue = RexLiteral.value(rexNode);
            // A NULL or non-numeric literal is reported like any other invalid operand instead of
            // surfacing as a NullPointerException or ClassCastException.
            if (literalValue instanceof Number) {
                return ((Number) literalValue).longValue();
            }
        }
        throw new IllegalArgumentException(String.format("[%s] is not valid.", rexNode));
    }

    /**
     * Returns the statistics that {@code BeamSqlRelUtils.getNodeStats} reports for this node's
     * first input; this node adds no adjustment of its own.
     */
    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public NodeStats estimateNodeStats(RelMetadataQuery relMetadataQuery) {
        return BeamSqlRelUtils.getNodeStats(getInput(0), relMetadataQuery);
    }

    /**
     * Computes this node's own cost from the statistics of its first input.
     *
     * <p>Both cost components are proportional to the number of fields in this node's row type:
     * the first is the input row count times the field count, the second is the input rate times
     * the input window times the field count.
     */
    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public BeamCostModel beamComputeSelfCost(RelOptPlanner relOptPlanner, RelMetadataQuery relMetadataQuery) {
        NodeStats inputStats = BeamSqlRelUtils.getNodeStats(getInput(0), relMetadataQuery);
        double fieldsPerRow = getRowType().getFieldCount();
        double rowCountCost = inputStats.getRowCount() * fieldsPerRow;
        double rateCost = inputStats.getRate() * inputStats.getWindow() * fieldsPerRow;
        return BeamCostModel.FACTORY.makeCost(rowCountCost, rateCost);
    }
}
