package org.apache.spark.carbondata;

import java.net.BindException;
import java.net.ServerSocket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.streaming.ProcessingTime$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.test.util.QueryTest;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.ConfigMap;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.collection.mutable.WrappedArray;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: TestStreamingTableWithRowParser.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]f\u0001B\u0001\u0003\u0001-\u0011q\u0004V3tiN#(/Z1nS:<G+\u00192mK^KG\u000f\u001b*poB\u000b'o]3s\u0015\t\u0019A!\u0001\u0006dCJ\u0014wN\u001c3bi\u0006T!!\u0002\u0004\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u001dA\u0011AB1qC\u000eDWMC\u0001\n\u0003\ry'oZ\u0002\u0001'\r\u0001AB\u0006\t\u0003\u001bQi\u0011A\u0004\u0006\u0003\u001fA\tA!\u001e;jY*\u0011\u0011CE\u0001\u0005i\u0016\u001cHO\u0003\u0002\u0014\t\u0005\u00191/\u001d7\n\u0005Uq!!C)vKJLH+Z:u!\t9\"$D\u0001\u0019\u0015\tI\u0002\"A\u0005tG\u0006d\u0017\r^3ti&\u00111\u0004\u0007\u0002\u0012\u0005\u00164wN]3B]\u0012\fe\r^3s\u00032d\u0007\"B\u000f\u0001\t\u0003q\u0012A\u0002\u001fj]&$h\bF\u0001 !\t\u0001\u0003!D\u0001\u0003\u0011\u001d)\u0001A1A\u0005\n\t*\u0012a\t\t\u0003I\u0015j\u0011AE\u0005\u0003MI\u0011Ab\u00159be.\u001cVm]:j_:Da\u0001\u000b\u0001!\u0002\u0013\u0019\u0013AB:qCJ\\\u0007\u0005C\u0004+\u0001\t\u0007I\u0011B\u0016\u0002\u0019\u0011\fG/\u0019$jY\u0016\u0004\u0016\r\u001e5\u0016\u00031\u0002\"!L\u001a\u000f\u00059\nT\"A\u0018\u000b\u0003A\nQa]2bY\u0006L!AM\u0018\u0002\rA\u0013X\rZ3g\u0013\t!TG\u0001\u0004TiJLgn\u001a\u0006\u0003e=Baa\u000e\u0001!\u0002\u0013a\u0013!\u00043bi\u00064\u0015\u000e\\3QCRD\u0007\u0005C\u0003:\u0001\u0011\u0005#(A\u0005cK\u001a|'/Z!mYR\t1\b\u0005\u0002/y%\u0011Qh\f\u0002\u0005+:LG\u000fC\u0003@\u0001\u0011\u0005#(\u0001\u0005bMR,'/\u00117m\u0011\u0015\t\u0005\u0001\"\u0001;\u0003%!'o\u001c9UC\ndW\rC\u0003D\u0001\u0011\u0005A)A\fde\u0016\fG/Z,sSR,7k\\2lKR$\u0006N]3bIR1Q)T+[9z\u0003\"AR&\u000e\u0003\u001dS!\u0001S%\u0002\t1\fgn\u001a\u0006\u0002\u0015\u0006!!.\u0019<b\u0013\tauI\u0001\u0004UQJ,\u0017\r\u001a\u0005\u0006\u001d\n\u0003\raT\u0001\rg\u0016\u0014h/\u001a:T_\u000e\\W\r\u001e\t\u0003!Nk\u0011!\u0015\u0006\u0003%&\u000b1A\\3u\u0013\t!\u0016K\u0001\u0007TKJ4XM]*pG.,G\u000fC\u0003W\u0005\u0002\u0007q+A\u0005xe&$XMT;ngB\u0011a\u0006W\u0005\u00033>\u00121!\u00138u\u0011\u0015Y&\t1\u0001X\u0003\u001d\u0011xn\u001e(v[NDQ!\u0018\"A\u
0002]\u000ba\"\u001b8uKJ4\u0018\r\\*fG>tG\rC\u0004`\u0005B\u0005\t\u0019\u00011\u0002\u0015\t\fGMU3d_J$7\u000f\u0005\u0002/C&\u0011!m\f\u0002\b\u0005>|G.Z1o\u0011\u0015!\u0007\u0001\"\u0001f\u0003m\u0019'/Z1uKN{7m[3u'R\u0014X-Y7j]\u001e$\u0006N]3bIRIQIZ4jWN,ho\u001f\u0005\u0006\u000b\r\u0004\ra\t\u0005\u0006Q\u000e\u0004\raV\u0001\u0005a>\u0014H\u000fC\u0003kG\u0002\u0007A&A\u0005uC\ndW\rU1uQ\")An\u0019a\u0001[\u0006yA/\u00192mK&#WM\u001c;jM&,'\u000f\u0005\u0002oc6\tqN\u0003\u0002q%\u0005A1-\u0019;bYf\u001cH/\u0003\u0002s_\nyA+\u00192mK&#WM\u001c;jM&,'\u000fC\u0004uGB\u0005\t\u0019\u0001\u0017\u0002\u001f\t\fGMU3d_J$\u0017i\u0019;j_:Dq!X2\u0011\u0002\u0003\u0007q\u000bC\u0004xGB\u0005\t\u0019\u0001=\u0002\u0017!\fg\u000eZ8gMNK'0\u001a\t\u0003]eL!A_\u0018\u0003\t1{gn\u001a\u0005\by\u000e\u0004\n\u00111\u0001a\u0003-\tW\u000f^8IC:$wN\u001a4\t\u000by\u0004A\u0011A@\u0002-\u0015DXmY;uKN#(/Z1nS:<\u0017J\\4fgR$RcOA\u0001\u0003\u000b\tI!!\u0004\u0002\u0012\u0005U\u0011\u0011DA\u000f\u0003?\t\t\u0003\u0003\u0004\u0002\u0004u\u0004\r\u0001L\u0001\ni\u0006\u0014G.\u001a(b[\u0016Da!a\u0002~\u0001\u00049\u0016!\u00032bi\u000eDg*^7t\u0011\u0019\tY! 
a\u0001/\u0006\u0001\"o\\<Ok6\u001cX)Y2i\u0005\u0006$8\r\u001b\u0005\u0007\u0003\u001fi\b\u0019A,\u0002!%tG/\u001a:wC2|emU8ve\u000e,\u0007BBA\n{\u0002\u0007q+\u0001\tj]R,'O^1m\u001f\u001aLenZ3ti\"1\u0011qC?A\u0002]\u000bqbY8oi&tW/Z*fG>tGm\u001d\u0005\u0007\u00037i\b\u0019\u00011\u0002%\u001d,g.\u001a:bi\u0016\u0014\u0015\r\u001a*fG>\u0014Hm\u001d\u0005\u0006iv\u0004\r\u0001\f\u0005\bov\u0004\n\u00111\u0001y\u0011\u001daX\u0010%AA\u0002\u0001Dq!!\n\u0001\t\u0003\t9#A\u0006de\u0016\fG/\u001a+bE2,GcB\u001e\u0002*\u0005-\u0012q\u0006\u0005\b\u0003\u0007\t\u0019\u00031\u0001-\u0011\u001d\ti#a\tA\u0002\u0001\f\u0011b\u001d;sK\u0006l\u0017N\\4\t\u000f\u0005E\u00121\u0005a\u0001A\u0006iq/\u001b;i\u0005\u0006$8\r\u001b'pC\u0012Dq!!\u000e\u0001\t\u0003\t9$\u0001\u000ede\u0016\fG/\u001a+bE2,w+\u001b;i\u0007>l\u0007\u000f\\3y)f\u0004X\rF\u0004<\u0003s\tY$!\u0010\t\u000f\u0005\r\u00111\u0007a\u0001Y!9\u0011QFA\u001a\u0001\u0004\u0001\u0007bBA\u0019\u0003g\u0001\r\u0001\u0019\u0005\b\u0003\u0003\u0002A\u0011AA\"\u0003A)\u00070Z2vi\u0016\u0014\u0015\r^2i\u0019>\fG\rF\u0002<\u0003\u000bBq!a\u0001\u0002@\u0001\u0007A\u0006C\u0004\u0002J\u0001!\t!a\u0013\u0002\t]\u0014\u0018\r\u001d\u000b\u0005\u0003\u001b\nI\u0007E\u0003\u0002P\u0005\rDF\u0004\u0003\u0002R\u0005uc\u0002BA*\u00033j!!!\u0016\u000b\u0007\u0005]s&\u0001\u0006d_2dWm\u0019;j_:LA!a\u0017\u0002V\u00059Q.\u001e;bE2,\u0017\u0002BA0\u0003C\nAb\u0016:baB,G-\u0011:sCfTA!a\u0017\u0002V%!\u0011QMA4\u0005\u0015ygMU3g\u0015\u0011\ty&!\u0019\t\u0011\u0005-\u0014q\ta\u0001\u0003[\nQ!\u0019:sCf\u0004BALA8Y%\u0019\u0011\u0011O\u0018\u0003\u000b\u0005\u0013(/Y=\t\u000f\u0005U\u0004\u0001\"\u0001\u0002x\u0005yq-\u001a;TKJ4XM]*pG.,G\u000fF\u0001P\u0011%\tY\bAI\u0001\n\u0003\ti(\u0001\u0011fq\u0016\u001cW\u000f^3TiJ,\u0017-\\5oO&sw-Z:uI\u0011,g-Y;mi\u0012JTCAA@U\rA\u0018\u0011Q\u0016\u0003\u0003\u0007\u0003B!!\"\u0002\u00106\u0011\u0011q\u0011\u0006\u0005\u0003\u0013\u000bY)A\u0005v]\u000eDWmY6fI*\u0019\u0011QR\u0018\u0002\u0015\u0005tgn\u0
01c;bi&|g.\u0003\u0003\u0002\u0012\u0006\u001d%!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\"I\u0011Q\u0013\u0001\u0012\u0002\u0013\u0005\u0011qS\u0001\"Kb,7-\u001e;f'R\u0014X-Y7j]\u001eLenZ3ti\u0012\"WMZ1vYR$\u0013\u0007M\u000b\u0003\u00033S3\u0001YAA\u0011%\ti\nAI\u0001\n\u0003\t9*A\u0011de\u0016\fG/Z,sSR,7k\\2lKR$\u0006N]3bI\u0012\"WMZ1vYR$S\u0007C\u0005\u0002\"\u0002\t\n\u0011\"\u0001\u0002$\u0006)3M]3bi\u0016\u001cvnY6fiN#(/Z1nS:<G\u000b\u001b:fC\u0012$C-\u001a4bk2$H%N\u000b\u0003\u0003KS3\u0001LAA\u0011%\tI\u000bAI\u0001\n\u0003\tY+A\u0013de\u0016\fG/Z*pG.,Go\u0015;sK\u0006l\u0017N\\4UQJ,\u0017\r\u001a\u0013eK\u001a\fW\u000f\u001c;%mU\u0011\u0011Q\u0016\u0016\u0004/\u0006\u0005\u0005\"CAY\u0001E\u0005I\u0011AA?\u0003\u0015\u001a'/Z1uKN{7m[3u'R\u0014X-Y7j]\u001e$\u0006N]3bI\u0012\"WMZ1vYR$s\u0007C\u0005\u00026\u0002\t\n\u0011\"\u0001\u0002\u0018\u0006)3M]3bi\u0016\u001cvnY6fiN#(/Z1nS:<G\u000b\u001b:fC\u0012$C-\u001a4bk2$H\u0005\u000f")
/* loaded from: input_file:org/apache/spark/carbondata/TestStreamingTableWithRowParser.class */
/**
 * ScalaTest suite (decompiled from Scala) that registers three streaming-table tests against
 * the {@code streaming1} database.
 *
 * <p>The suite creates its tables in {@link #beforeAll()}, drops them in {@link #afterAll()},
 * and offers helpers that start a socket writer thread plus a Spark structured-streaming
 * query writing in the {@code carbondata} format.
 *
 * <p>NOTE(review): the {@code BeforeAndAfterAll.class.xxx(this, ...)} and
 * {@code FunSuiteLike.class.run(...)} calls, and the references to {@code $$anon$N} /
 * {@code $$anonfun$N} classes, are decompiler renderings of Scala compiler-generated code.
 * They are kept verbatim because their real targets are not visible in this file.
 */
