package com.google.cloud.spark.bigquery.integration;

import com.google.cloud.spark.bigquery.integration.SparkBigQueryIntegrationTestBase;
import com.google.common.truth.Truth;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:com/google/cloud/spark/bigquery/integration/OpenLineageIntegrationTestBase.class */
public class OpenLineageIntegrationTestBase {

    @ClassRule
    public static SparkBigQueryIntegrationTestBase.TestDataset testDataset = new SparkBigQueryIntegrationTestBase.TestDataset();

    @ClassRule
    public static CustomSessionFactory sessionFactory = new CustomSessionFactory();
    protected String testTable;
    protected SparkSession spark = sessionFactory.spark;
    protected File lineageFile = sessionFactory.lineageFile;

    /* loaded from: input_file:com/google/cloud/spark/bigquery/integration/OpenLineageIntegrationTestBase$CustomSessionFactory.class */
    protected static class CustomSessionFactory extends ExternalResource {
        SparkSession spark;
        File lineageFile;

        protected CustomSessionFactory() {
        }

        protected void before() throws Throwable {
            this.lineageFile = File.createTempFile("openlineage_test_" + System.nanoTime(), ".log");
            this.lineageFile.deleteOnExit();
            this.spark = SparkSession.builder().master("local").appName("openlineage_test_bigquery_connector").config("spark.ui.enabled", "false").config("spark.default.parallelism", 20L).config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener").config("spark.openlineage.transport.type", "file").config("spark.openlineage.transport.location", this.lineageFile.getAbsolutePath()).getOrCreate();
            this.spark.sparkContext().setLogLevel("WARN");
        }
    }

    @Before
    public void createTestTable() {
        this.testTable = "test_" + System.nanoTime();
    }

    private List<JSONObject> parseEventLogs(File file) throws Exception {
        Scanner scanner = new Scanner(file);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                while (scanner.hasNextLine()) {
                    JSONObject jSONObject = new JSONObject(scanner.nextLine());
                    if (!jSONObject.getJSONArray("inputs").isEmpty() && !jSONObject.getJSONArray("outputs").isEmpty()) {
                        arrayList.add(jSONObject);
                    }
                }
                if (scanner != null) {
                    if (0 != 0) {
                        try {
                            scanner.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        scanner.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (scanner != null) {
                if (th != null) {
                    try {
                        scanner.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scanner.close();
                }
            }
            throw th3;
        }
    }

    private String getFieldName(JSONObject jSONObject, String str) {
        return ((JSONObject) jSONObject.getJSONArray(str).get(0)).getString("name");
    }

    @Test
    public void testLineageEvent() throws Exception {
        String str = testDataset.toString() + "." + this.testTable;
        this.spark.read().format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load().createOrReplaceTempView("words");
        this.spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word").write().format("bigquery").mode(SaveMode.Append).option("table", str).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("writeMethod", "direct").save();
        List<JSONObject> parseEventLogs = parseEventLogs(this.lineageFile);
        Truth.assertThat(parseEventLogs).isNotEmpty();
        parseEventLogs.forEach(jSONObject -> {
            Truth.assertThat(getFieldName(jSONObject, "inputs")).matches("bigquery-public-data.samples.shakespeare");
            Truth.assertThat(getFieldName(jSONObject, "outputs")).matches(str);
        });
    }
}
