package com.samelamin.spark.bigquery.streaming;

import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
import com.samelamin.spark.bigquery.BigQueryClient$;
import com.samelamin.spark.bigquery.BigQuerySQLContext;
import com.samelamin.spark.bigquery.converters.SchemaConverters$;
import java.math.BigInteger;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: BigQuerySource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ec\u0001B\u0001\u0003\u00015\u0011aBQ5h#V,'/_*pkJ\u001cWM\u0003\u0002\u0004\t\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u000b\u0019\t\u0001BY5hcV,'/\u001f\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\u0013M\fW.\u001a7b[&t'\"A\u0006\u0002\u0007\r|Wn\u0001\u0001\u0014\u0007\u0001qA\u0003\u0005\u0002\u0010%5\t\u0001CC\u0001\u0012\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0002C\u0001\u0004B]f\u0014VM\u001a\t\u0003+\u0001j\u0011A\u0006\u0006\u0003\u0007]Q!\u0001G\r\u0002\u0013\u0015DXmY;uS>t'B\u0001\u000e\u001c\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000fqQ!!\b\u0010\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0012aA8sO&\u0011\u0011E\u0006\u0002\u0007'>,(oY3\t\u0011\r\u0002!\u0011!Q\u0001\n\u0011\n!b]9m\u0007>tG/\u001a=u!\t)c%D\u0001\u001a\u0013\t9\u0013D\u0001\u0006T#2\u001buN\u001c;fqRD\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IAK\u0001\fkN,'oX:dQ\u0016l\u0017\rE\u0002\u0010W5J!\u0001\f\t\u0003\r=\u0003H/[8o!\tq\u0013'D\u00010\u0015\t\u0001\u0014$A\u0003usB,7/\u0003\u00023_\tQ1\u000b\u001e:vGR$\u0016\u0010]3\t\u0011Q\u0002!\u0011!Q\u0001\nU\nqa\u001c9uS>t7\u000f\u0005\u00037sqbdBA\b8\u0013\tA\u0004#\u0001\u0004Qe\u0016$WMZ\u0005\u0003um\u00121!T1q\u0015\tA\u0004\u0003\u0005\u00027{%\u0011ah\u000f\u0002\u0007'R\u0014\u0018N\\4\t\u000b\u0001\u0003A\u0011A!\u0002\rqJg.\u001b;?)\u0011\u0011E)\u0012$\u0011\u0005\r\u0003Q\"\u0001\u0002\t\u000b\rz\u0004\u0019\u0001\u0013\t\u000b%z\u0004\u0019\u0001\u0016\t\u000bQz\u0004\u0019A\u001b\t\u000f!\u0003!\u0019!C\u0001\u0013\u0006\u0019\u0002.\u00193p_B\u001cuN\u001c4jOV\u0014\u0018\r^5p]V\t!\n\u0005\u0002L!6\tAJ\u0003\u0002N\u001d\u0006!1m\u001c8g\u0015\tyE$\u0001\u0004iC\u0012|w\u000e]\u0005\u0003#2\u0013QbQ8oM&<WO]1uS>t\u0007BB*\u0001A\u0003%!*\u0001\u000biC\u0012|w\u000e]\"p]\u001aLw-\u001e:bi&|g\u000e\t\u0005\b+\u0002\u0011\r\u0011\"\u0003W\u0003\u0019awnZ4feV\tq\u000b\u0005\u0002Y76\t\u0011L\u0003\u0002[=\u0005)1\u000f\u001c45U&\u0011A,\u0017\u0002\u0007\u0019
><w-\u001a:\t\ry\u0003\u0001\u0015!\u0003X\u0003\u001dawnZ4fe\u0002Bq\u0001\u0019\u0001C\u0002\u0013\u0005\u0011-A\u000egk2d\u00170U;bY&4\u0017.\u001a3PkR\u0004X\u000f\u001e+bE2,\u0017\nZ\u000b\u0002y!11\r\u0001Q\u0001\nq\nADZ;mYf\fV/\u00197jM&,GmT;uaV$H+\u00192mK&#\u0007\u0005C\u0004f\u0001\t\u0007I\u0011\u00014\u0002\u001fQLW.Z:uC6\u00048i\u001c7v[:,\u0012a\u001a\t\u0003Q6l\u0011!\u001b\u0006\u0003U.\fA\u0001\\1oO*\tA.\u0001\u0003kCZ\f\u0017B\u0001 j\u0011\u0019y\u0007\u0001)A\u0005O\u0006\u0001B/[7fgR\fW\u000e]\"pYVlg\u000e\t\u0005\u0006c\u0002!\tE]\u0001\u0007g\u000eDW-\\1\u0016\u00035BQ\u0001\u001e\u0001\u0005BU\f\u0011bZ3u\u001f\u001a47/\u001a;\u0016\u0003Y\u00042aD\u0016x!\t)\u00020\u0003\u0002z-\t1qJ\u001a4tKRDQa\u001f\u0001\u0005Bq\f\u0001bZ3u\u0005\u0006$8\r\u001b\u000b\u0006{\u0006}\u00111\u0005\t\u0004}\u0006eabA@\u0002\u00169!\u0011\u0011AA\n\u001d\u0011\t\u0019!!\u0005\u000f\t\u0005\u0015\u0011q\u0002\b\u0005\u0003\u000f\ti!\u0004\u0002\u0002\n)\u0019\u00111\u0002\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005y\u0012BA\u000f\u001f\u0013\t9A$\u0003\u0002\u001b7%\u0019\u0011qC\r\u0002\u000fA\f7m[1hK&!\u00111DA\u000f\u0005%!\u0015\r^1Ge\u0006lWMC\u0002\u0002\u0018eAa!!\t{\u0001\u00041\u0018!B:uCJ$\bBBA\u0013u\u0002\u0007q/A\u0002f]\u0012Dq!!\u000b\u0001\t\u0003\nY#\u0001\u0003ti>\u0004HCAA\u0017!\ry\u0011qF\u0005\u0004\u0003c\u0001\"\u0001B+oSRDq!!\u000e\u0001\t\u0003\t9$\u0001\nhKR\u001cuN\u001c<feR,GmU2iK6\fGcA\u0017\u0002:!11%a\rA\u0002\u0011:q!!\u0010\u0003\u0011\u0003\ty$\u0001\bCS\u001e\fV/\u001a:z'>,(oY3\u0011\u0007\r\u000b\tE\u0002\u0004\u0002\u0005!\u0005\u00111I\n\u0004\u0003\u0003r\u0001b\u0002!\u0002B\u0011\u0005\u0011q\t\u000b\u0003\u0003\u007fA\u0011\"a\u0013\u0002B\t\u0007I\u0011\u0001:\u0002\u001d\u0011+e)Q+M)~\u001b6\tS#N\u0003\"A\u0011qJA!A\u0003%Q&A\bE\u000b\u001a\u000bU\u000b\u0014+`'\u000eCU)T!!\u0001")
/* loaded from: input_file:com/samelamin/spark/bigquery/streaming/BigQuerySource.class */
public class BigQuerySource implements Source {
    private final SQLContext sqlContext;
    private final Configuration hadoopConfiguration;
    private final Logger logger;
    private final String fullyQualifiedOutputTableId;
    private final String timestampColumn;