public class TestStreamingTableWithRowParser extends QueryTest implements BeforeAndAfterAll {
    /** Spark session obtained from {@code sqlContext()} in the constructor. */
    private final SparkSession spark;

    /** Path of the CSV file used by {@link #executeBatchLoad(String)}. */
    private final String dataFilePath;

    // Not final: the trait setter below assigns this field after construction, which Java
    // does not allow for a final field.
    private boolean invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected;

    /** Returns the flag stored by the {@code BeforeAndAfterAll} trait setter. */
    public boolean invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected() {
        return this.invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected;
    }

    /** Synthetic bridge that forwards to the {@code FunSuiteLike} implementation of run. */
    public /* synthetic */ Status org$scalatest$BeforeAndAfterAll$$super$run(Option option, Args args) {
        return FunSuiteLike.class.run(this, option, args);
    }

    /** Trait setter for {@link #invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected()}. */
    public void org$scalatest$BeforeAndAfterAll$_setter_$invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected_$eq(boolean z) {
        this.invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected = z;
    }

    /** Forwards to the {@code BeforeAndAfterAll} trait implementation. */
    public void beforeAll(ConfigMap configMap) {
        BeforeAndAfterAll.class.beforeAll(this, configMap);
    }

    /** Forwards to the {@code BeforeAndAfterAll} trait implementation. */
    public void afterAll(ConfigMap configMap) {
        BeforeAndAfterAll.class.afterAll(this, configMap);
    }

