package org.apache.pinot.hadoop.job;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.hadoop.job.mappers.SegmentCreationMapper;
import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
import org.apache.pinot.ingestion.utils.JobPreparationHelper;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import shaded.com.google.common.base.Preconditions;

/* loaded from: input_file:org/apache/pinot/hadoop/job/HadoopSegmentCreationJob.class */
/**
 * Segment creation job that runs as a map-only Hadoop MapReduce job.
 *
 * <p>For every data file matching the configured input pattern, {@link #run()} writes a small
 * text file under {@code <stagingDir>/input} containing the data file path and its sequence id.
 * Those files are the input of the MapReduce job, whose mapper class is supplied by
 * {@link #getMapperClass()}. After the job succeeds, the content of
 * {@code <stagingDir>/output/<SEGMENT_TAR_DIR>} is moved to the output directory and the staging
 * directory is deleted.
 *
 * <p>Subclasses can customize the job through {@link #validateTableConfig(TableConfig)},
 * {@link #getMapperClass()}, {@link #addDepsJarToDistributedCache(Job)},
 * {@link #addAdditionalJobProperties(Job)} and {@link #moveSegmentsToOutputDir()}.
 */
public class HadoopSegmentCreationJob extends SegmentCreationJob {
    /** File system resolved from the output directory URI; initialized at the start of {@link #run()}. */
    protected FileSystem _outputDirFileSystem;

    /**
     * Creates the job and forces user classes to take precedence on the task classpath.
     *
     * @param properties job properties, passed through to the parent job
     */
    public HadoopSegmentCreationJob(Properties properties) {
        super(properties);
        getConf().set("mapreduce.job.user.classpath.first", "true");
    }

    /**
     * Prepares the output and staging directories, writes one mapper input file per data file,
     * then configures, submits and waits for the map-only MapReduce job.
     *
     * @throws RuntimeException if no data file matches the input pattern, or if the job fails
     * @throws Exception on any file system or job submission error
     */
    @Override
    public void run() throws Exception {
        this._logger.info("Starting {}", getClass().getSimpleName());

        // Both the output and the staging directories are accessed through the output dir file system.
        this._outputDirFileSystem = FileSystem.get(new Path(this._outputDir).toUri(), getConf());
        JobPreparationHelper.mkdirs(this._outputDirFileSystem, new Path(this._outputDir), this._defaultPermissionsMask);
        JobPreparationHelper.mkdirs(this._outputDirFileSystem, new Path(this._stagingDir), this._defaultPermissionsMask);
        Path stagingInputDir = new Path(this._stagingDir, "input");
        JobPreparationHelper.mkdirs(this._outputDirFileSystem, stagingInputDir, this._defaultPermissionsMask);

        List<Path> dataFilePaths = getDataFilePaths(this._inputPattern);
        int numDataFiles = dataFilePaths.size();
        if (numDataFiles == 0) {
            String errorMessage = "No data file founded with pattern: " + this._inputPattern;
            this._logger.error(errorMessage);
            throw new RuntimeException(errorMessage);
        }
        this._logger.info("Creating segments with data files: {}", dataFilePaths);
        writeMapperInputFiles(stagingInputDir, dataFilePaths);

        Job job = Job.getInstance(getConf());
        job.setJarByClass(getClass());
        job.setJobName(getClass().getName());

        Configuration jobConf = job.getConfiguration();
        // Forward the delegation token file, when the environment provides one, to the job.
        String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (hadoopTokenFileLocation != null) {
            jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
        }
        // One staging input file was written per data file, so request the same number of maps.
        jobConf.setInt("mapreduce.job.maps", numDataFiles);

        TableConfig tableConfig = getTableConfig();
        if (tableConfig != null) {
            validateTableConfig(tableConfig);
            jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonString());
        }
        jobConf.set(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());