    public static StructType DEFAULT_SCHEMA() {
        return BigQuerySource$.MODULE$.DEFAULT_SCHEMA();
    }

    public void commit(Offset offset) {
        Source.class.commit(this, offset);
    }

    public Configuration hadoopConfiguration() {
        return this.hadoopConfiguration;
    }

    private Logger logger() {
        return this.logger;
    }

    public String fullyQualifiedOutputTableId() {
        return this.fullyQualifiedOutputTableId;
    }

    public String timestampColumn() {
        return this.timestampColumn;
    }

    public StructType schema() {
        return BigQuerySource$.MODULE$.DEFAULT_SCHEMA();
    }

    public Option<Offset> getOffset() {
        BigInteger bigInteger = (BigInteger) new BigQuerySQLContext(this.sqlContext).getLatestBQModifiedTime(fullyQualifiedOutputTableId()).getOrElse(new BigQuerySource$$anonfun$1(this));
        logger().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " was last updated on ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{fullyQualifiedOutputTableId(), BoxesRunTime.boxToLong(bigInteger.longValue())})));
        return new Some(new LongOffset(bigInteger.longValue()));
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        long offset2 = ((LongOffset) option.getOrElse(new BigQuerySource$$anonfun$2(this))).offset();
        long offset3 = ((LongOffset) offset).offset();
        LocalDate localDate = new DateTime(offset2).toLocalDate();
        String localDate2 = new DateTime(offset3).toLocalDate().toString();
        logger().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Fetching data between ", " and ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(offset2), BoxesRunTime.boxToLong(offset3)})));
        return new BigQuerySQLContext(this.sqlContext).bigQuerySelect(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |SELECT\n         |  *\n         |FROM\n         |  `", "`\n         |WHERE\n         |  ", " BETWEEN TIMESTAMP_MILLIS(", ") AND TIMESTAMP_MILLIS(", ")\n         |  AND _PARTITIONTIME BETWEEN TIMESTAMP('", "') AND TIMESTAMP('", "')\n         |  "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{fullyQualifiedOutputTableId().replace(':', '.'), timestampColumn(), BoxesRunTime.boxToLong(offset2), BoxesRunTime.boxToLong(offset3), localDate, localDate2})))).stripMargin());
    }

    public void stop() {
    }

    public StructType getConvertedSchema(SQLContext sQLContext) {
        return SchemaConverters$.MODULE$.BQToSQLSchema(BigQueryClient$.MODULE$.getInstance(sQLContext).getTableSchema(BigQueryStrings.parseTableReference(fullyQualifiedOutputTableId())));
    }

    public BigQuerySource(SQLContext sQLContext, Option<StructType> option, Map<String, String> map) {
        this.sqlContext = sQLContext;
        Source.class.$init$(this);
        this.hadoopConfiguration = sQLContext.sparkContext().hadoopConfiguration();
        this.logger = LoggerFactory.getLogger(BigQuerySource.class);
        this.fullyQualifiedOutputTableId = (String) map.get("tableReferenceSource").get();
        this.timestampColumn = hadoopConfiguration().get("timestamp_column", "bq_load_timestamp");
    }
}