    /** Forwards to the {@code BeforeAndAfterAll} trait implementation of run. */
    public Status run(Option<String> option, Args args) {
        return BeforeAndAfterAll.class.run(this, option, args);
    }

    private SparkSession spark() {
        return this.spark;
    }

    private String dataFilePath() {
        return this.dataFilePath;
    }

    /**
     * Sets the Carbon timestamp/date formats, recreates the {@code streaming1} database and
     * creates the three tables used by the tests, each with the streaming property set and
     * an initial batch load.
     */
    public void beforeAll() {
        CarbonProperties.getInstance().addProperty("carbon.timestamp.format", "yyyy-MM-dd HH:mm:ss");
        CarbonProperties.getInstance().addProperty("carbon.date.format", "yyyy-MM-dd");
        sql("DROP DATABASE IF EXISTS streaming1 CASCADE");
        sql("CREATE DATABASE streaming1");
        sql("USE streaming1");
        dropTable();
        createTable("stream_table_filter", true, true);
        createTable("stream_table_with_mi", true, true);
        createTableWithComplexType("stream_table_filter_complex", true, true);
    }

    /** Drops the test tables, switches back to the default database and drops {@code streaming1}. */
    public void afterAll() {
        dropTable();
        sql("USE default");
        sql("DROP DATABASE IF EXISTS streaming1 CASCADE");
    }

