package org.apache.pinot.plugin.ingestion.batch.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.class */
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class);
    private static final String DEPS_JAR_DIR = "dependencyJarDir";
    private static final String STAGING_DIR = "stagingDir";
    private SegmentGenerationJobSpec _spec;

    public SparkSegmentGenerationJobRunner() {
    }

    public SparkSegmentGenerationJobRunner(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        init(segmentGenerationJobSpec);
    }

    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this._spec = segmentGenerationJobSpec;
        if (this._spec.getInputDirURI() == null) {
            throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getOutputDirURI() == null) {
            throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getRecordReaderSpec() == null) {
            throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec() == null) {
            throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec().getTableName() == null) {
            throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
        }
        if (this._spec.getTableSpec().getSchemaURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setSchemaURI(SegmentGenerationUtils.generateSchemaURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getTableSpec().getTableConfigURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
            this._spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap());
        }
    }

    /* JADX WARN: Finally extract failed */
    public void run() throws Exception {
        for (PinotFSSpec pinotFSSpec : this._spec.getPinotFSSpecs()) {
            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new MapConfiguration(pinotFSSpec.getConfigs()));
        }
        URI uri = new URI(this._spec.getInputDirURI());
        if (uri.getScheme() == null) {
            uri = new File(this._spec.getInputDirURI()).toURI();
        }
        PinotFS create = PinotFSFactory.create(uri.getScheme());
        URI uri2 = new URI(this._spec.getOutputDirURI());
        if (uri2.getScheme() == null) {
            uri2 = new File(this._spec.getOutputDirURI()).toURI();
        }
        PinotFS create2 = PinotFSFactory.create(uri2.getScheme());
        create2.mkdir(uri2);
        String str = (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
        URI uri3 = null;
        if (str != null) {
            uri3 = URI.create(str);
            if (uri3.getScheme() == null) {
                uri3 = new File(str).toURI();
            }
            if (!uri2.getScheme().equals(uri3.getScheme())) {
                throw new RuntimeException(String.format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.", uri3, uri2));
            }
            create2.mkdir(uri3);
        }
        String[] listFiles = create.listFiles(uri, true);
        ArrayList arrayList = new ArrayList();
        PathMatcher pathMatcher = this._spec.getIncludeFileNamePattern() != null ? FileSystems.getDefault().getPathMatcher(this._spec.getIncludeFileNamePattern()) : null;
        PathMatcher pathMatcher2 = this._spec.getExcludeFileNamePattern() != null ? FileSystems.getDefault().getPathMatcher(this._spec.getExcludeFileNamePattern()) : null;
        for (String str2 : listFiles) {
            if ((pathMatcher == null || pathMatcher.matches(Paths.get(str2, new String[0]))) && ((pathMatcher2 == null || !pathMatcher2.matches(Paths.get(str2, new String[0]))) && !create.isDirectory(new URI(str2)))) {
                arrayList.add(str2);
            }
        }
        try {
            JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
            packPluginsToDistributedCache(fromSparkContext);
            if (this._spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
                addDepsJarToDistributedCache(fromSparkContext, (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
            }
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < arrayList.size(); i++) {
                arrayList2.add(String.format("%s %d", arrayList.get(i), Integer.valueOf(i)));
            }
            JavaRDD parallelize = fromSparkContext.parallelize(arrayList2, arrayList2.size());
            String str3 = fromSparkContext.getConf().contains("plugins.include") ? fromSparkContext.getConf().get("plugins.include") : null;
            URI uri4 = uri;
            URI uri5 = uri3 == null ? uri2 : uri3;
            parallelize.foreach(str4 -> {
                for (PinotFSSpec pinotFSSpec2 : this._spec.getPinotFSSpecs()) {
                    PinotFSFactory.register(pinotFSSpec2.getScheme(), pinotFSSpec2.getClassName(), new MapConfiguration(pinotFSSpec2.getConfigs()));
                }
                PinotFS create3 = PinotFSFactory.create(uri5.getScheme());
                String[] split = str4.split(" ");
                String str4 = split[0];
                int intValue = Integer.valueOf(split[1]).intValue();
                File file = new File("pinot-plugins.tar.gz");
                if (file.exists()) {
                    File file2 = new File("pinot-plugins-dir-" + intValue);
                    try {
                        TarGzCompressionUtils.unTar(file, file2);
                        LOGGER.info("Trying to set System Property: [{}={}]", "plugins.dir", file2.getAbsolutePath());
                        System.setProperty("plugins.dir", file2.getAbsolutePath());
                        if (str3 != null) {
                            LOGGER.info("Trying to set System Property: [{}={}]", "plugins.include", str3);
                            System.setProperty("plugins.include", str3);
                        }
                        LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", System.getProperty("plugins.dir"), System.getProperty("plugins.include"));
                    } catch (Exception e) {
                        LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", file, e);
                        throw new RuntimeException(e);
                    }
                } else {
                    LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", file.getAbsolutePath());
                }
                URI create4 = URI.create(str4);
                if (create4.getScheme() == null) {
                    create4 = new URI(uri4.getScheme(), create4.getSchemeSpecificPart(), create4.getFragment());
                }
                File file3 = new File(FileUtils.getTempDirectory(), "pinot-" + System.currentTimeMillis());
                File file4 = new File(file3, "input");
                FileUtils.forceMkdir(file4);
                File file5 = new File(file3, "output");
                FileUtils.forceMkdir(file5);
                File file6 = new File(file4, SegmentGenerationUtils.getFileName(create4));
                LOGGER.info("Trying to copy input file from {} to {}", create4, file6);
                PinotFSFactory.create(create4.getScheme()).copyToLocalFile(create4, file6);
                SegmentGenerationTaskSpec segmentGenerationTaskSpec = new SegmentGenerationTaskSpec();
                segmentGenerationTaskSpec.setInputFilePath(file6.getAbsolutePath());
                segmentGenerationTaskSpec.setOutputDirectoryPath(file5.getAbsolutePath());
                segmentGenerationTaskSpec.setRecordReaderSpec(this._spec.getRecordReaderSpec());
                segmentGenerationTaskSpec.setSchema(SegmentGenerationUtils.getSchema(this._spec.getTableSpec().getSchemaURI()));
                segmentGenerationTaskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(this._spec.getTableSpec().getTableConfigURI()).toJsonNode());
                segmentGenerationTaskSpec.setSequenceId(intValue);
                segmentGenerationTaskSpec.setSegmentNameGeneratorSpec(this._spec.getSegmentNameGeneratorSpec());
                String run = new SegmentGenerationTaskRunner(segmentGenerationTaskSpec).run();
                File file7 = new File(file5, run);
                String str5 = run + ".tar.gz";
                File file8 = new File(file5, str5);
                LOGGER.info("Tarring segment from: {} to: {}", file7, file8);
                TarGzCompressionUtils.createTarGzOfDirectory(file7.getPath(), file8.getPath());
                LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", new Object[]{run, DataSizeUtils.fromBytes(FileUtils.sizeOf(file7)), DataSizeUtils.fromBytes(FileUtils.sizeOf(file8))});
                URI resolve = SegmentGenerationUtils.getRelativeOutputPath(uri4, create4, uri5).resolve(str5);
                LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", file8, resolve);
                if (this._spec.isOverwriteOutput() || !PinotFSFactory.create(resolve.getScheme()).exists(resolve)) {
                    create3.copyFromLocalFile(file8, resolve);
                } else {
                    LOGGER.warn("Not overwrite existing output segment tar file: {}", Boolean.valueOf(create3.exists(resolve)));
                }
                FileUtils.deleteQuietly(file7);
                FileUtils.deleteQuietly(file8);
                FileUtils.deleteQuietly(file6);
            });
            if (uri3 != null) {
                LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", uri3, uri2);
                create2.copy(uri3, uri2);
            }
            if (uri3 != null) {
                LOGGER.info("Trying to clean up staging directory: [{}]", uri3);
                create2.delete(uri3, true);
            }
        } catch (Throwable th) {
            if (uri3 != null) {
                LOGGER.info("Trying to clean up staging directory: [{}]", uri3);
                create2.delete(uri3, true);
            }
            throw th;
        }
    }

    protected void addDepsJarToDistributedCache(JavaSparkContext javaSparkContext, String str) throws IOException {
        if (str != null) {
            URI create = URI.create(str);
            if (create.getScheme() == null) {
                create = new File(str).toURI();
            }
            PinotFS create2 = PinotFSFactory.create(create.getScheme());
            for (String str2 : create2.listFiles(create, true)) {
                if (!create2.isDirectory(URI.create(str2)) && str2.endsWith(".jar")) {
                    LOGGER.info("Adding deps jar: {} to distributed cache", str2);
                    javaSparkContext.addJar(str2);
                }
            }
        }
    }

    protected void packPluginsToDistributedCache(JavaSparkContext javaSparkContext) {
        String pluginsRootDir = PluginManager.get().getPluginsRootDir();
        if (pluginsRootDir == null) {
            LOGGER.warn("Local Pinot plugins directory is null, skip packaging...");
            return;
        }
        if (!new File(pluginsRootDir).exists()) {
            LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDir);
            return;
        }
        File file = new File("pinot-plugins.tar.gz");
        try {
            TarGzCompressionUtils.createTarGzOfDirectory(pluginsRootDir, file.getPath());
        } catch (IOException e) {
            LOGGER.error("Failed to tar plugins directory", e);
        }
        javaSparkContext.addFile(file.getAbsolutePath());
        String property = System.getProperty("plugins.include");
        if (property != null) {
            javaSparkContext.getConf().set("plugins.include", property);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1134530516:
                if (implMethodName.equals("lambda$run$569f05c9$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner") && serializedLambda.getImplMethodSignature().equals("(Ljava/net/URI;Ljava/lang/String;Ljava/net/URI;Ljava/lang/String;)V")) {
                    SparkSegmentGenerationJobRunner sparkSegmentGenerationJobRunner = (SparkSegmentGenerationJobRunner) serializedLambda.getCapturedArg(0);
                    URI uri = (URI) serializedLambda.getCapturedArg(1);
                    String str = (String) serializedLambda.getCapturedArg(2);
                    URI uri2 = (URI) serializedLambda.getCapturedArg(3);
                    return str4 -> {
                        for (PinotFSSpec pinotFSSpec2 : this._spec.getPinotFSSpecs()) {
                            PinotFSFactory.register(pinotFSSpec2.getScheme(), pinotFSSpec2.getClassName(), new MapConfiguration(pinotFSSpec2.getConfigs()));
                        }
                        PinotFS create3 = PinotFSFactory.create(uri.getScheme());
                        String[] split = str4.split(" ");
                        String str4 = split[0];
                        int intValue = Integer.valueOf(split[1]).intValue();
                        File file = new File("pinot-plugins.tar.gz");
                        if (file.exists()) {
                            File file2 = new File("pinot-plugins-dir-" + intValue);
                            try {
                                TarGzCompressionUtils.unTar(file, file2);
                                LOGGER.info("Trying to set System Property: [{}={}]", "plugins.dir", file2.getAbsolutePath());
                                System.setProperty("plugins.dir", file2.getAbsolutePath());
                                if (str != null) {
                                    LOGGER.info("Trying to set System Property: [{}={}]", "plugins.include", str);
                                    System.setProperty("plugins.include", str);
                                }
                                LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", System.getProperty("plugins.dir"), System.getProperty("plugins.include"));
                            } catch (Exception e) {
                                LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", file, e);
                                throw new RuntimeException(e);
                            }
                        } else {
                            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", file.getAbsolutePath());
                        }
                        URI create4 = URI.create(str4);
                        if (create4.getScheme() == null) {
                            create4 = new URI(uri2.getScheme(), create4.getSchemeSpecificPart(), create4.getFragment());
                        }
                        File file3 = new File(FileUtils.getTempDirectory(), "pinot-" + System.currentTimeMillis());
                        File file4 = new File(file3, "input");
                        FileUtils.forceMkdir(file4);
                        File file5 = new File(file3, "output");
                        FileUtils.forceMkdir(file5);
                        File file6 = new File(file4, SegmentGenerationUtils.getFileName(create4));
                        LOGGER.info("Trying to copy input file from {} to {}", create4, file6);
                        PinotFSFactory.create(create4.getScheme()).copyToLocalFile(create4, file6);
                        SegmentGenerationTaskSpec segmentGenerationTaskSpec = new SegmentGenerationTaskSpec();
                        segmentGenerationTaskSpec.setInputFilePath(file6.getAbsolutePath());
                        segmentGenerationTaskSpec.setOutputDirectoryPath(file5.getAbsolutePath());
                        segmentGenerationTaskSpec.setRecordReaderSpec(this._spec.getRecordReaderSpec());
                        segmentGenerationTaskSpec.setSchema(SegmentGenerationUtils.getSchema(this._spec.getTableSpec().getSchemaURI()));
                        segmentGenerationTaskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(this._spec.getTableSpec().getTableConfigURI()).toJsonNode());
                        segmentGenerationTaskSpec.setSequenceId(intValue);
                        segmentGenerationTaskSpec.setSegmentNameGeneratorSpec(this._spec.getSegmentNameGeneratorSpec());
                        String run = new SegmentGenerationTaskRunner(segmentGenerationTaskSpec).run();
                        File file7 = new File(file5, run);
                        String str5 = run + ".tar.gz";
                        File file8 = new File(file5, str5);
                        LOGGER.info("Tarring segment from: {} to: {}", file7, file8);
                        TarGzCompressionUtils.createTarGzOfDirectory(file7.getPath(), file8.getPath());
                        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", new Object[]{run, DataSizeUtils.fromBytes(FileUtils.sizeOf(file7)), DataSizeUtils.fromBytes(FileUtils.sizeOf(file8))});
                        URI resolve = SegmentGenerationUtils.getRelativeOutputPath(uri2, create4, uri).resolve(str5);
                        LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", file8, resolve);
                        if (this._spec.isOverwriteOutput() || !PinotFSFactory.create(resolve.getScheme()).exists(resolve)) {
                            create3.copyFromLocalFile(file8, resolve);
                        } else {
                            LOGGER.warn("Not overwrite existing output segment tar file: {}", Boolean.valueOf(create3.exists(resolve)));
                        }
                        FileUtils.deleteQuietly(file7);
                        FileUtils.deleteQuietly(file8);
                        FileUtils.deleteQuietly(file6);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
