package com.google.cloud.spark.bigquery.acceptance;

import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.DiskConfig;
import com.google.cloud.dataproc.v1.GceClusterConfig;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.JobStatus;
import com.google.cloud.dataproc.v1.PySparkJob;
import com.google.cloud.dataproc.v1.SoftwareConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.io.FileInputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:com/google/cloud/spark/bigquery/acceptance/DataprocAcceptanceTestBase.class */
public class DataprocAcceptanceTestBase {
    /**
     * Cluster property that sets {@code dataproc:dataproc.conscrypt.provider.enable} to
     * {@code "false"}. Its marker is {@code "nc"}.
     */
    protected static final ClusterProperty DISABLE_CONSCRYPT = ClusterProperty.of("dataproc:dataproc.conscrypt.provider.enable", "false", "nc");

    /**
     * Immutable single-element list containing {@link #DISABLE_CONSCRYPT}.
     *
     * <p>The explicit {@code <ClusterProperty>} type witness is required: a bare
     * {@code ImmutableList.builder()} used as a method-call receiver infers {@code Builder<Object>},
     * whose result cannot be assigned to {@code ImmutableList<ClusterProperty>}.
     */
    protected static final ImmutableList<ClusterProperty> DISABLE_CONSCRYPT_LIST = ImmutableList.<ClusterProperty>builder().add(DISABLE_CONSCRYPT).build();

    /** Test context handed to the constructor; the tests read cluster, GCS and BigQuery names from it. */
    private final AcceptanceTestContext context;