    /** Drops the three tables created by {@link #beforeAll()}, if they exist. */
    public void dropTable() {
        sql("drop table if exists streaming1.stream_table_filter");
        sql("drop table if exists streaming1.stream_table_with_mi");
        sql("drop table if exists streaming1.stream_table_filter_complex");
    }

    /**
     * Creates (without starting) the thread that writes to the given server socket.
     *
     * <p>All arguments are forwarded unchanged to the compiler-generated thread class
     * {@code TestStreamingTableWithRowParser$$anon$1}, which is not part of this file; the
     * parameter names below are taken from the Scala signature and their exact meaning is
     * defined there.
     *
     * @param serverSocket socket the writer thread is constructed with
     * @param writeNums forwarded to the writer thread
     * @param rowNums forwarded to the writer thread
     * @param intervalSecond forwarded to the writer thread
     * @param badRecords forwarded to the writer thread
     * @return the new, not yet started thread
     */
    public Thread createWriteSocketThread(ServerSocket serverSocket, int writeNums, int rowNums, int intervalSecond, boolean badRecords) {
        return new TestStreamingTableWithRowParser$$anon$1(this, serverSocket, writeNums, rowNums, intervalSecond, badRecords);
    }

    /** Scala default for the 5th parameter of {@code createWriteSocketThread}. */
    public boolean createWriteSocketThread$default$5() {
        return false;
    }

