package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.HoodieReadClient;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/hudi/utilities/TestHDFSParquetImporter.class */
public class TestHDFSParquetImporter implements Serializable {
    private static String dfsBasePath;
    private static HdfsTestService hdfsTestService;
    private static MiniDFSCluster dfsCluster;
    private static DistributedFileSystem dfs;

    @BeforeClass
    public static void initClass() throws Exception {
        hdfsTestService = new HdfsTestService();
        dfsCluster = hdfsTestService.start(true);
        dfs = dfsCluster.getFileSystem();
        dfsBasePath = dfs.getWorkingDirectory().toString();
        dfs.mkdirs(new Path(dfsBasePath));
    }

    @AfterClass
    public static void cleanupClass() throws Exception {
        if (hdfsTestService != null) {
            hdfsTestService.stop();
        }
    }

    @Test
    public void testImportWithRetries() throws Exception {
        JavaSparkContext javaSparkContext = null;
        try {
            javaSparkContext = getJavaSparkContext();
            String path = new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName()).toString();
            Path path2 = new Path(path, "testTarget");
            final String path3 = new Path(path, "file.schema").toString();
            Path path4 = new Path(path, "testSrc");
            createRecords(path4);
            HDFSParquetImporter.Config hDFSParquetImporterConfig = getHDFSParquetImporterConfig(path4.toString(), path2.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path3);
            final AtomicInteger atomicInteger = new AtomicInteger(3);
            final AtomicInteger atomicInteger2 = new AtomicInteger(0);
            Assert.assertEquals(0L, new HDFSParquetImporter(hDFSParquetImporterConfig) { // from class: org.apache.hudi.utilities.TestHDFSParquetImporter.1
                protected int dataImport(JavaSparkContext javaSparkContext2) throws IOException {
                    int dataImport = super.dataImport(javaSparkContext2);
                    if (atomicInteger.decrementAndGet() == 0) {
                        atomicInteger2.incrementAndGet();
                        TestHDFSParquetImporter.this.createSchemaFile(path3);
                    }
                    return dataImport;
                }
            }.dataImport(javaSparkContext, atomicInteger.get()));
            Assert.assertEquals(atomicInteger.get(), -1L);
            Assert.assertEquals(atomicInteger2.get(), 1L);
            boolean z = false;
            HashMap hashMap = new HashMap();
            RemoteIterator listFiles = dfs.listFiles(path2, true);
            while (listFiles.hasNext()) {
                LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
                z = z || locatedFileStatus.getPath().toString().endsWith(".commit");
                if (locatedFileStatus.getPath().toString().endsWith("parquet")) {
                    SQLContext sQLContext = new SQLContext(javaSparkContext);
                    String path5 = locatedFileStatus.getPath().getParent().toString();
                    long count = sQLContext.read().parquet(locatedFileStatus.getPath().toString()).count();
                    if (!hashMap.containsKey(path5)) {
                        hashMap.put(path5, 0L);
                    }
                    hashMap.put(path5, Long.valueOf(((Long) hashMap.get(path5)).longValue() + count));
                }
            }
            Assert.assertTrue("commit file is missing", z);
            Assert.assertEquals("partition is missing", 4L, hashMap.size());
            Iterator it = hashMap.entrySet().iterator();
            while (it.hasNext()) {
                Assert.assertEquals("missing records", 24L, ((Long) ((Map.Entry) it.next()).getValue()).longValue());
            }
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
        } catch (Throwable th) {
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
            throw th;
        }
    }

    private void createRecords(Path path) throws ParseException, IOException {
        Path path2 = new Path(path.toString(), "file1.parquet");
        long time = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 96) {
                break;
            }
            arrayList.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(j2), "rider-" + j2, "driver-" + j2, time + TimeUnit.HOURS.toSeconds(j2)));
            j = j2 + 1;
        }
        ParquetWriter build = AvroParquetWriter.builder(path2).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            build.write((GenericRecord) it.next());
        }
        build.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createSchemaFile(String str) throws IOException {
        FSDataOutputStream create = dfs.create(new Path(str));
        create.write("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}".getBytes());
        create.close();
    }

    @Test
    public void testSchemaFile() throws Exception {
        JavaSparkContext javaSparkContext = null;
        try {
            javaSparkContext = getJavaSparkContext();
            String path = new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName()).toString();
            Path path2 = new Path(path, "testTarget");
            Path path3 = new Path(path.toString(), "srcTest");
            Path path4 = new Path(path.toString(), "missingFile.schema");
            HDFSParquetImporter hDFSParquetImporter = new HDFSParquetImporter(getHDFSParquetImporterConfig(path3.toString(), path2.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path4.toString()));
            Assert.assertEquals(-1L, hDFSParquetImporter.dataImport(javaSparkContext, 0));
            dfs.create(path4).write("Random invalid schema data".getBytes());
            Assert.assertEquals(-1L, hDFSParquetImporter.dataImport(javaSparkContext, 0));
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
        } catch (Throwable th) {
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
            throw th;
        }
    }

    @Test
    public void testRowAndPartitionKey() throws Exception {
        JavaSparkContext javaSparkContext = null;
        try {
            javaSparkContext = getJavaSparkContext();
            String path = new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName()).toString();
            Path path2 = new Path(path, "testTarget");
            createRecords(new Path(path, "testSrc"));
            createSchemaFile(new Path(path.toString(), "missingFile.schema").toString());
            Assert.assertEquals(-1L, new HDFSParquetImporter(getHDFSParquetImporterConfig(r0.toString(), path2.toString(), "testTable", "COPY_ON_WRITE", "invalidRowKey", "timestamp", 1, r0.toString())).dataImport(javaSparkContext, 0));
            Assert.assertEquals(-1L, new HDFSParquetImporter(getHDFSParquetImporterConfig(r0.toString(), path2.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "invalidTimeStamp", 1, r0.toString())).dataImport(javaSparkContext, 0));
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
        } catch (Throwable th) {
            if (javaSparkContext != null) {
                javaSparkContext.stop();
            }
            throw th;
        }
    }

    private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String str, String str2, String str3, String str4, String str5, String str6, int i, String str7) {
        HDFSParquetImporter.Config config = new HDFSParquetImporter.Config();
        config.srcPath = str;
        config.targetPath = str2;
        config.tableName = str3;
        config.tableType = str4;
        config.rowKey = str5;
        config.partitionKey = str6;
        config.parallelism = i;
        config.schemaFile = str7;
        return config;
    }

    private JavaSparkContext getJavaSparkContext() {
        return new JavaSparkContext(HoodieReadClient.addHoodieSupport(HoodieWriteClient.registerClasses(new SparkConf().setAppName("TestConversionCommand").setMaster("local[1]"))));
    }
}
