package org.apache.beam.runners.spark.structuredstreaming.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.checkerframework.dataflow.qual.Pure;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.class */
public class BoundedDatasetFactory {

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory$BeamTable.class */
    /**
     * Spark DataSource V2 {@link Table} backed by a Beam {@link BoundedSource}.
     *
     * <p>The table only advertises {@link TableCapability#BATCH_READ}. Its schema is taken from the
     * encoder held in {@link Params}; rows are produced by serializing the {@link WindowedValue}s
     * read from the source with that encoder.
     *
     * @param <T> element type produced by the source
     */
    private static class BeamTable<T> implements Table, SupportsRead {
        final BoundedSource<T> source;
        final Params<T> params;

        BeamTable(BoundedSource<T> boundedSource, Params<T> params) {
            this.source = boundedSource;
            this.params = params;
        }

        /** Returns the encoder used to convert between rows and windowed values. */
        public Encoder<WindowedValue<T>> getEncoder() {
            return this.params.encoder;
        }

        /**
         * Returns a builder whose {@link Scan} reports the encoder schema and reads through a
         * {@link BeamBatch}. The passed options are not used.
         */
        @Override
        public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveStringMap) {
            return () ->
                new Scan() {
                    @Override
                    public StructType readSchema() {
                        return BeamTable.this.params.encoder.schema();
                    }

                    @Override
                    public Batch toBatch() {
                        return new BeamBatch<>(BeamTable.this.source, BeamTable.this.params);
                    }
                };
        }

        /** Returns a display name of the form {@code BeamSource<fully.qualified.SourceClass>}. */
        @Override
        public String name() {
            return "BeamSource<" + this.source.getClass().getName() + ">";
        }

        @Override
        public StructType schema() {
            return this.params.encoder.schema();
        }

        @Override
        public Set<TableCapability> capabilities() {
            return ImmutableSet.of(TableCapability.BATCH_READ);
        }

        /**
         * Batch that plans one input partition per split of the source and creates a
         * {@link BeamPartitionReader} for each of them.
         *
         * <p>The reader factory lambda captures this instance, hence the class is
         * {@link Serializable}. The lambda deserialization hook ({@code $deserializeLambda$}) is
         * generated by the compiler and must not be declared by hand.
         */
        private static class BeamBatch<T> implements Batch, Serializable {
            final BoundedSource<T> source;
            final Params<T> params;

            private BeamBatch(BoundedSource<T> boundedSource, Params<T> params) {
                this.source = boundedSource;
                this.params = params;
            }

            /** Splits the source using {@link SourcePartition#partitionsOf}. */
            @Override
            public InputPartition[] planInputPartitions() {
                return SourcePartition.partitionsOf(this.source, this.params).toArray(new InputPartition[0]);
            }

            /**
             * Returns a factory that expects the {@link SourcePartition}s planned by
             * {@link #planInputPartitions()}; any other partition type fails with a
             * {@link ClassCastException}.
             */
            @Override
            @SuppressWarnings("unchecked") // partitions handed back are the ones planned above
            public PartitionReaderFactory createReaderFactory() {
                return inputPartition ->
                    new BeamPartitionReader<>((SourcePartition<T>) inputPartition, this.params);
            }
        }

        /**
         * Reader that pulls windowed values from a {@link SourcePartitionIterator} and serializes
         * each of them to an {@link InternalRow}.
         */
        private static class BeamPartitionReader<T> implements PartitionReader<InternalRow> {
            final SourcePartitionIterator<T> iterator;
            final ExpressionEncoder.Serializer<WindowedValue<T>> serializer;

            /** Row produced by the last successful {@link #next()}, {@code null} before that and after close. */
            @Nullable
            transient InternalRow next;

            @SuppressWarnings("unchecked")
            BeamPartitionReader(SourcePartition<T> sourcePartition, Params<T> params) {
                this.iterator = new SourcePartitionIterator<>(sourcePartition, params);
                // NOTE(review): the field is typed as Encoder, but createSerializer() is looked up on
                // ExpressionEncoder; this assumes every encoder passed in is an ExpressionEncoder.
                this.serializer = ((ExpressionEncoder<WindowedValue<T>>) params.encoder).createSerializer();
            }

