package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Disabled("Disable due to flakiness and feature deprecation.")
@Tag("functional")
/* loaded from: input_file:org/apache/hudi/utilities/functional/TestHDFSParquetImporter.class */
public class TestHDFSParquetImporter extends FunctionalTestHarness implements Serializable {
    private String basePath;
    private transient Path hoodieFolder;
    private transient Path srcFolder;
    private transient List<GenericRecord> insertData;

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHDFSParquetImporter$HoodieTripModel.class */
    public static class HoodieTripModel {
        long timestamp;
        String rowKey;
        String rider;
        String driver;
        double beginLat;
        double beginLon;
        double endLat;
        double endLon;

        public HoodieTripModel(long j, String str, String str2, String str3, double d, double d2, double d3, double d4) {
            this.timestamp = j;
            this.rowKey = str;
            this.rider = str2;
            this.driver = str3;
            this.beginLat = d;
            this.beginLon = d2;
            this.endLat = d3;
            this.endLon = d4;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            HoodieTripModel hoodieTripModel = (HoodieTripModel) obj;
            return this.timestamp == hoodieTripModel.timestamp && this.rowKey.equals(hoodieTripModel.rowKey) && this.rider.equals(hoodieTripModel.rider) && this.driver.equals(hoodieTripModel.driver) && this.beginLat == hoodieTripModel.beginLat && this.beginLon == hoodieTripModel.beginLon && this.endLat == hoodieTripModel.endLat && this.endLon == hoodieTripModel.endLon;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.timestamp), this.rowKey, this.rider, this.driver, Double.valueOf(this.beginLat), Double.valueOf(this.beginLon), Double.valueOf(this.endLat), Double.valueOf(this.endLon));
        }
    }

    @BeforeEach
    public void init() throws IOException, ParseException {
        this.basePath = new Path(dfsBasePath(), Thread.currentThread().getStackTrace()[1].getMethodName()).toString();
        this.hoodieFolder = new Path(this.basePath, "testTarget");
        this.srcFolder = new Path(this.basePath, "testSrc");
        this.insertData = createInsertRecords(this.srcFolder);
    }

    @AfterEach
    public void clean() throws IOException {
        dfs().delete(new Path(this.basePath), true);
    }

    @Test
    public void testImportWithRetries() throws Exception {
        final String path = new Path(this.basePath, "file.schema").toString();
        HDFSParquetImporter.Config hDFSParquetImporterConfig = getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path);
        final AtomicInteger atomicInteger = new AtomicInteger(3);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        Assertions.assertEquals(0, new HDFSParquetImporter(hDFSParquetImporterConfig) { // from class: org.apache.hudi.utilities.functional.TestHDFSParquetImporter.1
            protected int dataImport(JavaSparkContext javaSparkContext) throws IOException {
                int dataImport = super.dataImport(javaSparkContext);
                if (atomicInteger.decrementAndGet() == 0) {
                    atomicInteger2.incrementAndGet();
                    TestHDFSParquetImporter.this.createSchemaFile(path);
                }
                return dataImport;
            }
        }.dataImport(jsc(), atomicInteger.get()));
        Assertions.assertEquals(-1, atomicInteger.get());
        Assertions.assertEquals(1, atomicInteger2.get());
        boolean z = false;
        HashMap hashMap = new HashMap();
        RemoteIterator listFiles = dfs().listFiles(this.hoodieFolder, true);
        while (listFiles.hasNext()) {
            LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
            z = z || locatedFileStatus.getPath().toString().endsWith(".commit");
            if (locatedFileStatus.getPath().toString().endsWith("parquet")) {
                String path2 = locatedFileStatus.getPath().getParent().toString();
                long count = sqlContext().read().parquet(locatedFileStatus.getPath().toString()).count();
                if (!hashMap.containsKey(path2)) {
                    hashMap.put(path2, 0L);
                }
                hashMap.put(path2, Long.valueOf(((Long) hashMap.get(path2)).longValue() + count));
            }
        }
        Assertions.assertTrue(z, "commit file is missing");
        Assertions.assertEquals(4, hashMap.size(), "partition is missing");
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(24L, ((Long) ((Map.Entry) it.next()).getValue()).longValue(), "missing records");
        }
    }

    private void insert(JavaSparkContext javaSparkContext) throws IOException {
        String path = new Path(this.basePath, "file.schema").toString();
        createSchemaFile(path);
        new HDFSParquetImporter(getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path)).dataImport(javaSparkContext, 0);
    }

    @Test
    public void testImportWithInsert() throws IOException, ParseException {
        insert(jsc());
        List list = (List) HoodieClientTestUtils.read(jsc(), this.basePath + "/testTarget", sqlContext(), dfs(), new String[]{this.basePath + "/testTarget/*/*/*/*"}).select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon"}).collectAsList().stream().map(row -> {
            return new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7));
        }).collect(Collectors.toList());
        List list2 = (List) this.insertData.stream().map(genericRecord -> {
            return new HoodieTripModel(Long.parseLong(genericRecord.get("timestamp").toString()), genericRecord.get("_row_key").toString(), genericRecord.get("rider").toString(), genericRecord.get("driver").toString(), Double.parseDouble(genericRecord.get("begin_lat").toString()), Double.parseDouble(genericRecord.get("begin_lon").toString()), Double.parseDouble(genericRecord.get("end_lat").toString()), Double.parseDouble(genericRecord.get("end_lon").toString()));
        }).collect(Collectors.toList());
        Assertions.assertTrue(list.containsAll(list2) && list2.containsAll(list) && list.size() == list2.size());
    }

    @Test
    public void testImportWithUpsert() throws IOException, ParseException {
        insert(jsc());
        String path = new Path(this.basePath, "file.schema").toString();
        Path path2 = new Path(this.basePath, "testUpsertSrc");
        List<GenericRecord> createUpsertRecords = createUpsertRecords(path2);
        HDFSParquetImporter.Config hDFSParquetImporterConfig = getHDFSParquetImporterConfig(path2.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path);
        hDFSParquetImporterConfig.command = "upsert";
        new HDFSParquetImporter(hDFSParquetImporterConfig).dataImport(jsc(), 0);
        List<GenericRecord> subList = this.insertData.subList(11, 96);
        subList.addAll(createUpsertRecords);
        List list = (List) HoodieClientTestUtils.read(jsc(), this.basePath + "/testTarget", sqlContext(), dfs(), new String[]{this.basePath + "/testTarget/*/*/*/*"}).select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon"}).collectAsList().stream().map(row -> {
            return new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7));
        }).collect(Collectors.toList());
        List list2 = (List) subList.stream().map(genericRecord -> {
            return new HoodieTripModel(Long.parseLong(genericRecord.get("timestamp").toString()), genericRecord.get("_row_key").toString(), genericRecord.get("rider").toString(), genericRecord.get("driver").toString(), Double.parseDouble(genericRecord.get("begin_lat").toString()), Double.parseDouble(genericRecord.get("begin_lon").toString()), Double.parseDouble(genericRecord.get("end_lat").toString()), Double.parseDouble(genericRecord.get("end_lon").toString()));
        }).collect(Collectors.toList());
        Assertions.assertTrue(list.containsAll(list2) && list2.containsAll(list) && list.size() == list2.size());
    }

    public List<GenericRecord> createInsertRecords(Path path) throws ParseException, IOException {
        Path path2 = new Path(path.toString(), "file1.parquet");
        long time = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
        ArrayList arrayList = new ArrayList();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 96) {
                break;
            }
            arrayList.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(j2), "0", "rider-" + j2, "driver-" + j2, time + TimeUnit.HOURS.toSeconds(j2)));
            j = j2 + 1;
        }
        ParquetWriter build = AvroParquetWriter.builder(path2).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
        Throwable th = null;
        try {
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    build.write((GenericRecord) it.next());
                }
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    public List<GenericRecord> createUpsertRecords(Path path) throws ParseException, IOException {
        Path path2 = new Path(path.toString(), "file1.parquet");
        long time = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
        ArrayList arrayList = new ArrayList();
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 11) {
                break;
            }
            arrayList.add(hoodieTestDataGenerator.generateGenericRecord(Long.toString(j2), "0", "rider-upsert-" + j2, "driver-upsert" + j2, time + TimeUnit.HOURS.toSeconds(j2)));
            j = j2 + 1;
        }
        long j3 = 96;
        while (true) {
            long j4 = j3;
            if (j4 >= 100) {
                break;
            }
            arrayList.add(hoodieTestDataGenerator.generateGenericRecord(Long.toString(j4), "0", "rider-upsert-" + j4, "driver-upsert" + j4, time + TimeUnit.HOURS.toSeconds(j4)));
            j3 = j4 + 1;
        }
        ParquetWriter build = AvroParquetWriter.builder(path2).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
        Throwable th = null;
        try {
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    build.write((GenericRecord) it.next());
                }
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createSchemaFile(String str) throws IOException {
        FSDataOutputStream create = dfs().create(new Path(str));
        create.write("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}".getBytes());
        create.close();
    }

    @Test
    public void testSchemaFile() throws Exception {
        Path path = new Path(this.basePath, "testTarget");
        Path path2 = new Path(this.basePath.toString(), "srcTest");
        Path path3 = new Path(this.basePath.toString(), "missingFile.schema");
        HDFSParquetImporter hDFSParquetImporter = new HDFSParquetImporter(getHDFSParquetImporterConfig(path2.toString(), path.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, path3.toString()));
        Assertions.assertEquals(-1, hDFSParquetImporter.dataImport(jsc(), 0));
        dfs().create(path3).write("Random invalid schema data".getBytes());
        Assertions.assertEquals(-1, hDFSParquetImporter.dataImport(jsc(), 0));
    }

    @Test
    public void testRowAndPartitionKey() throws Exception {
        Path path = new Path(this.basePath.toString(), "missingFile.schema");
        createSchemaFile(path.toString());
        Assertions.assertEquals(-1, new HDFSParquetImporter(getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "invalidRowKey", "timestamp", 1, path.toString())).dataImport(jsc(), 0));
        Assertions.assertEquals(-1, new HDFSParquetImporter(getHDFSParquetImporterConfig(this.srcFolder.toString(), this.hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "invalidTimeStamp", 1, path.toString())).dataImport(jsc(), 0));
    }

    public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String str, String str2, String str3, String str4, String str5, String str6, int i, String str7) {
        HDFSParquetImporter.Config config = new HDFSParquetImporter.Config();
        config.srcPath = str;
        config.targetPath = str2;
        config.tableName = str3;
        config.tableType = str4;
        config.rowKey = str5;
        config.partitionKey = str6;
        config.parallelism = i;
        config.schemaFile = str7;
        return config;
    }
}
