package org.apache.pinot.spark.jobs;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
import org.apache.pinot.ingestion.utils.JobPreparationHelper;
import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.base.Preconditions;

/**
 * Spark flavour of {@link SegmentCreationJob}.
 *
 * <p>{@link #run()} resolves the input data files, writes one small staging file per input under
 * {@code <stagingDir>/input}, runs a {@link SparkSegmentCreationFunction} for every input file through a Spark
 * RDD, moves the produced segment tar files to the output directory and finally deletes the staging directory.
 *
 * <p>The lambdas handed to Spark call instance members and therefore capture this job object; the job is
 * expected to be serializable for that to work.
 */
public class SparkSegmentCreationJob extends SegmentCreationJob {
    private static final Logger _logger = LoggerFactory.getLogger(SparkSegmentCreationJob.class);

    /** Sub-directory of the staging directory that receives one descriptor file per input data file. */
    private static final String STAGING_INPUT_SUBDIR = "input";

    /** Sub-directory of the staging directory that segment creation functions write into. */
    private static final String STAGING_OUTPUT_SUBDIR = "output";

    /**
     * Creates the job.
     *
     * @param properties job configuration, passed unchanged to the parent class
     */
    public SparkSegmentCreationJob(Properties properties) {
        super(properties);
    }

    /**
     * Extension hook; this implementation does nothing and ignores all of its arguments.
     *
     * @param segmentGeneratorConfig segment generator configuration (unused here)
     * @param path a path argument (unused here)
     * @param i an integer argument (unused here)
     */
    protected static void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path path, int i) {
    }

    /**
     * Runs the whole segment creation pipeline.
     *
     * <p>Returns without creating anything when input files matched the pattern but none of them survived
     * {@code retainRecentFiles}. In that case, and whenever an exception is thrown, the staging directory is
     * left in place.
     *
     * @throws RuntimeException if no data file matches the input pattern at all
     * @throws Exception if any file system or Spark operation fails
     */
    @Override
    public void run() throws Exception {
        _logger.info("Starting {}", getClass().getSimpleName());

        Path inputPattern = new Path(_inputPattern);
        Path stagingDir = new Path(_stagingDir);

        // Prepare the staging area. Both working directories live under the staging directory.
        FileSystem fileSystem = FileSystem.get(stagingDir.toUri(), new Configuration());
        JobPreparationHelper.mkdirs(fileSystem, stagingDir, _defaultPermissionsMask);
        Path stagingInputDir = new Path(stagingDir, STAGING_INPUT_SUBDIR);
        JobPreparationHelper.mkdirs(fileSystem, stagingInputDir, _defaultPermissionsMask);

        // Collect the data files. The size is re-read after retainRecentFiles because that call is
        // expected to prune the list in place.
        List<Path> dataFilePaths = getDataFilePaths(inputPattern);
        int numMatchedFiles = dataFilePaths.size();
        retainRecentFiles(dataFilePaths, _lookBackPeriod);
        int numDataFiles = dataFilePaths.size();
        if (numDataFiles == 0) {
            if (numMatchedFiles > 0) {
                // Files exist but all of them were filtered out: nothing to do, not an error.
                _logger.info("No input files within {} days to be processed.", _lookBackPeriod);
                return;
            }
            String errorMessage = "No data file founded with pattern: " + inputPattern;
            _logger.error(errorMessage);
            throw new RuntimeException(errorMessage);
        }

        _logger.info("Creating segments with data files: {}", dataFilePaths);
        writeStagingInputFiles(fileSystem, stagingInputDir, dataFilePaths);

        List<String> dataFilePathStrs = new ArrayList<>(numDataFiles);
        for (Path dataFilePath : dataFilePaths) {
            dataFilePathStrs.add(dataFilePath.toString());
        }

        // Publish table config (when available) and schema through the job properties.
        TableConfig tableConfig = getTableConfig();
        if (tableConfig != null) {
            validateTableConfig(tableConfig);
            _properties.put(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonConfigString());
        }
        _properties.put(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());

        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
        addDepsJarToDistributedCache(sparkContext);

        // One partition per data file.
        JavaRDD<String> pathRDD = sparkContext.parallelize(dataFilePathStrs, numDataFiles);
        if (_localDirectorySequenceId) {
            // Sequence id = position of the file among the files sharing its parent directory.
            Map<String, List<String>> filesByParentDir = new HashMap<>();
            for (Path dataFilePath : dataFilePaths) {
                filesByParentDir.computeIfAbsent(dataFilePath.getParent().toString(), key -> new ArrayList<>())
                    .add(dataFilePath.toString());
            }
            pathRDD.foreach(filePath -> createSegment(filePath, getLocalDirIndex(filesByParentDir, filePath)));
        } else {
            // Sequence id = position of the file in the RDD.
            pathRDD.zipWithIndex().foreach(pathAndIndex -> createSegment(pathAndIndex._1, pathAndIndex._2));
        }

        moveSegmentsToOutputDir(fileSystem, _stagingDir, _outputDir);

        _logger.info("Deleting the staging directory: {}", stagingDir);
        fileSystem.delete(stagingDir, true);
    }

    /**
     * Writes one file per data file into the staging input directory. The file is named after the index of the
     * data file and contains {@code "<data file path> <index>"} encoded as UTF-8.
     */
    private void writeStagingInputFiles(FileSystem fileSystem, Path stagingInputDir, List<Path> dataFilePaths)
        throws IOException {
        for (int i = 0; i < dataFilePaths.size(); i++) {
            Path dataFilePath = dataFilePaths.get(i);
            try (FSDataOutputStream out = fileSystem.create(new Path(stagingInputDir, Integer.toString(i)))) {
                out.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
                out.flush();
            }
        }
    }

    /**
     * Creates, runs and cleans up a {@link SparkSegmentCreationFunction} for a single data file. Invoked from
     * the Spark lambdas in {@link #run()}. Cleanup is skipped when {@code run} throws.
     */
    private void createSegment(String dataFilePath, Long sequenceId) throws Exception {
        SparkSegmentCreationFunction segmentCreationFunction =
            new SparkSegmentCreationFunction(_properties, new Path(_stagingDir, STAGING_OUTPUT_SUBDIR).toString());
        segmentCreationFunction.run(dataFilePath, sequenceId);
        segmentCreationFunction.cleanup();
    }

    /**
     * Returns the position of {@code str} within the list registered for its parent directory.
     *
     * @param map data file paths grouped by parent directory path
     * @param str data file path to look up
     * @return the zero-based index of {@code str} in its directory's list, or {@code -1} if the list does not
     *     contain it
     */
    private static Long getLocalDirIndex(Map<String, List<String>> map, String str) {
        List<String> siblings = map.get(new Path(str).getParent().toString());
        return Long.valueOf((long) siblings.indexOf(str));
    }

    /**
     * Checks that a table using the APPEND push type declares both a time column name and a time type.
     *
     * @param tableConfig table configuration to validate
     * @throws IllegalStateException if the push type is APPEND (case-insensitive) and either value is missing
     */
    protected void validateTableConfig(TableConfig tableConfig) {
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        if ("APPEND".equalsIgnoreCase(validationConfig.getSegmentPushType())) {
            boolean hasTimeColumnAndType =
                validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null;
            Preconditions.checkState(hasTimeColumnAndType, "For APPEND use case, time column and type must be set");
        }
    }

    /**
     * Registers the dependency jars with the Spark context; does nothing when no dependency jar directory is
     * configured.
     *
     * @param javaSparkContext Spark context that should receive the jars
     * @throws IOException if the file system of the jar directory cannot be accessed
     */
    protected void addDepsJarToDistributedCache(JavaSparkContext javaSparkContext) throws IOException {
        if (_depsJarDir == null) {
            return;
        }
        Path depsJarDir = new Path(_depsJarDir);
        FileSystem depsJarFileSystem = FileSystem.get(depsJarDir.toUri(), new Configuration());
        PinotSparkJobPreparationHelper.addDepsJarToDistributedCacheHelper(depsJarFileSystem, javaSparkContext, depsJarDir);
    }

    /**
     * Moves the segment tar directory located under {@code <stagingDir>/output} to the output directory.
     *
     * @param fileSystem file system used for the move
     * @param stagingDir staging directory of the job
     * @param outputDir destination directory
     * @throws IOException if the move fails
     */
    protected void moveSegmentsToOutputDir(FileSystem fileSystem, String stagingDir, String outputDir)
        throws IOException {
        Path segmentTarDir = new Path(new Path(stagingDir, STAGING_OUTPUT_SUBDIR), JobConfigConstants.SEGMENT_TAR_DIR);
        _logger.info("Moving all segment tar files from: {} to: {}", stagingDir, outputDir);
        movePath(fileSystem, segmentTarDir.toString(), outputDir, true);
    }
}