            /**
             * Advances to the next element, if any.
             *
             * @return {@code true} if a row is available through {@link #get()}
             */
            @Override
            public boolean next() throws IOException {
                if (!this.iterator.hasNext()) {
                    return false;
                }
                this.next = this.serializer.apply(this.iterator.next());
                return true;
            }

            /**
             * Returns the current row.
             *
             * @throws IllegalStateException if no row has been made available by {@link #next()}
             */
            @Override
            public InternalRow get() {
                if (this.next == null) {
                    throw new IllegalStateException("Next not available");
                }
                return this.next;
            }

            /** Drops the current row and closes the underlying iterator. */
            @Override
            public void close() throws IOException {
                this.next = null;
                this.iterator.close();
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory$BoundedRDD.class */
    private static class BoundedRDD<T> extends RDD<WindowedValue<T>> {
        final BoundedSource<T> source;
        final Params<T> params;

        public BoundedRDD(SparkContext sparkContext, BoundedSource<T> boundedSource, Params<T> params) {
            super(sparkContext, ScalaInterop.emptyList(), ClassTag.apply(WindowedValue.class));
            this.source = boundedSource;
            this.params = params;
        }

        public Iterator<WindowedValue<T>> compute(Partition partition, TaskContext taskContext) {
            return new InterruptibleIterator(taskContext, JavaConverters.asScalaIterator(new SourcePartitionIterator((SourcePartition) partition, this.params)));
        }

        public Partition[] getPartitions() {
            return (Partition[]) SourcePartition.partitionsOf(this.source, this.params).toArray(new Partition[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory$Params.class */
    /**
     * Serializable bundle of the settings shared by the table, RDD, partition and iterator classes:
     * the encoder, a supplier of pipeline options and the requested number of partitions.
     *
     * @param <T> element type of the windowed values being encoded
     */
    public static class Params<T> implements Serializable {
        final Encoder<WindowedValue<T>> encoder;
        final Supplier<PipelineOptions> options;
        final int numPartitions;

        /**
         * @throws IllegalArgumentException if {@code numPartitions} is not strictly positive
         */
        Params(Encoder<WindowedValue<T>> encoder, Supplier<PipelineOptions> options, int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("Number of partitions must be greater than zero.");
            }
            this.numPartitions = numPartitions;
            this.options = options;
            this.encoder = encoder;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory$SourcePartition.class */
    private static class SourcePartition<T> implements Partition, InputPartition {
        final BoundedSource<T> source;
        final int index;

        SourcePartition(BoundedSource<T> boundedSource, IntSupplier intSupplier) {
            this.source = boundedSource;
            this.index = intSupplier.getAsInt();
        }

        static <T> List<SourcePartition<T>> partitionsOf(BoundedSource<T> boundedSource, Params<T> params) {
            try {
                PipelineOptions pipelineOptions = params.options.get();
                List split = boundedSource.split(boundedSource.getEstimatedSizeBytes(pipelineOptions) / params.numPartitions, pipelineOptions);
                AtomicInteger atomicInteger = new AtomicInteger(0);
                IntSupplier intSupplier = atomicInteger::getAndIncrement;
                return (List) split.stream().map(boundedSource2 -> {
                    return new SourcePartition(boundedSource2, intSupplier);
                }).collect(Collectors.toList());
            } catch (Exception e) {
                throw new RuntimeException("Error splitting BoundedSource " + boundedSource.getClass().getCanonicalName(), e);
            }
        }

        public int index() {
            return this.index;
        }

        @Pure
        public int hashCode() {
            return this.index;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory$SourcePartitionIterator.class */
    private static class SourcePartitionIterator<T> extends AbstractIterator<WindowedValue<T>> implements Closeable {
        BoundedSource.BoundedReader<T> reader;
        boolean started = false;

        public SourcePartitionIterator(SourcePartition<T> sourcePartition, Params<T> params) {
            try {
                this.reader = sourcePartition.source.createReader(params.options.get());
            } catch (IOException e) {
                throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.reader != null) {
                endOfData();
                try {
                    this.reader.close();
                } finally {
                    this.reader = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Code restructure failed: missing block: B:13:0x0018, code lost:
        
            if (start() != false) goto L9;
         */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public org.apache.beam.sdk.util.WindowedValue<T> m51computeNext() {
            /*
                r5 = this;
                r0 = r5
                boolean r0 = r0.started     // Catch: java.io.IOException -> L39
                if (r0 == 0) goto L14
                r0 = r5
                org.apache.beam.sdk.io.BoundedSource$BoundedReader<T> r0 = r0.reader     // Catch: java.io.IOException -> L39
                boolean r0 = r0.advance()     // Catch: java.io.IOException -> L39
                if (r0 == 0) goto L2d
                goto L1b
            L14:
                r0 = r5
                boolean r0 = r0.start()     // Catch: java.io.IOException -> L39
                if (r0 == 0) goto L2d
            L1b:
                r0 = r5
                org.apache.beam.sdk.io.BoundedSource$BoundedReader<T> r0 = r0.reader     // Catch: java.io.IOException -> L39
                java.lang.Object r0 = r0.getCurrent()     // Catch: java.io.IOException -> L39
                r1 = r5
                org.apache.beam.sdk.io.BoundedSource$BoundedReader<T> r1 = r1.reader     // Catch: java.io.IOException -> L39
                org.joda.time.Instant r1 = r1.getCurrentTimestamp()     // Catch: java.io.IOException -> L39
                org.apache.beam.sdk.util.WindowedValue r0 = org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow(r0, r1)     // Catch: java.io.IOException -> L39
                return r0
            L2d:
                r0 = r5
                r0.close()     // Catch: java.io.IOException -> L39
                r0 = r5
                java.lang.Object r0 = r0.endOfData()     // Catch: java.io.IOException -> L39
                org.apache.beam.sdk.util.WindowedValue r0 = (org.apache.beam.sdk.util.WindowedValue) r0     // Catch: java.io.IOException -> L39
                return r0
            L39:
                r6 = move-exception
                java.lang.RuntimeException r0 = new java.lang.RuntimeException
                r1 = r0
                java.lang.String r2 = "Failed to start or advance reader."
                r3 = r6
                r1.<init>(r2, r3)
                throw r0
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.beam.runners.spark.structuredstreaming.io.BoundedDatasetFactory.SourcePartitionIterator.m51computeNext():org.apache.beam.sdk.util.WindowedValue");
        }

        private boolean start() throws IOException {
            this.started = true;
            return this.reader.start();
        }
    }

    /** Not instantiable: this class only exposes static factory methods. */
    private BoundedDatasetFactory() {}

    /**
     * Creates a {@link Dataset} that reads {@code boundedSource} through a DataSource V2 table.
     *
     * <p>The number of partitions requested from the source is the default parallelism of the
     * session's Spark context. Rows read from the table are converted back to windowed values using
     * {@code encoder}.
     *
     * @param sparkSession session the dataset is bound to
     * @param boundedSource source to read from
     * @param supplier supplier of the pipeline options handed to the source
     * @param encoder encoder for the windowed values
     * @return dataset of the windowed values read from the source
     */
    public static <T> Dataset<WindowedValue<T>> createDatasetFromRows(SparkSession sparkSession, BoundedSource<T> boundedSource, Supplier<PipelineOptions> supplier, Encoder<WindowedValue<T>> encoder) {
        int parallelism = sparkSession.sparkContext().defaultParallelism();
        Params<T> params = new Params<>(encoder, supplier, parallelism);
        BeamTable<T> table = new BeamTable<>(boundedSource, params);
        DataSourceV2Relation relation = DataSourceV2Relation.create(table, Option.empty(), Option.empty());
        return Dataset.ofRows(sparkSession, relation).as(encoder);
    }

    /**
     * Creates a {@link Dataset} that reads {@code boundedSource} through a {@link BoundedRDD}.
     *
     * <p>The number of partitions requested from the source is the default parallelism of the
     * session's Spark context.
     *
     * @param sparkSession session the dataset is bound to
     * @param boundedSource source to read from
     * @param supplier supplier of the pipeline options handed to the source
     * @param encoder encoder for the windowed values
     * @return dataset of the windowed values read from the source
     */
    public static <T> Dataset<WindowedValue<T>> createDatasetFromRDD(SparkSession sparkSession, BoundedSource<T> boundedSource, Supplier<PipelineOptions> supplier, Encoder<WindowedValue<T>> encoder) {
        SparkContext sparkContext = sparkSession.sparkContext();
        Params<T> params = new Params<>(encoder, supplier, sparkContext.defaultParallelism());
        BoundedRDD<T> rdd = new BoundedRDD<>(sparkContext, boundedSource, params);
        return sparkSession.createDataset(rdd, encoder);
    }
}
