package org.apache.spark.sql.hudi.streaming;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.htrace.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.apache.http.cookie.ClientCookie;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: HoodieStreamSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ef\u0001B\u000e\u001d\u0001%B\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\t\u0015\u0002\u0011\t\u0011)A\u0005\u0017\"Aa\u000b\u0001B\u0001B\u0003%q\u000b\u0003\u0005a\u0001\t\u0005\t\u0015!\u0003b\u0011\u0015!\u0007\u0001\"\u0001f\u0011\u001da\u0007A1A\u0005\n5DaA\u001e\u0001!\u0002\u0013q\u0007\u0002C>\u0001\u0011\u000b\u0007I\u0011\u0002?\t\u0015\u0005\u001d\u0001\u0001#b\u0001\n\u0013\tI\u0001\u0003\u0006\u0002\u001e\u0001A)\u0019!C\u0005\u0003?A1\"!\f\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u00020!Y\u0011q\u0007\u0001A\u0002\u0003\u0007I\u0011BA\u001d\u0011-\t)\u0005\u0001a\u0001\u0002\u0003\u0006K!!\r\t\u0015\u0005%\u0003\u0001#b\u0001\n\u0013\ty\u0003C\u0004\u0002N\u0001!I!a\u0014\t\u000f\u0005m\u0003\u0001\"\u0011\u0002^!9\u0011q\f\u0001\u0005B\u0005\u0005\u0004bBA6\u0001\u0011\u0005\u0013Q\u000e\u0005\b\u0003+\u0003A\u0011BAL\u0011\u001d\ti\n\u0001C!\u0003?;q!!)\u001d\u0011\u0003\t\u0019K\u0002\u0004\u001c9!\u0005\u0011Q\u0015\u0005\u0007IZ!\t!!,\t\u0013\u0005=fC1A\u0005\u0002\u0005E\u0006\u0002CAZ-\u0001\u0006I!!\u0015\t\u0013\u0005Uf#!A\u0005\n\u0005]&A\u0005%p_\u0012LWm\u0015;sK\u0006l7k\\;sG\u0016T!!\b\u0010\u0002\u0013M$(/Z1nS:<'BA\u0010!\u0003\u0011AW\u000fZ5\u000b\u0005\u0005\u0012\u0013aA:rY*\u00111\u0005J\u0001\u0006gB\f'o\u001b\u0006\u0003K\u0019\na!\u00199bG\",'\"A\u0014\u0002\u0007=\u0014xm\u0001\u0001\u0014\u000b\u0001Q#'O 
\u0011\u0005-\u0002T\"\u0001\u0017\u000b\u00055r\u0013\u0001\u00027b]\u001eT\u0011aL\u0001\u0005U\u00064\u0018-\u0003\u00022Y\t1qJ\u00196fGR\u0004\"aM\u001c\u000e\u0003QR!!H\u001b\u000b\u0005Y\u0002\u0013!C3yK\u000e,H/[8o\u0013\tADG\u0001\u0004T_V\u00148-\u001a\t\u0003uuj\u0011a\u000f\u0006\u0003y\t\n\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003}m\u0012q\u0001T8hO&tw\r\u0005\u0002A\u00076\t\u0011IC\u0001C\u0003\u0015\u00198-\u00197b\u0013\t!\u0015I\u0001\u0007TKJL\u0017\r\\5{C\ndW-\u0001\u0006tc2\u001cuN\u001c;fqR\u0004\"a\u0012%\u000e\u0003\u0001J!!\u0013\u0011\u0003\u0015M\u000bFjQ8oi\u0016DH/\u0001\u0007nKR\fG-\u0019;b!\u0006$\b\u000e\u0005\u0002M':\u0011Q*\u0015\t\u0003\u001d\u0006k\u0011a\u0014\u0006\u0003!\"\na\u0001\u0010:p_Rt\u0014B\u0001*B\u0003\u0019\u0001&/\u001a3fM&\u0011A+\u0016\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005I\u000b\u0015\u0001D:dQ\u0016l\u0017m\u00149uS>t\u0007c\u0001!Y5&\u0011\u0011,\u0011\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005msV\"\u0001/\u000b\u0005u\u0003\u0013!\u0002;za\u0016\u001c\u0018BA0]\u0005)\u0019FO];diRK\b/Z\u0001\u000ba\u0006\u0014\u0018-\\3uKJ\u001c\b\u0003\u0002'c\u0017.K!aY+\u0003\u00075\u000b\u0007/\u0001\u0004=S:LGO\u0010\u000b\u0006M\"L'n\u001b\t\u0003O\u0002i\u0011\u0001\b\u0005\u0006\u000b\u0016\u0001\rA\u0012\u0005\u0006\u0015\u0016\u0001\ra\u0013\u0005\u0006-\u0016\u0001\ra\u0016\u0005\u0006A\u0016\u0001\r!Y\u0001\u000bQ\u0006$wn\u001c9D_:4W#\u00018\u0011\u0005=$X\"\u00019\u000b\u0005E\u0014\u0018\u0001B2p]\u001aT!a\u001d\u0013\u0002\r!\fGm\\8q\u0013\t)\bOA\u0007D_:4\u0017nZ;sCRLwN\\\u0001\fQ\u0006$wn\u001c9D_:4\u0007\u0005\u000b\u0002\bqB\u0011\u0001)_\u0005\u0003u\u0006\u0013\u0011\u0002\u001e:b]NLWM\u001c;\u0002\u0013Q\f'\r\\3QCRDW#A?\u0011\u0007y\f\u0019!D\u0001��\u0015\r\t\tA]\u0001\u0003MNL1!!\u0002��\u0005\u0011\u0001\u0016\r\u001e5\u0002\u00155,G/Y\"mS\u0016tG/\u0006\u0002\u0002\fA!\u0011QBA\r\u001b\t\tyA\u0003\u0003\u0002\u0012\u0005M\u0011!\u0002;bE2,'\u0002BA\u000b\u0003/\taaY8n[>t'BA\u0010%\u0013\u0011\
tY\"a\u0004\u0003+!{w\u000eZ5f)\u0006\u0014G.Z'fi\u0006\u001cE.[3oi\u0006IA/\u00192mKRK\b/Z\u000b\u0003\u0003C\u0001B!a\t\u0002*5\u0011\u0011Q\u0005\u0006\u0005\u0003O\t\u0019\"A\u0003n_\u0012,G.\u0003\u0003\u0002,\u0005\u0015\"a\u0004%p_\u0012LW\rV1cY\u0016$\u0016\u0010]3\u0002\u00151\f7\u000f^(gMN,G/\u0006\u0002\u00022A\u0019q-a\r\n\u0007\u0005UBD\u0001\nI_>$\u0017.Z*pkJ\u001cWm\u00144gg\u0016$\u0018A\u00047bgR|eMZ:fi~#S-\u001d\u000b\u0005\u0003w\t\t\u0005E\u0002A\u0003{I1!a\u0010B\u0005\u0011)f.\u001b;\t\u0013\u0005\rC\"!AA\u0002\u0005E\u0012a\u0001=%c\u0005YA.Y:u\u001f\u001a47/\u001a;!Q\ti\u00010\u0001\bj]&$\u0018.\u00197PM\u001a\u001cX\r^:)\u00059A\u0018AC4fiZ+'o]5p]R!\u0011\u0011KA,!\r\u0001\u00151K\u0005\u0004\u0003+\n%aA%oi\"1\u0011\u0011L\bA\u0002-\u000b1B^3sg&|g\u000eT5oK\u000611o\u00195f[\u0006,\u0012AW\u0001\nO\u0016$xJ\u001a4tKR,\"!a\u0019\u0011\t\u0001C\u0016Q\r\t\u0004g\u0005\u001d\u0014bAA5i\t1qJ\u001a4tKR\f\u0001bZ3u\u0005\u0006$8\r\u001b\u000b\u0007\u0003_\ni)!%\u0011\t\u0005E\u0014q\u0011\b\u0005\u0003g\n\u0019I\u0004\u0003\u0002v\u0005\u0005e\u0002BA<\u0003\u007frA!!\u001f\u0002~9\u0019a*a\u001f\n\u0003\u001dJ!!\n\u0014\n\u0005\r\"\u0013BA\u0011#\u0013\r\t)\tI\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tI)a#\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAACA!9\u0011q\u0012\nA\u0002\u0005\r\u0014!B:uCJ$\bbBAJ%\u0001\u0007\u0011QM\u0001\u0004K:$\u0017aD:uCJ$8i\\7nSR$\u0016.\\3\u0015\u0007-\u000bI\nC\u0004\u0002\u001cN\u0001\r!!\r\u0002\u0017M$\u0018M\u001d;PM\u001a\u001cX\r^\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002<\u0005\u0011\u0002j\\8eS\u0016\u001cFO]3b[N{WO]2f!\t9gc\u0005\u0003\u0017\u0003O{\u0004c\u0001!\u0002*&\u0019\u00111V!\u0003\r\u0005s\u0017PU3g)\t\t\u0019+A\u0004W\u000bJ\u001b\u0016j\u0014(\u0016\u0005\u0005E\u0013\u0001\u0003,F%NKuJ\u0014\u0011\u0002\u0017I,\u0017\r\u001a*fg>dg/\u001a\u000b\u0002U\u0001")
/* loaded from: input_file:org/apache/spark/sql/hudi/streaming/HoodieStreamSource.class */
/**
 * Spark Structured Streaming {@link Source} that reads a Hudi table incrementally.
 *
 * <p>Offsets are {@link HoodieSourceOffset} values carrying a commit timestamp. {@link #getOffset()}
 * reports the timestamp of the latest completed commit on the active timeline, and
 * {@link #getBatch(Option, Offset)} runs an incremental read bounded by the start and end offsets,
 * using {@link IncrementalRelation} for COPY_ON_WRITE tables and
 * {@link MergeOnReadIncrementalRelation} for MERGE_ON_READ tables.
 *
 * <p>This file is decompiler output for a Scala class. The {@code bitmap$0} / {@code bitmap$trans$0}
 * fields and the {@code *$lzycompute} methods implement lazily initialised values: each value is
 * computed once inside a {@code synchronized} block and its "initialised" flag is set afterwards.
 * The {@code ?? r0} locals are decompiler placeholders and are intentionally left untouched.
 */