        // Job properties are copied last, so they override any same-named key set above.
        for (Map.Entry<Object, Object> property : this._properties.entrySet()) {
            jobConf.set(property.getKey().toString(), property.getValue().toString());
        }

        job.setMapperClass(getMapperClass());
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, stagingInputDir);
        FileOutputFormat.setOutputPath(job, new Path(this._stagingDir, "output"));
        addDepsJarToDistributedCache(job);
        addAdditionalJobProperties(job);

        job.waitForCompletion(true);
        if (!job.isSuccessful()) {
            throw new RuntimeException("Job failed: " + job);
        }

        moveSegmentsToOutputDir();
        this._logger.info("Deleting the staging directory: {}", this._stagingDir);
        this._outputDirFileSystem.delete(new Path(this._stagingDir), true);
    }

    /**
     * Writes one file per data file into {@code inputDir}. The file is named after the data
     * file's index in the list and contains the line {@code "<data file path> <index>"}.
     *
     * <p>Each stream is closed exactly once through try-with-resources, so a failure while
     * closing is reported as the primary exception, or attached as suppressed to an earlier
     * write failure, rather than triggering a second close attempt.
     *
     * @param inputDir directory in which the files are created
     * @param dataFilePaths data files to create segments from
     * @throws IOException if a file cannot be created, written or closed
     */
    private void writeMapperInputFiles(Path inputDir, List<Path> dataFilePaths) throws IOException {
        int numDataFiles = dataFilePaths.size();
        for (int sequenceId = 0; sequenceId < numDataFiles; sequenceId++) {
            Path dataFilePath = dataFilePaths.get(sequenceId);
            Path inputFile = new Path(inputDir, Integer.toString(sequenceId));
            try (FSDataOutputStream stream = this._outputDirFileSystem.create(inputFile)) {
                stream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + sequenceId));
                stream.flush();
            }
        }
    }

    /**
     * Validates the table config before it is added to the job configuration. For the
     * {@code APPEND} push type (case-insensitive), both the time column name and the time type
     * must be set.
     *
     * @param tableConfig table config to validate
     * @throws IllegalStateException if the push type is APPEND and the time column name or the
     *     time type is missing
     */
    protected void validateTableConfig(TableConfig tableConfig) {
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        if ("APPEND".equalsIgnoreCase(validationConfig.getSegmentPushType())) {
            Preconditions.checkState(
                validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
                "For APPEND use case, time column and type must be set");
        }
    }

    /**
     * Returns the mapper class used by the MapReduce job.
     *
     * @return {@link SegmentCreationMapper} by default
     */
    protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
        return SegmentCreationMapper.class;
    }

    /**
     * Adds the jars under the dependency jar directory to the job's distributed cache. Does
     * nothing when no dependency jar directory is configured.
     *
     * @param job job to add the jars to
     * @throws IOException if the dependency jar directory cannot be accessed
     */
    protected void addDepsJarToDistributedCache(Job job) throws IOException {
        if (this._depsJarDir == null) {
            return;
        }
        Path depsJarDir = new Path(this._depsJarDir);
        FileSystem depsJarFileSystem = FileSystem.get(depsJarDir.toUri(), getConf());
        PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(depsJarFileSystem, job, depsJarDir);
    }

    /**
     * Extension hook invoked just before the job is submitted; the default implementation does
     * nothing.
     *
     * @param job job about to be submitted
     */
    protected void addAdditionalJobProperties(Job job) {
    }

    /**
     * Moves {@code <stagingDir>/output/<SEGMENT_TAR_DIR>} to the output directory.
     *
     * @throws IOException if the move fails
     */
    protected void moveSegmentsToOutputDir() throws IOException {
        Path stagingOutputDir = new Path(this._stagingDir, "output");
        Path segmentTarDir = new Path(stagingOutputDir, JobConfigConstants.SEGMENT_TAR_DIR);
        movePath(this._outputDirFileSystem, segmentTarDir.toString(), this._outputDir, true);
    }
}