    /**
     * Creates (without starting) a thread that runs a structured-streaming query reading lines
     * from {@code localhost:port} over the {@code socket} source and writing them in the
     * {@code carbondata} format.
     *
     * <p>The thread blocks in {@code awaitTermination()}; any {@link Throwable} is logged, and
     * the query is stopped in a {@code finally} block if it was started.
     *
     * @param sparkSession session used to build the streaming query
     * @param port port of the socket source
     * @param tablePath table path used to derive the streaming checkpoint directory
     * @param tableIdentifier supplies the {@code dbName} and {@code tableName} options; its
     *     database must be defined because {@code database().get()} is called
     * @param badRecordAction value of the {@code bad_records_action} option
     * @param intervalSecond processing-time trigger interval, in seconds
     * @param handoffSize value of the {@code carbon.streaming.segment.max.size} option
     * @param autoHandoff value of the {@code carbon.streaming.auto.handoff.enabled} option
     * @return the new, not yet started thread
     */
    public Thread createSocketStreamingThread(final SparkSession sparkSession, final int port, final String tablePath, final TableIdentifier tableIdentifier, final String badRecordAction, final int intervalSecond, final long handoffSize, final boolean autoHandoff) {
        // The decompiler passed the captured variables as super-constructor arguments; Thread has
        // no such constructor, so the no-arg one is used here.
        return new Thread() { // from class: org.apache.spark.carbondata.TestStreamingTableWithRowParser$$anon$2
            // Enclosing suite instance (the decompiled code wrongly assigned the thread itself).
            private final /* synthetic */ TestStreamingTableWithRowParser $outer = TestStreamingTableWithRowParser.this;
            private final SparkSession spark$1 = sparkSession;
            private final int port$1 = port;
            private final String tablePath$1 = tablePath;
            private final TableIdentifier tableIdentifier$1 = tableIdentifier;
            private final String badRecordAction$1 = badRecordAction;
            private final int intervalSecond$2 = intervalSecond;
            private final long handoffSize$1 = handoffSize;
            private final boolean autoHandoff$1 = autoHandoff;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                StreamingQuery streamingQuery = null;
                try {
                    try {
                        // NOTE(review): $$anonfun$4 / $$anonfun$5 are compiler-generated closures
                        // defined outside this file; they are applied to each input line in turn.
                        streamingQuery = this.spark$1.readStream()
                            .format("socket")
                            .option("host", "localhost")
                            .option("port", this.port$1)
                            .load()
                            .as(this.spark$1.implicits().newStringEncoder())
                            .map(new TestStreamingTableWithRowParser$$anon$2$$anonfun$4(this), this.spark$1.implicits().newStringArrayEncoder())
                            // NOTE(review): the decompiled code wrote `new TypeCreator(this)`; the
                            // extra argument is assumed to be a captured-variable artifact, which
                            // relies on TypeCreator declaring only a no-arg constructor - confirm.
                            .map(new TestStreamingTableWithRowParser$$anon$2$$anonfun$5(this), this.spark$1.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(TestStreamingTableWithRowParser$$anon$2.class.getClassLoader()), new TypeCreator() { // from class: org.apache.spark.carbondata.TestStreamingTableWithRowParser$$anon$2$$typecreator11$1
                                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                                    mirror.universe();
                                    return mirror.staticClass("org.apache.spark.carbondata.StreamData").asType().toTypeConstructor();
                                }
                            })))
                            .writeStream()
                            .format("carbondata")
                            .trigger(ProcessingTime$.MODULE$.apply(this.intervalSecond$2 + " seconds"))
                            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(this.tablePath$1))
                            .option("bad_records_action", this.badRecordAction$1)
                            .option("dbName", (String) this.tableIdentifier$1.database().get())
                            .option("tableName", this.tableIdentifier$1.table())
                            .option("carbon.streaming.segment.max.size", this.handoffSize$1)
                            .option("timestampformat", "yyyy-MM-dd HH:mm:ss")
                            .option("carbon.streaming.auto.handoff.enabled", this.autoHandoff$1)
                            .start();
                        streamingQuery.awaitTermination();
                    } catch (Throwable th) {
                        // Also reached when the thread is interrupted by executeStreamingIngest.
                        this.$outer.LOGGER().error("finished to ingest data", th);
                    }
                } finally {
                    if (streamingQuery != null) {
                        streamingQuery.stop();
                    }
                }
            }
        };
    }

    /** Scala default for {@code badRecordAction} of {@code createSocketStreamingThread}. */
    public String createSocketStreamingThread$default$5() {
        return "force";
    }

    /** Scala default for {@code intervalSecond} of {@code createSocketStreamingThread}. */
    public int createSocketStreamingThread$default$6() {
        return 2;
    }

    /** Scala default for {@code handoffSize} of {@code createSocketStreamingThread} (1 GiB). */
    public long createSocketStreamingThread$default$7() {
        return 1073741824L;
    }

    /** Scala default for {@code autoHandoff} of {@code createSocketStreamingThread}. */
    public boolean createSocketStreamingThread$default$8() {
        // The decompiled code parsed the literal "true"; the result is always true.
        return true;
    }

    /**
     * Runs one streaming ingest round against {@code streaming1.<tableName>}: opens a server
     * socket, starts a writer thread and a streaming thread, sleeps for
     * {@code continueSeconds}, then interrupts both threads.
     *
     * <p>Any {@link Throwable} raised while setting up or waiting is logged and not rethrown.
     * The server socket is closed exactly once, in the {@code finally} block.
     *
     * @param tableName table in the {@code streaming1} database to ingest into
     * @param batchNums forwarded to {@link #createWriteSocketThread}
     * @param rowNumsEachBatch forwarded to {@link #createWriteSocketThread}
     * @param intervalOfSource forwarded to {@link #createWriteSocketThread}
     * @param intervalOfIngest trigger interval, in seconds, of the streaming query
     * @param continueSeconds how long to let both threads run before interrupting them
     * @param generateBadRecords forwarded to {@link #createWriteSocketThread}
     * @param badRecordAction value of the {@code bad_records_action} streaming option
     * @param handoffSize value of the {@code carbon.streaming.segment.max.size} option
     * @param autoHandoff value of the {@code carbon.streaming.auto.handoff.enabled} option
     */
    public void executeStreamingIngest(String tableName, int batchNums, int rowNumsEachBatch, int intervalOfSource, int intervalOfIngest, int continueSeconds, boolean generateBadRecords, String badRecordAction, long handoffSize, boolean autoHandoff) {
        TableIdentifier tableIdentifier = new TableIdentifier(tableName, Option$.MODULE$.apply("streaming1"));
        CarbonTable carbonTable = CarbonEnv$.MODULE$.getInstance(spark()).carbonMetaStore().lookupRelation(tableIdentifier, spark()).carbonTable();
        ServerSocket serverSocket = null;
        try {
            serverSocket = getServerSocket();
            Thread writerThread = createWriteSocketThread(serverSocket, batchNums, rowNumsEachBatch, intervalOfSource, generateBadRecords);
            Thread streamingThread = createSocketStreamingThread(spark(), serverSocket.getLocalPort(), carbonTable.getTablePath(), tableIdentifier, badRecordAction, intervalOfIngest, handoffSize, autoHandoff);
            writerThread.start();
            streamingThread.start();
            // Multiply as long so large durations cannot overflow int.
            Thread.sleep(continueSeconds * 1000L);
            streamingThread.interrupt();
            writerThread.interrupt();
        } catch (Throwable th) {
            LOGGER().error("finished to ingest data", th);
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }

    /** Scala default for {@code handoffSize} of {@code executeStreamingIngest} (1 GiB). */
    public long executeStreamingIngest$default$9() {
        return 1073741824L;
    }

    /** Scala default for {@code autoHandoff} of {@code executeStreamingIngest}. */
    public boolean executeStreamingIngest$default$10() {
        // The decompiled code parsed the literal "true"; the result is always true.
        return true;
    }

    /**
     * Creates {@code streaming1.<tableName>} with the standard test columns.
     *
     * @param tableName name of the table to create
     * @param streaming whether to add {@code 'streaming'='true'} to the table properties
     * @param withBatchLoad whether to run {@link #executeBatchLoad(String)} afterwards
     */
    public void createTable(String tableName, boolean streaming, boolean withBatchLoad) {
        createStreamingTable(tableName, "", streaming, withBatchLoad);
    }

    /**
     * Creates {@code streaming1.<tableName>} with the standard test columns plus a
     * {@code file struct<school:array<string>, age:int>} column.
     *
     * @param tableName name of the table to create
     * @param streaming whether to add {@code 'streaming'='true'} to the table properties
     * @param withBatchLoad whether to run {@link #executeBatchLoad(String)} afterwards
     */
    public void createTableWithComplexType(String tableName, boolean streaming, boolean withBatchLoad) {
        createStreamingTable(tableName, ",\n         | file struct<school:array<string>, age:int>", streaming, withBatchLoad);
    }

    /** Builds and runs the shared CREATE TABLE statement, then optionally batch-loads the table. */
    private void createStreamingTable(String tableName, String extraColumns, boolean streaming, boolean withBatchLoad) {
        String streamingProperty = streaming ? "'streaming'='true', " : "";
        // The leading "         | " on every line is removed by stripMargin below.
        String ddl = "\n         | CREATE TABLE streaming1." + tableName + "("
            + "\n         | id INT,"
            + "\n         | name STRING,"
            + "\n         | city STRING,"
            + "\n         | salary FLOAT,"
            + "\n         | tax DECIMAL(8,2),"
            + "\n         | percent double,"
            + "\n         | birthday DATE,"
            + "\n         | register TIMESTAMP,"
            + "\n         | updated TIMESTAMP" + extraColumns
            + "\n         | )"
            + "\n         | STORED AS carbondata"
            + "\n         | TBLPROPERTIES(" + streamingProperty
            + "\n         | 'sort_columns'='name')"
            + "\n         | ";
        sql(new StringOps(Predef$.MODULE$.augmentString(ddl)).stripMargin());
        if (withBatchLoad) {
            executeBatchLoad(tableName);
        }
    }

    /**
     * Loads the sample CSV at {@code dataFilePath} into {@code streaming1.<tableName>}.
     *
     * @param tableName target table in the {@code streaming1} database
     */
    public void executeBatchLoad(String tableName) {
        sql("LOAD DATA LOCAL INPATH '" + dataFilePath() + "' INTO TABLE streaming1." + tableName + " OPTIONS"
            + "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')");
    }

    /** Wraps a string array in a Scala {@code WrappedArray.ofRef}. */
    public WrappedArray.ofRef<String> wrap(String[] strArr) {
        return new WrappedArray.ofRef<>(strArr);
    }

    /**
     * Opens a server socket on the first bindable port, trying 7071 and then every second
     * port above it.
     *
     * <p>Only {@link BindException} triggers a retry. Once the next candidate port reaches
     * 65535 the last {@code BindException} is rethrown.
     *
     * @return the bound server socket
     */
    public ServerSocket getServerSocket() {
        int port = 7071;
        while (true) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                port += 2;
                if (port >= 65535) {
                    throw e;
                }
            }
        }
    }

    /**
     * Initialises the {@code BeforeAndAfterAll} trait, captures the Spark session and sample
     * data path, and registers the three tests of this suite.
     */
    public TestStreamingTableWithRowParser() {
        BeforeAndAfterAll.class.$init$(this);
        this.spark = sqlContext().sparkSession();
        this.dataFilePath = resourcesPath() + "/streamSample.csv";
        test("query on stream table with dictionary, sort_columns", Predef$.MODULE$.wrapRefArray(new Tag[0]), new TestStreamingTableWithRowParser$$anonfun$1(this));
        test("query on stream table with dictionary, sort_columns and complex column", Predef$.MODULE$.wrapRefArray(new Tag[0]), new TestStreamingTableWithRowParser$$anonfun$2(this));
        test("alter on stream table with dictionary, sort_columns and complex column", Predef$.MODULE$.wrapRefArray(new Tag[0]), new TestStreamingTableWithRowParser$$anonfun$3(this));
    }
}