public class HoodieStreamSource implements Source, Logging, Serializable {
    // Lazily initialised values; see the matching *$lzycompute methods.
    private Path tablePath;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType;
    private transient HoodieSourceOffset initialOffsets;
    // Constructor arguments.
    public final SQLContext org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext;
    public final String org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    // Hadoop configuration derived from the session state in the constructor.
    private final transient Configuration hadoopConf;
    // Most recent offset handed out by getOffset(); null until getOffset() has run.
    private transient HoodieSourceOffset lastOffset;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Set to true once initialOffsets has been computed.
    private volatile transient boolean bitmap$trans$0;
    // Initialisation flags: bit 1 = tablePath, bit 2 = metaClient, bit 4 = tableType.
    private volatile byte bitmap$0;

    /** Returns the metadata format version exposed by the companion object. */
    public static int VERSION() {
        return HoodieStreamSource$.MODULE$.VERSION();
    }

    // ---------------------------------------------------------------------------------------
    // Forwarders to the default implementations of the Logging trait.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    // ---------------------------------------------------------------------------------------
    // Forwarders to the default implementations of the Source trait.
    // ---------------------------------------------------------------------------------------

    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    public org.apache.spark.sql.connector.read.streaming.Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    public org.apache.spark.sql.connector.read.streaming.Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    public void commit(org.apache.spark.sql.connector.read.streaming.Offset offset) {
        Source.commit$(this, offset);
    }

