package org.apache.hudi.utilities.testutils.sources;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.collection.RocksDBBasedMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.class */
public abstract class AbstractBaseTestSource extends AvroSource {
    public static final int DEFAULT_PARTITION_NUM = 0;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseTestSource.class);
    public static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap();

    public static void initDataGen() {
        dataGeneratorMap.putIfAbsent(0, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
    }

    public static void initDataGen(TypedProperties typedProperties, int i) {
        try {
            boolean z = typedProperties.getBoolean(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(), ((Boolean) SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.defaultValue()).booleanValue());
            String str = typedProperties.getString(SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS.key(), File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + i;
            LOG.info("useRocksForTestDataGenKeys=" + z + ", BaseStoreDir=" + str);
            dataGeneratorMap.put(Integer.valueOf(i), new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, z ? new RocksDBBasedMap(str) : new HashMap()));
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    public static void initDataGen(SQLContext sQLContext, String str, int i) {
        List collectAsList = sQLContext.read().format("hudi").load(str).select("_hoodie_record_key", new String[]{"_hoodie_partition_path"}).collectAsList();
        dataGeneratorMap.put(Integer.valueOf(i), new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, (Map) IntStream.range(0, collectAsList.size()).boxed().collect(Collectors.toMap(Function.identity(), num -> {
            Row row = (Row) collectAsList.get(num.intValue());
            HoodieTestDataGenerator.KeyPartition keyPartition = new HoodieTestDataGenerator.KeyPartition();
            keyPartition.key = new HoodieKey(row.getString(0), row.getString(1));
            keyPartition.partitionPath = row.getString(1);
            return keyPartition;
        }))));
    }

    public static void resetDataGen() {
        Iterator<HoodieTestDataGenerator> it = dataGeneratorMap.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        dataGeneratorMap.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractBaseTestSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Stream<GenericRecord> fetchNextBatch(TypedProperties typedProperties, int i, String str, int i2) {
        Stream map;
        int integer = typedProperties.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), ((Integer) SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue()).intValue());
        HoodieTestDataGenerator hoodieTestDataGenerator = dataGeneratorMap.get(Integer.valueOf(i2));
        int numExistingKeys = hoodieTestDataGenerator.getNumExistingKeys("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}");
        LOG.info("NumExistingKeys=" + numExistingKeys);
        int min = Math.min(numExistingKeys, i / 2);
        int i3 = i - min;
        LOG.info("Before adjustments => numInserts=" + i3 + ", numUpdates=" + min);
        boolean z = false;
        if (i3 + numExistingKeys > integer) {
            i3 = Math.max(0, integer - numExistingKeys);
            z = true;
        }
        if (i3 + min < i) {
            min = Math.min(numExistingKeys, i - i3);
        }
        Stream empty = Stream.empty();
        LOG.info("Before DataGen. Memory Usage=" + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) + ", Total Memory=" + Runtime.getRuntime().totalMemory() + ", Free Memory=" + Runtime.getRuntime().freeMemory());
        if (z || min < 50) {
            LOG.info("After adjustments => NumInserts=" + i3 + ", NumUpdates=" + min + ", maxUniqueRecords=" + integer);
            map = hoodieTestDataGenerator.generateUniqueUpdatesStream(str, Integer.valueOf(min), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        } else {
            LOG.info("After adjustments => NumInserts=" + i3 + ", NumUpdates=" + (min - 50) + ", NumDeletes=50, maxUniqueRecords=" + integer);
            empty = hoodieTestDataGenerator.generateUniqueDeleteRecordStream(str, 50).map(AbstractBaseTestSource::toGenericRecord);
            map = hoodieTestDataGenerator.generateUniqueUpdatesStream(str, Integer.valueOf(min - 50), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        }
        Stream<GenericRecord> map2 = hoodieTestDataGenerator.generateInsertsStream(str, Integer.valueOf(i3), false, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        return Boolean.valueOf(typedProperties.getOrDefault("hoodie.test.source.generate.inserts", "false").toString()).booleanValue() ? map2 : Stream.concat(empty, Stream.concat(map, map2));
    }

    private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
        try {
            return ((RawTripTestPayload) hoodieRecord.getData()).getRecordToInsert(HoodieTestDataGenerator.AVRO_SCHEMA);
        } catch (IOException e) {
            return null;
        }
    }
}