    /** When {@code false}, {@code writeStream} is skipped through a JUnit assumption. */
    private final boolean sparkStreamingSupported;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.spark.bigquery.acceptance.DataprocAcceptanceTestBase$1, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/spark/bigquery/acceptance/DataprocAcceptanceTestBase$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataproc$v1$JobStatus$State = new int[JobStatus.State.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$dataproc$v1$JobStatus$State[JobStatus.State.DONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$dataproc$v1$JobStatus$State[JobStatus.State.CANCELLED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$dataproc$v1$JobStatus$State[JobStatus.State.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/spark/bigquery/acceptance/DataprocAcceptanceTestBase$ClusterProperty.class */
    /**
     * Immutable triple describing one cluster property: the property key, its value, and a short
     * marker string. Instances are created through {@link #of(String, String, String)}.
     */
    protected static class ClusterProperty {
        // All three fields are assigned exactly once in the constructor, hence final.
        private final String key;
        private final String value;
        private final String marker;

        private ClusterProperty(String key, String value, String marker) {
            this.key = key;
            this.value = value;
            this.marker = marker;
        }

        /**
         * Creates a property from the given parts. No validation is performed on the arguments.
         *
         * @param key property key
         * @param value property value
         * @param marker short marker string associated with the property
         * @return a new {@code ClusterProperty} holding the three values
         */
        protected static ClusterProperty of(String key, String value, String marker) {
            return new ClusterProperty(key, value, marker);
        }

        /** Returns the property key. */
        public String getKey() {
            return this.key;
        }

        /** Returns the property value. */
        public String getValue() {
            return this.value;
        }

        /** Returns the marker string. */
        public String getMarker() {
            return this.marker;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Callback that accepts a single value and may throw a checked exception.
     *
     * @param <T> type of the value passed to the callback
     */
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        /**
         * Performs the operation on {@code value}.
         *
         * @param value the value to operate on
         * @throws Exception if the operation fails
         */
        void accept(T value) throws Exception;
    }

    /**
     * Creates a test base for the given context with Spark streaming support switched on.
     *
     * @param context the acceptance test context the tests run against
     */
    protected DataprocAcceptanceTestBase(AcceptanceTestContext context) {
        this(context, true);
    }

    /**
     * Creates a test base for the given context.
     *
     * @param context the acceptance test context the tests run against
     * @param sparkStreamingSupported whether the streaming test should run; when {@code false} the
     *     {@code writeStream} test is skipped by its assumption
     */
    protected DataprocAcceptanceTestBase(AcceptanceTestContext context, boolean sparkStreamingSupported) {
        this.sparkStreamingSupported = sparkStreamingSupported;
        this.context = context;
    }

    /**
     * Prepares everything a test run needs and returns the resulting context.
     *
     * <p>Steps, in order: build a test id, create the base GCS directory for it, upload the
     * connector jar to {@code <baseDir>/connector.jar}, obtain a cluster id from
     * {@link #createClusterIfNeeded}, and create the BigQuery dataset named by the new context.
     *
     * <p>The test id has the form {@code <currentTimeMillis>-<c0><c2><markers>}, where {@code c0}
     * and {@code c2} are the characters at index 0 and 2 of {@code dataprocImageVersion} and
     * {@code markers} is every property marker prefixed with {@code "-"} (empty when there are no
     * properties).
     *
     * @param dataprocImageVersion forwarded to {@link #createClusterIfNeeded}; must be at least
     *     three characters long, otherwise {@link StringIndexOutOfBoundsException} is thrown
     * @param connectorJarPrefix forwarded to {@code AcceptanceTestUtils.uploadConnectorJar}
     *     (presumably the jar file name prefix; confirm against that helper)
     * @param clusterProperties properties to apply; keys must be unique because they are collected
     *     with {@code Collectors.toMap}, which rejects duplicates
     * @return the context describing the created resources
     * @throws Exception if any of the setup steps fails
     */
    protected static AcceptanceTestContext setup(String dataprocImageVersion, String connectorJarPrefix, List<ClusterProperty> clusterProperties) throws Exception {
        // Evaluated in the same order as the String.format arguments they feed.
        long timestamp = System.currentTimeMillis();
        char firstChar = dataprocImageVersion.charAt(0);
        char thirdChar = dataprocImageVersion.charAt(2);
        String markerSuffix = clusterProperties.isEmpty()
                ? ""
                : clusterProperties.stream().map(ClusterProperty::getMarker).collect(Collectors.joining("-", "-", ""));
        String testId = String.format("%s-%s%s%s", timestamp, firstChar, thirdChar, markerSuffix);

        Map<String, String> properties = clusterProperties.stream()
                .collect(Collectors.toMap(ClusterProperty::getKey, ClusterProperty::getValue));

        String testBaseGcsDir = AcceptanceTestUtils.createTestBaseGcsDir(testId);
        String connectorJarUri = testBaseGcsDir + "/connector.jar";
        AcceptanceTestUtils.uploadConnectorJar(AcceptanceTestConstants.CONNECTOR_JAR_DIRECTORY, connectorJarPrefix, connectorJarUri);

        String clusterId = createClusterIfNeeded(dataprocImageVersion, testId, properties, connectorJarUri);
        AcceptanceTestContext context = new AcceptanceTestContext(testId, clusterId, testBaseGcsDir, connectorJarUri);
        AcceptanceTestUtils.createBqDataset(context.bqDataset);
        return context;
    }

    /**
     * Releases the resources referenced by {@code context}: terminates the cluster, deletes the
     * test's GCS directory, and deletes the BigQuery dataset with its tables.
     *
     * <p>Every step is attempted even if an earlier one throws, so one failing step does not leave
     * the remaining resources behind. The first exception is rethrown once all steps have run, and
     * later exceptions are attached to it as suppressed.
     *
     * @param context the context returned by {@code setup}; {@code null} is a no-op
     * @throws Exception the first failure raised by any cleanup step
     */
    protected static void tearDown(AcceptanceTestContext context) throws Exception {
        if (context == null) {
            return;
        }
        Exception failure = null;
        try {
            terminateCluster(context.clusterId);
        } catch (Exception e) {
            failure = e;
        }
        try {
            AcceptanceTestUtils.deleteGcsDir(context.testBaseGcsDir);
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        try {
            AcceptanceTestUtils.deleteBqDatasetAndTables(context.bqDataset);
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Creates a Dataproc cluster for the test and waits for the creation to finish.
     *
     * <p>The cluster name is derived from {@code testId} via
     * {@code AcceptanceTestUtils.generateClusterName}. Despite the method name, no existence check
     * is made: a create request is always issued.
     *
     * @param dataprocImageVersion image version for the cluster's software configuration
     * @param testId id of the current test run, used to generate the cluster name
     * @param properties software properties to set on the cluster
     * @param connectorJarUri URI stored in the cluster metadata as {@code SPARK_BQ_CONNECTOR_URL}
     * @return the generated cluster name
     * @throws Exception if the client cannot be created or the create operation fails
     */
    protected static String createClusterIfNeeded(String dataprocImageVersion, String testId, Map<String, String> properties, String connectorJarUri) throws Exception {
        String clusterName = AcceptanceTestUtils.generateClusterName(testId);
        // get() blocks until the long-running create operation completes (or fails).
        cluster(client -> client
                .createClusterAsync(
                        AcceptanceTestConstants.PROJECT_ID,
                        AcceptanceTestConstants.REGION,
                        createCluster(clusterName, dataprocImageVersion, properties, connectorJarUri))
                .get());
        return clusterName;
    }

    /**
     * Deletes the named Dataproc cluster and waits for the deletion to finish.
     *
     * @param clusterName name of the cluster to delete
     * @throws Exception if the client cannot be created or the delete operation fails
     */
    protected static void terminateCluster(String clusterName) throws Exception {
        // get() blocks until the long-running delete operation completes (or fails).
        cluster(client -> client
                .deleteClusterAsync(AcceptanceTestConstants.PROJECT_ID, AcceptanceTestConstants.REGION, clusterName)
                .get());
    }

    /**
     * Runs {@code command} against a freshly created {@link ClusterControllerClient} that targets
     * {@code AcceptanceTestConstants.DATAPROC_ENDPOINT}, closing the client afterwards.
     *
     * <p>try-with-resources closes the client exactly once on both the normal and the exceptional
     * path. If {@code close()} itself throws while another exception is in flight, it is recorded
     * as suppressed on that exception.
     *
     * @param command the operation to perform with the client
     * @throws Exception if the client cannot be created, {@code command} throws, or closing fails
     */
    private static void cluster(ThrowingConsumer<ClusterControllerClient> command) throws Exception {
        ClusterControllerSettings settings = ClusterControllerSettings.newBuilder()
                .setEndpoint(AcceptanceTestConstants.DATAPROC_ENDPOINT)
                .build();
        try (ClusterControllerClient client = ClusterControllerClient.create(settings)) {
            command.accept(client);
        }
    }

    /**
     * Builds the {@link Cluster} definition used by the acceptance tests.
     *
     * <p>The cluster lives in project {@code AcceptanceTestConstants.PROJECT_ID}, network
     * {@code default}, zone {@code us-west1-a}. It has one master and two workers, all
     * {@code n1-standard-4} with a 300 GB {@code pd-standard} boot disk and no local SSDs.
     *
     * @param clusterName name of the cluster
     * @param dataprocImageVersion image version for the software configuration
     * @param properties software properties to apply
     * @param connectorJarUri value of the {@code SPARK_BQ_CONNECTOR_URL} metadata entry
     * @return the assembled cluster message
     */
    private static Cluster createCluster(String clusterName, String dataprocImageVersion, Map<String, String> properties, String connectorJarUri) {
        Cluster.Builder cluster = Cluster.newBuilder();
        cluster.setClusterName(clusterName);
        cluster.setProjectId(AcceptanceTestConstants.PROJECT_ID);

        GceClusterConfig gce = GceClusterConfig.newBuilder()
                .setNetworkUri("default")
                .setZoneUri("us-west1-a")
                .putMetadata("SPARK_BQ_CONNECTOR_URL", connectorJarUri)
                .build();

        // Master and workers use the same disk layout; the message is immutable, so it is shared.
        DiskConfig disk = DiskConfig.newBuilder()
                .setBootDiskType("pd-standard")
                .setBootDiskSizeGb(300)
                .setNumLocalSsds(0)
                .build();

        InstanceGroupConfig master = InstanceGroupConfig.newBuilder()
                .setNumInstances(1)
                .setMachineTypeUri("n1-standard-4")
                .setDiskConfig(disk)
                .build();

        InstanceGroupConfig workers = InstanceGroupConfig.newBuilder()
                .setNumInstances(2)
                .setMachineTypeUri("n1-standard-4")
                .setDiskConfig(disk)
                .build();

        SoftwareConfig software = SoftwareConfig.newBuilder()
                .setImageVersion(dataprocImageVersion)
                .putAllProperties(properties)
                .build();

        ClusterConfig config = ClusterConfig.newBuilder()
                .setGceClusterConfig(gce)
                .setMasterConfig(master)
                .setWorkerConfig(workers)
                .setSoftwareConfig(software)
                .build();

        return cluster.setConfig(config).build();
    }

    /**
     * Runs {@code read_shakespeare.py} on the cluster with a 120 second limit, then checks that the
     * job ends in state {@code DONE} and that the trimmed CSV in the results directory is
     * {@code spark,10}.
     */
    @Test
    public void testRead() throws Exception {
        final String testName = "test-read";

        List<String> args = Arrays.asList(this.context.getResultsDirUri(testName));
        Job finishedJob = createAndRunPythonJob(testName, "read_shakespeare.py", null, args, 120L);
        Truth.assertThat(finishedJob.getStatus().getState()).isEqualTo(JobStatus.State.DONE);

        String csv = AcceptanceTestUtils.getCsv(this.context.getResultsDirUri(testName));
        Truth.assertThat(csv.trim()).isEqualTo("spark,10");
    }

    /**
     * Uploads {@code write_stream_data.json} to GCS, runs {@code write_stream.py} on the cluster
     * with a 120 second limit, then checks that the job ends in state {@code DONE} and that the
     * target BigQuery stream table contains exactly two rows.
     *
     * <p>Skipped through a JUnit assumption when Spark streaming is not supported.
     */
    @Test
    public void writeStream() throws Exception {
        Assume.assumeTrue("Spark streaming is not supported by this connector", this.sparkStreamingSupported);
        AcceptanceTestUtils.uploadToGcs(getClass().getResourceAsStream("/acceptance/write_stream_data.json"), this.context.testBaseGcsDir + "/write-stream-test/json/write_stream_data.json", "application/json");

        List<String> args = Arrays.asList(this.context.testBaseGcsDir + "/write-stream-test/json/", this.context.bqDataset, this.context.bqStreamTable, AcceptanceTestUtils.BUCKET);
        Job finishedJob = createAndRunPythonJob("write-stream-test", "write_stream.py", null, args, 120L);
        Truth.assertThat(finishedJob.getStatus().getState()).isEqualTo(JobStatus.State.DONE);

        // assertThat(...) on its own only creates a Subject and checks nothing; the comparison
        // has to be made through an assertion method so a wrong row count fails the test.
        int rowCount = AcceptanceTestUtils.getNumOfRowsOfBqTable(this.context.bqDataset, this.context.bqStreamTable);
        Truth.assertThat(rowCount).isEqualTo(2);
    }

    /**
     * Uploads the {@code spark-bigquery*.zip} artifact found under
     * {@code ../../spark-bigquery-python-lib/target}, creates the BIGNUMERIC test table, and runs
     * {@code big_numeric.py} on the cluster with a 120 second limit. The test then checks that the
     * job ends in state {@code DONE} and that the trimmed CSV result holds the expected min/max
     * values.
     */
    @Test
    public void testBigNumeric() throws Exception {
        final String testName = "test-big-numeric";

        Path zipArtifact = AcceptanceTestUtils.getArtifact(Paths.get("../../spark-bigquery-python-lib/target"), "spark-bigquery", ".zip");
        String zipUri = this.context.testBaseGcsDir + "/test-big-numeric/big_numeric_acceptance_test.zip";
        // Close the local file handle whether or not the upload succeeds.
        try (FileInputStream zipStream = new FileInputStream(zipArtifact.toFile())) {
            AcceptanceTestUtils.uploadToGcs(zipStream, zipUri, "application/zip");
        }

        AcceptanceTestUtils.runBqQuery(String.format(AcceptanceTestConstants.BIGNUMERIC_TABLE_QUERY_TEMPLATE, this.context.bqDataset, this.context.bqTable));

        List<String> args = Arrays.asList(this.context.bqDataset + "." + this.context.bqTable, this.context.getResultsDirUri(testName));
        Job finishedJob = createAndRunPythonJob(testName, "big_numeric.py", zipUri, args, 120L);
        Truth.assertThat(finishedJob.getStatus().getState()).isEqualTo(JobStatus.State.DONE);

        String csv = AcceptanceTestUtils.getCsv(this.context.getResultsDirUri(testName));
        Truth.assertThat(csv.trim()).isEqualTo("-0.34992332820282019728792003956564819968,0.34992332820282019728792003956564819967");
    }

    /**
     * Uploads the classpath resource {@code /acceptance/<pythonFile>} to the script URI of
     * {@code testName}, submits it as a PySpark job on the context's cluster, and waits for the job
     * to reach a final state.
     *
     * @param testName name of the test; selects the script URI
     * @param pythonFile file name of the script under {@code /acceptance/}
     * @param pythonLibUri optional extra URI passed to the job; may be {@code null}
     * @param args optional job arguments; may be {@code null}
     * @param timeoutSeconds how long to wait for the job, in seconds
     * @return the job as last fetched
     * @throws Exception if the upload, the submission or the wait fails
     */
    private Job createAndRunPythonJob(String testName, String pythonFile, String pythonLibUri, List<String> args, long timeoutSeconds) throws Exception {
        AcceptanceTestUtils.uploadToGcs(
                getClass().getResourceAsStream("/acceptance/" + pythonFile),
                this.context.getScriptUri(testName),
                "text/x-python");

        Job.Builder jobBuilder = Job.newBuilder();
        jobBuilder.setPlacement(JobPlacement.newBuilder().setClusterName(this.context.clusterId));
        jobBuilder.setPysparkJob(createPySparkJobBuilder(testName, pythonLibUri, args));
        Job job = jobBuilder.build();

        Duration timeout = Duration.ofSeconds(timeoutSeconds);
        return runAndWait(job, timeout);
    }

    /**
     * Creates a PySpark job builder whose main file is the script URI of {@code testName} and whose
     * jar list contains the connector jar.
     *
     * @param testName name of the test; selects the script URI
     * @param pythonLibUri when non-empty, added both as a Python file URI and as a file URI
     * @param args when non-empty, added as the job arguments
     * @return the configured builder (not yet built)
     */
    private PySparkJob.Builder createPySparkJobBuilder(String testName, String pythonLibUri, List<String> args) {
        PySparkJob.Builder builder = PySparkJob.newBuilder();
        builder.setMainPythonFileUri(this.context.getScriptUri(testName));
        builder.addJarFileUris(this.context.connectorJarUri);

        boolean hasPythonLib = !(pythonLibUri == null || pythonLibUri.isEmpty());
        if (hasPythonLib) {
            builder.addPythonFileUris(pythonLibUri);
            builder.addFileUris(pythonLibUri);
        }

        boolean hasArgs = !(args == null || args.isEmpty());
        if (hasArgs) {
            builder.addAllArgs(args);
        }
        return builder;
    }

    /**
     * Submits {@code job} to Dataproc and waits, up to {@code timeout}, for it to reach a final
     * state.
     *
     * <p>The state is polled by {@link #waitForJobCompletion} on an asynchronous task while this
     * thread blocks on the future. The client is closed exactly once by try-with-resources on every
     * exit path.
     *
     * @param job the job to submit
     * @param timeout maximum time to wait; only its whole-second part is used
     * @return the job as last fetched, in state DONE, CANCELLED or ERROR
     * @throws java.util.concurrent.TimeoutException if the job does not finish within
     *     {@code timeout}
     * @throws Exception if the client cannot be created, the submission fails, or polling fails
     */
    private Job runAndWait(Job job, Duration timeout) throws Exception {
        JobControllerSettings settings = JobControllerSettings.newBuilder()
                .setEndpoint(AcceptanceTestConstants.DATAPROC_ENDPOINT)
                .build();
        try (JobControllerClient client = JobControllerClient.create(settings)) {
            String jobId = client
                    .submitJob(AcceptanceTestConstants.PROJECT_ID, AcceptanceTestConstants.REGION, job)
                    .getReference()
                    .getJobId();
            CompletableFuture<Job> completion = CompletableFuture.supplyAsync(
                    () -> waitForJobCompletion(client, AcceptanceTestConstants.PROJECT_ID, AcceptanceTestConstants.REGION, jobId));
            return completion.get(timeout.getSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Polls the job once per second until it reaches state {@code DONE}, {@code CANCELLED} or
     * {@code ERROR}, and returns it. There is no internal timeout; callers bound the wait
     * themselves.
     *
     * @param jobControllerClient client used to fetch the job
     * @param projectId project that owns the job
     * @param region region the job runs in
     * @param jobId id of the job to poll
     * @return the job in one of the three final states
     * @throws RuntimeException wrapping {@link InterruptedException} if the thread is interrupted
     *     while sleeping; the thread's interrupt flag is restored before throwing
     */
    Job waitForJobCompletion(JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
        while (true) {
            Job job = jobControllerClient.getJob(projectId, region, jobId);
            // Switch on the enum itself rather than on a hand-maintained ordinal table.
            switch (job.getStatus().getState()) {
                case DONE:
                case CANCELLED:
                case ERROR:
                    return job;
                default:
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                        // Keep the interruption visible to code further up the stack.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
            }
        }
    }
}