    /** Accessor for the logger slot used by the Logging trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Mutator for the logger slot used by the Logging trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private Configuration hadoopConf() {
        return this.hadoopConf;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    /**
     * Computes the table path on first use: reads the {@code path} option, then asks
     * {@link TablePathUtils#getTablePath} to resolve it against the path's file system.
     *
     * @throws IllegalArgumentException if the {@code path} option is absent
     */
    private Path tablePath$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                // Fail fast with a clear message. The previous code used the error text itself as
                // the default option value, which then got treated as a file-system path.
                Option<String> pathOption = this.parameters.get("path");
                if (pathOption.isEmpty()) {
                    throw new IllegalArgumentException("Missing 'path' option");
                }
                Path path = new Path(pathOption.get());
                this.tablePath = TablePathUtils.getTablePath(path.getFileSystem(hadoopConf()), path).get();
                r0 = this;
                // Mark tablePath as initialised only after the value has been stored.
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    /** Returns the table path, computing it on first access. */
    private Path tablePath() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? tablePath$lzycompute() : this.tablePath;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    /** Builds the {@link HoodieTableMetaClient} for {@link #tablePath()} on first use. */
    private HoodieTableMetaClient metaClient$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tablePath().toString()).build();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    /** Returns the meta client, creating it on first access. */
    private HoodieTableMetaClient metaClient() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? metaClient$lzycompute() : this.metaClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    /** Reads the table type from the meta client on first use. */
    private HoodieTableType tableType$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.tableType = metaClient().getTableType();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    /** Returns the table type, resolving it on first access. */
    private HoodieTableType tableType() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? tableType$lzycompute() : this.tableType;
    }

    private HoodieSourceOffset lastOffset() {
        return this.lastOffset;
    }

    private void lastOffset_$eq(HoodieSourceOffset hoodieSourceOffset) {
        this.lastOffset = hoodieSourceOffset;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    /**
     * Computes the initial offset on first use.
     *
     * <p>An {@link HDFSMetadataLog} rooted at the metadata path is consulted for batch id 0. If an
     * entry exists it is returned; otherwise {@code INIT_OFFSET} is written as batch 0 and returned.
     */
    private HoodieSourceOffset initialOffsets$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                HDFSMetadataLog<HoodieSourceOffset> hDFSMetadataLog = new HDFSMetadataLog<HoodieSourceOffset>(this) { // from class: org.apache.spark.sql.hudi.streaming.HoodieStreamSource$$anon$1
                    private final /* synthetic */ HoodieStreamSource $outer;

                    /**
                     * Writes a version line ({@code v<VERSION>} followed by a newline) and then the
                     * offset JSON, UTF-8 encoded. The writer is flushed but not closed here.
                     */
                    public void serialize(HoodieSourceOffset hoodieSourceOffset, OutputStream outputStream) {
                        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                        bufferedWriter.write(new StringBuilder(2).append("v").append(HoodieStreamSource$.MODULE$.VERSION()).append("\n").toString());
                        bufferedWriter.write(hoodieSourceOffset.json());
                        bufferedWriter.flush();
                    }

                    /**
                     * Reads the whole stream, validates the leading version line and parses the
                     * remainder as offset JSON.
                     *
                     * @throws IllegalStateException if there is no version line, or the stored
                     *         version is greater than the supported {@code VERSION}
                     */
                    /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
                    public HoodieSourceOffset m14195deserialize(InputStream inputStream) {
                        String readAsUTFString = FileIOUtils.readAsUTFString(inputStream);
                        int indexOf = readAsUTFString.indexOf("\n");
                        // A newline at index 0 (empty version line) is rejected as well.
                        if (indexOf <= 0) {
                            throw new IllegalStateException("Bad metadata format, failed to find the version line.");
                        }
                        int org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion = this.$outer.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion(readAsUTFString.substring(0, indexOf));
                        if (org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion > HoodieStreamSource$.MODULE$.VERSION()) {
                            throw new IllegalStateException(new StringBuilder(63).append("UnSupportVersion: max support version is: ").append(HoodieStreamSource$.MODULE$.VERSION()).append(" current version is: ").append(org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion).toString());
                        }
                        return HoodieSourceOffset$.MODULE$.fromJson(readAsUTFString.substring(indexOf + 1));
                    }

                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext.sparkSession(), this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath, ClassTag$.MODULE$.apply(HoodieSourceOffset.class));
                        if (this == null) {
                            throw null;
                        }
                        this.$outer = this;
                    }
                };
                this.initialOffsets = (HoodieSourceOffset) hDFSMetadataLog.get(0L).getOrElse(() -> {
                    hDFSMetadataLog.add(0L, HoodieSourceOffset$.MODULE$.INIT_OFFSET());
                    return HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                });
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    /** Returns the initial offset, computing (and persisting) it on first access. */
    /* JADX INFO: Access modifiers changed from: private */
    public HoodieSourceOffset initialOffsets() {
        return !this.bitmap$trans$0 ? initialOffsets$lzycompute() : this.initialOffsets;
    }

    /**
     * Parses a metadata version line of the form {@code v<int>}.
     *
     * @param str the first line of a metadata file, without the trailing newline
     * @return the integer that follows the leading {@code v}
     * @throws IllegalStateException if the line does not start with {@code v} or the remainder is
     *         not a valid integer
     */
    public int org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion(String str) {
        if (str.startsWith("v")) {
            try {
                return Integer.parseInt(str.substring(1));
            } catch (NumberFormatException e) {
                // Report a malformed number the same way as a malformed prefix, keeping the cause.
                throw new IllegalStateException(new StringBuilder(53).append("Illegal version line: ").append(str).append(" ").append("in the streaming metadata path").toString(), e);
            }
        }
        throw new IllegalStateException(new StringBuilder(53).append("Illegal version line: ").append(str).append(" ").append("in the streaming metadata path").toString());
    }

    /**
     * Returns the user-supplied schema if one was given; otherwise the table's Avro schema
     * (resolved through {@link TableSchemaResolver}) converted to a Spark SQL type.
     */
    public StructType schema() {
        return (StructType) this.schemaOption.getOrElse(() -> {
            return SchemaConverters$.MODULE$.toSqlType(new TableSchemaResolver(this.metaClient()).getTableAvroSchema()).dataType();
        });
    }

    /**
     * Reloads the active timeline and returns the latest available offset.
     *
     * <p>With no completed commits the initial offset is used. Otherwise the last offset is
     * replaced only when none is recorded yet or the latest completed commit timestamp compares
     * greater (as a string) than the recorded one.
     *
     * @return always a {@code Some} wrapping the current last offset
     */
    public Option<Offset> getOffset() {
        metaClient().reloadActiveTimeline();
        HoodieTimeline filterCompletedInstants = metaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        if (filterCompletedInstants.empty()) {
            lastOffset_$eq(initialOffsets());
        } else {
            String timestamp = filterCompletedInstants.lastInstant().get().getTimestamp();
            if (lastOffset() == null || new StringOps(Predef$.MODULE$.augmentString(timestamp)).$greater(lastOffset().commitTime())) {
                lastOffset_$eq(new HoodieSourceOffset(timestamp));
            }
        }
        return new Some(lastOffset());
    }

    /**
     * Builds the DataFrame for one micro-batch.
     *
     * @param option the start offset; when empty the initial offset is used
     * @param offset the end offset
     * @return an empty DataFrame when start and end offsets are equal, otherwise the rows of an
     *         incremental read configured with the begin/end instant-time options
     * @throws IllegalArgumentException if the table type is neither COPY_ON_WRITE nor MERGE_ON_READ
     */
    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        RDD<Row> buildScan;
        // Force the lazy initial offset before doing anything else.
        initialOffsets();
        HoodieSourceOffset hoodieSourceOffset = (HoodieSourceOffset) option.map(offset2 -> {
            return HoodieSourceOffset$.MODULE$.apply(offset2);
        }).getOrElse(() -> {
            return this.initialOffsets();
        });
        HoodieSourceOffset apply = HoodieSourceOffset$.MODULE$.apply(offset);
        // Null-safe equality: nothing to read when start == end.
        if (hoodieSourceOffset != null ? hoodieSourceOffset.equals(apply) : apply == null) {
            return this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext.internalCreateDataFrame(this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), schema(), true);
        }
        // Source parameters plus the begin/end instant times for this batch.
        Map $plus$plus = this.parameters.$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME_OPT_KEY()), startCommitTime(hoodieSourceOffset)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.END_INSTANTTIME_OPT_KEY()), apply.commitTime())})));
        HoodieTableType tableType = tableType();
        if (HoodieTableType.COPY_ON_WRITE.equals(tableType)) {
            // Rows from the relation are serialized through a SparkRowSerDe built for the schema.
            SparkRowSerDe createRowSerDe = HoodieSparkUtils$.MODULE$.createRowSerDe(RowEncoder$.MODULE$.apply(schema()));
            buildScan = new IncrementalRelation(this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext, $plus$plus, schema(), metaClient()).buildScan().map(row -> {
                return createRowSerDe.serializeRow(row);
            }, ClassTag$.MODULE$.apply(InternalRow.class));
        } else {
            if (!HoodieTableType.MERGE_ON_READ.equals(tableType)) {
                throw new IllegalArgumentException(new StringBuilder(21).append("UnSupport tableType: ").append(tableType()).toString());
            }
            // Scan all schema columns with no pushed-down filters.
            buildScan = new MergeOnReadIncrementalRelation(this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext, $plus$plus, schema(), metaClient()).buildScan((String[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(schema().fields())).map(structField -> {
                return structField.name();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Filter[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
        }
        return this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext.internalCreateDataFrame(buildScan, schema(), true);
    }

    /**
     * Derives the begin instant time for an incremental read from the start offset.
     *
     * <p>For {@code INIT_OFFSET} the offset's commit time is used unchanged. For any other offset
     * the commit time is parsed with {@code COMMIT_FORMATTER}, advanced by 1000 ms and formatted
     * back.
     * NOTE(review): presumably the shift moves the read past the already-consumed commit; confirm
     * against the begin-instant semantics of the incremental relations.
     *
     * @throws IllegalStateException if the offset is {@code null} and does not match INIT_OFFSET
     */
    private String startCommitTime(HoodieSourceOffset hoodieSourceOffset) {
        String format;
        HoodieSourceOffset INIT_OFFSET = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        if (INIT_OFFSET != null ? INIT_OFFSET.equals(hoodieSourceOffset) : hoodieSourceOffset == null) {
            format = hoodieSourceOffset.commitTime();
        } else {
            if (hoodieSourceOffset == null) {
                throw new IllegalStateException("UnKnow offset type.");
            }
            format = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(HoodieActiveTimeline.COMMIT_FORMATTER.parse(hoodieSourceOffset.commitTime()).getTime() + 1000));
        }
        return format;
    }

    /** No resources to release; intentionally empty. */
    public void stop() {
    }

    /**
     * Creates the source.
     *
     * @param sQLContext SQL context used to build DataFrames and to derive the Hadoop configuration
     * @param str        metadata path under which the initial offset log is kept
     * @param option     optional user-supplied schema
     * @param map        source options; the {@code path} entry is required when the table path is
     *                   first resolved
     */
    public HoodieStreamSource(SQLContext sQLContext, String str, Option<StructType> option, Map<String, String> map) {
        this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext = sQLContext;
        this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath = str;
        this.schemaOption = option;
        this.parameters = map;
        Source.$init$(this);
        Logging.$init$(this);
        this.hadoopConf = sQLContext.sparkSession().sessionState().newHadoopConf();
    }
}
