package org.apache.pinot.plugin.ingestion.batch.hadoop;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.class */
/**
 * Hadoop mapper that builds one Pinot segment per input record.
 *
 * <p>Each input value must consist of two space-separated tokens: the input file URI and an integer sequence
 * id. For every record the mapper copies the input file into a local temporary directory, runs a
 * {@link SegmentGenerationTaskRunner} on it, tars the resulting segment, moves the tarball under the job's
 * output directory and finally emits {@code (sequence id, encoded tar file name)}.
 */
public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentCreationMapper.class);
    protected static final String PROGRESS_REPORTER_THREAD_NAME = "pinot-hadoop-progress-reporter";
    protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5000;

    private static final String PLUGINS_TARBALL_NAME = "pinot-plugins.tar.gz";
    private static final String PLUGINS_DIR_PROPERTY = "plugins.dir";
    private static final String PLUGINS_INCLUDE_PROPERTY = "plugins.include";

    protected Configuration _jobConf;
    protected SegmentGenerationJobSpec _spec;
    private File _localTempDir;

    /**
     * Background task that calls {@code progress()} on the mapper context once per
     * {@link #PROGRESS_REPORTER_INTERVAL_MS} until its thread is interrupted.
     *
     * <p>NOTE(review): presumably this keeps the framework from treating a long-running segment build as
     * hung; confirm against the Hadoop task timeout settings in use.
     */
    public static class ProgressReporter implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProgressReporter.class);
        private static final long PROGRESS_REPORTER_INTERVAL_MS = 60000;
        private final Mapper<LongWritable, Text, LongWritable, Text>.Context _context;

        ProgressReporter(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            _context = context;
        }

        /** Sleeps, reports progress, and repeats; returns as soon as the thread is interrupted. */
        @Override
        public void run() {
            LOGGER.info("Starting progress reporter thread: {}", Thread.currentThread());
            try {
                while (true) {
                    Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS);
                    LOGGER.info("============== Reporting progress ==============");
                    _context.progress();
                }
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal sent by the mapper; the thread ends right here.
                LOGGER.info("Progress reporter thread: {} interrupted", Thread.currentThread());
            }
        }
    }

    /**
     * Prepares the mapper: loads the job spec from the Hadoop configuration, chooses a local temporary
     * directory, installs locally shipped Pinot plugins (if the tarball is present in the working
     * directory) and registers every {@link PinotFS} declared in the job spec.
     *
     * @param context mapper context supplying the job configuration
     * @throws IOException if the temporary plugins directory cannot be created
     * @throws RuntimeException if the plugins tarball cannot be extracted
     */
    public void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException {
        _jobConf = context.getConfiguration();
        String jobSpecYaml = _jobConf.get(HadoopSegmentGenerationJobRunner.SEGMENT_GENERATION_JOB_SPEC);
        _spec = new Yaml().loadAs(jobSpecYaml, SegmentGenerationJobSpec.class);
        LOGGER.info("Segment generation job spec : {}", jobSpecYaml);

        // Only the path is chosen here; the directory itself is created on demand inside map().
        _localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());

        File pluginsTarball = new File(PLUGINS_TARBALL_NAME);
        if (pluginsTarball.exists()) {
            installLocalPlugins(pluginsTarball);
        } else {
            LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsTarball.getAbsolutePath());
        }

        for (PinotFSSpec fsSpec : _spec.getPinotFSSpecs()) {
            PinotFSFactory.register(fsSpec.getScheme(), fsSpec.getClassName(), new PinotConfiguration(fsSpec));
        }
    }

    /**
     * Generates, tars and uploads the segment for a single input record.
     *
     * <p>The local temporary directory is deleted when this method exits, whether it succeeds or fails.
     *
     * @param longWritable input key (unused)
     * @param text input value of the form {@code "<input file URI> <sequence id>"}
     * @param context mapper context used for progress reporting and for emitting the result
     * @throws RuntimeException wrapping any failure raised while preparing, generating or uploading the segment
     */
    protected void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
        try {
            String[] tokens = StringUtils.split(text.toString(), ' ');
            // Guava message templates substitute "%s"; an SLF4J-style "{}" would be printed verbatim.
            Preconditions.checkState(tokens.length == 2, "Illegal input value: %s", text);
            String inputFile = tokens[0];
            int sequenceId = Integer.parseInt(tokens[1]);
            LOGGER.info("Generating segment with input file: {}, sequence id: {}", inputFile, sequenceId);

            URI inputDirUri = toDirUri(_spec.getInputDirURI());
            URI outputDirUri = toDirUri(_spec.getOutputDirURI());
            PinotFS outputDirFs = PinotFSFactory.create(outputDirUri.getScheme());

            // An input file without a scheme inherits the scheme of the input directory.
            URI inputFileUri = URI.create(inputFile);
            if (inputFileUri.getScheme() == null) {
                inputFileUri = new URI(inputDirUri.getScheme(), inputFileUri.getSchemeSpecificPart(),
                        inputFileUri.getFragment());
            }

            File localInputDir = new File(_localTempDir, "input");
            FileUtils.forceMkdir(localInputDir);
            File localOutputDir = new File(_localTempDir, "output");
            FileUtils.forceMkdir(localOutputDir);

            File localInputFile = new File(localInputDir, SegmentGenerationUtils.getFileName(inputFileUri));
            PinotFSFactory.create(inputFileUri.getScheme()).copyToLocalFile(inputFileUri, localInputFile);

            SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
            taskSpec.setInputFilePath(localInputFile.getAbsolutePath());
            taskSpec.setOutputDirectoryPath(localOutputDir.getAbsolutePath());
            taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
            taskSpec.setSchema(
                    SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI(), _spec.getAuthToken()));
            taskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
                    _spec.getAuthToken()));
            taskSpec.setSequenceId(sequenceId);
            taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
            taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
            taskSpec.setCustomProperty("input.data.file.uri", inputFileUri.toString());

            Thread reporterThread = new Thread(getProgressReporter(context));
            reporterThread.setName(PROGRESS_REPORTER_THREAD_NAME);
            reporterThread.start();
            try {
                String segmentName;
                try {
                    segmentName = new SegmentGenerationTaskRunner(taskSpec).run();
                } finally {
                    // The reporter only has to live while the segment is being built.
                    stopProgressReporter(reporterThread);
                }

                File localSegmentDir = new File(localOutputDir, segmentName);
                String segmentTarFileName = URLEncoder.encode(segmentName + ".tar.gz", "UTF-8");
                File localSegmentTarFile = new File(localOutputDir, segmentTarFileName);
                LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
                TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
                LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
                        DataSizeUtils.fromBytes(FileUtils.sizeOf(localSegmentDir)),
                        DataSizeUtils.fromBytes(FileUtils.sizeOf(localSegmentTarFile)));

                URI relativeOutputPath =
                        SegmentGenerationUtils.getRelativeOutputPath(inputDirUri, inputFileUri, outputDirUri);
                SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile,
                        relativeOutputPath.resolve(segmentTarFileName), _spec.isOverwriteOutput());

                // Both metadata decisions read the job spec: this method never populates the task spec's
                // own createMetadataTarGz flag, so consulting it here would disagree with the delete check.
                boolean createMetadataTarGz = _spec.isCreateMetadataTarGz();
                String metadataTarFileName = URLEncoder.encode(segmentName + ".metadata.tar.gz", "UTF-8");
                URI metadataTarUri = relativeOutputPath.resolve(metadataTarFileName);
                if (outputDirFs.exists(metadataTarUri) && (_spec.isOverwriteOutput() || !createMetadataTarGz)) {
                    LOGGER.info("Deleting existing metadata tar gz file: {}", metadataTarUri);
                    outputDirFs.delete(metadataTarUri, true);
                }
                if (createMetadataTarGz) {
                    File localMetadataTarFile = new File(localOutputDir, metadataTarFileName);
                    SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
                    SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, metadataTarUri,
                            _spec.isOverwriteOutput());
                }

                FileUtils.deleteQuietly(localSegmentDir);
                FileUtils.deleteQuietly(localInputFile);
                context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
                LOGGER.info("Finish generating segment: {} with input file: {}, sequence id: {}", segmentName,
                        inputFileUri, sequenceId);
            } catch (Exception e) {
                LOGGER.error("Caught exception while creating segment with input file: {}, sequence id: {}",
                        inputFile, sequenceId, e);
                throw new RuntimeException(e);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            FileUtils.deleteQuietly(_localTempDir);
        }
    }

    /**
     * Creates the task whose thread reports progress while a segment is being generated.
     *
     * @param context mapper context handed to the reporter
     * @return a new {@link ProgressReporter} bound to {@code context}
     */
    protected Runnable getProgressReporter(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
        return new ProgressReporter(context);
    }

    /**
     * Removes the local temporary directory once the mapper is done.
     *
     * @param context mapper context (unused)
     */
    public void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
        LOGGER.info("Deleting local temporary directory: {}", _localTempDir);
        FileUtils.deleteQuietly(_localTempDir);
    }

    /** Untars the shipped plugins tarball into a fresh temp directory and exposes it via system properties. */
    private void installLocalPlugins(File pluginsTarball) throws IOException {
        File pluginsDir = Files.createTempDirectory("pinot-plugins-dir").toFile();
        try {
            TarGzCompressionUtils.untar(pluginsTarball, pluginsDir);
            LOGGER.info("Trying to set System Property: {}={}", PLUGINS_DIR_PROPERTY, pluginsDir.getAbsolutePath());
            System.setProperty(PLUGINS_DIR_PROPERTY, pluginsDir.getAbsolutePath());
            String pluginsInclude = _jobConf.get(PLUGINS_INCLUDE_PROPERTY);
            if (pluginsInclude != null) {
                LOGGER.info("Trying to set System Property: {}={}", PLUGINS_INCLUDE_PROPERTY, pluginsInclude);
                System.setProperty(PLUGINS_INCLUDE_PROPERTY, pluginsInclude);
            }
            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
                    System.getProperty(PLUGINS_DIR_PROPERTY), System.getProperty(PLUGINS_INCLUDE_PROPERTY));
        } catch (Exception e) {
            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", pluginsTarball, e);
            throw new RuntimeException(e);
        }
    }

    /** Parses a directory location, treating a value without a scheme as a local file path. */
    private static URI toDirUri(String dir) throws URISyntaxException {
        URI uri = new URI(dir);
        return uri.getScheme() != null ? uri : new File(dir).toURI();
    }

    /** Interrupts the reporter thread, waits a bounded time for it to exit and logs if it is still alive. */
    private static void stopProgressReporter(Thread reporterThread) throws InterruptedException {
        reporterThread.interrupt();
        reporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS);
        if (reporterThread.isAlive()) {
            LOGGER.error("Failed to interrupt progress reporter thread: {}", reporterThread);
        }
    }
}
