package org.apache.pinot.plugin.ingestion.batch.hadoop;

import com.google.common.base.Preconditions;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.class */
public class HadoopSegmentGenerationJobRunner extends Configured implements IngestionJobRunner, Serializable {
    public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec";
    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);
    private static final String DEPS_JAR_DIR = "dependencyJarDir";
    private static final String STAGING_DIR = "stagingDir";
    private static final String SEGMENT_TAR_DIR = "segmentTar";
    private SegmentGenerationJobSpec _spec;

    public HadoopSegmentGenerationJobRunner() {
        setConf(new Configuration());
        getConf().set("mapreduce.job.user.classpath.first", "true");
    }

    public HadoopSegmentGenerationJobRunner(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this();
        init(segmentGenerationJobSpec);
    }

    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this._spec = segmentGenerationJobSpec;
        if (this._spec.getInputDirURI() == null) {
            throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getOutputDirURI() == null) {
            throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getRecordReaderSpec() == null) {
            throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec() == null) {
            throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec().getTableName() == null) {
            throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
        }
        if (this._spec.getTableSpec().getSchemaURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setSchemaURI(SegmentGenerationUtils.generateSchemaURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getTableSpec().getTableConfigURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
            this._spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap());
        }
    }

    /* JADX WARN: Finally extract failed */
    public void run() throws Exception {
        for (PinotFSSpec pinotFSSpec : this._spec.getPinotFSSpecs()) {
            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
        }
        URI uri = new URI(this._spec.getInputDirURI());
        if (uri.getScheme() == null) {
            uri = new File(this._spec.getInputDirURI()).toURI();
        }
        PinotFS create = PinotFSFactory.create(uri.getScheme());
        URI uri2 = new URI(this._spec.getOutputDirURI());
        if (uri2.getScheme() == null) {
            uri2 = new File(this._spec.getOutputDirURI()).toURI();
        }
        PinotFS create2 = PinotFSFactory.create(uri2.getScheme());
        create2.mkdir(uri2);
        String str = (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
        Preconditions.checkNotNull(str, "Please set config: stagingDir under 'executionFrameworkSpec.extraConfigs'");
        URI create3 = URI.create(str);
        if (create3.getScheme() == null) {
            create3 = new File(str).toURI();
        }
        if (!uri2.getScheme().equals(create3.getScheme())) {
            throw new RuntimeException(String.format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.", create3, uri2));
        }
        create2.mkdir(create3);
        Path path = new Path(create3.toString(), "input");
        create2.mkdir(path.toUri());
        Path path2 = new Path(create3.toString(), SEGMENT_TAR_DIR);
        create2.mkdir(path2.toUri());
        String[] listFiles = create.listFiles(uri, true);
        ArrayList arrayList = new ArrayList();
        PathMatcher pathMatcher = this._spec.getIncludeFileNamePattern() != null ? FileSystems.getDefault().getPathMatcher(this._spec.getIncludeFileNamePattern()) : null;
        PathMatcher pathMatcher2 = this._spec.getExcludeFileNamePattern() != null ? FileSystems.getDefault().getPathMatcher(this._spec.getExcludeFileNamePattern()) : null;
        for (String str2 : listFiles) {
            if ((pathMatcher == null || pathMatcher.matches(Paths.get(str2, new String[0]))) && ((pathMatcher2 == null || !pathMatcher2.matches(Paths.get(str2, new String[0]))) && !create.isDirectory(new URI(str2)))) {
                arrayList.add(str2);
            }
        }
        int size = arrayList.size();
        if (size == 0) {
            String format = String.format("No data file founded in [%s], with include file pattern: [%s] and exclude file  pattern [%s]", this._spec.getInputDirURI(), this._spec.getIncludeFileNamePattern(), this._spec.getExcludeFileNamePattern());
            LOGGER.error(format);
            throw new RuntimeException(format);
        }
        LOGGER.info("Creating segments with data files: {}", arrayList);
        for (int i = 0; i < size; i++) {
            String str3 = (String) arrayList.get(i);
            File file = new File("tmp");
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(file));
            Throwable th = null;
            try {
                try {
                    dataOutputStream.write(StringUtil.encodeUtf8(str3 + " " + i));
                    dataOutputStream.flush();
                    create2.copyFromLocalFile(file, new Path(path, Integer.toString(i)).toUri());
                    if (dataOutputStream != null) {
                        if (0 != 0) {
                            try {
                                dataOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataOutputStream.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (dataOutputStream != null) {
                    if (th != null) {
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        dataOutputStream.close();
                    }
                }
                throw th3;
            }
        }
        try {
            Job job = Job.getInstance(getConf());
            job.setJarByClass(getClass());
            job.setJobName(getClass().getName());
            Configuration configuration = job.getConfiguration();
            String str4 = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
            if (str4 != null) {
                configuration.set("mapreduce.job.credentials.binary", str4);
            }
            int segmentCreationJobParallelism = this._spec.getSegmentCreationJobParallelism();
            if (segmentCreationJobParallelism <= 0 || segmentCreationJobParallelism > size) {
                segmentCreationJobParallelism = size;
            }
            configuration.setInt("mapreduce.job.maps", segmentCreationJobParallelism);
            packPluginsToDistributedCache(job);
            if (this._spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
                addDepsJarToDistributedCache(job, (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
            }
            this._spec.setOutputDirURI(path2.toUri().toString());
            configuration.set(SEGMENT_GENERATION_JOB_SPEC, new Yaml().dump(this._spec));
            this._spec.setOutputDirURI(uri2.toString());
            job.setMapperClass(getMapperClass());
            job.setNumReduceTasks(0);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, path);
            FileOutputFormat.setOutputPath(job, new Path(str, "output"));
            job.waitForCompletion(true);
            if (!job.isSuccessful()) {
                throw new RuntimeException("Job failed: " + job);
            }
            LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", create3, uri2);
            create2.copy(new Path(str, SEGMENT_TAR_DIR).toUri(), uri2);
            LOGGER.info("Trying to clean up staging directory: [{}]", create3);
            create2.delete(create3, true);
        } catch (Throwable th5) {
            LOGGER.info("Trying to clean up staging directory: [{}]", create3);
            create2.delete(create3, true);
            throw th5;
        }
    }

    protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
        return HadoopSegmentCreationMapper.class;
    }

    protected void packPluginsToDistributedCache(Job job) {
        File file = new File(PluginManager.get().getPluginsRootDir());
        if (!file.exists()) {
            LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", file);
            return;
        }
        File file2 = new File("pinot-plugins.tar.gz");
        try {
            TarGzCompressionUtils.createTarGzFile(file, file2);
            job.addCacheArchive(file2.toURI());
            String property = System.getProperty("plugins.include");
            if (property != null) {
                job.getConfiguration().set("plugins.include", property);
            }
        } catch (IOException e) {
            LOGGER.error("Failed to tar plugins directory", e);
            throw new RuntimeException(e);
        }
    }

    protected void addDepsJarToDistributedCache(Job job, String str) throws IOException {
        if (str != null) {
            URI create = URI.create(str);
            if (create.getScheme() == null) {
                create = new File(str).toURI();
            }
            PinotFS create2 = PinotFSFactory.create(create.getScheme());
            for (String str2 : create2.listFiles(create, true)) {
                URI create3 = URI.create(str2);
                if (!create2.isDirectory(create3) && str2.endsWith(".jar")) {
                    LOGGER.info("Adding deps jar: {} to distributed cache", str2);
                    job.addCacheArchive(create3);
                }
            }
        }
    }
}
