package org.apache.pinot.hadoop.io;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/hadoop/io/PinotRecordWriter.class */
public class PinotRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRecordWriter.class);
    private final SegmentGeneratorConfig _segmentConfig;
    private final Path _workDir;
    private final String _baseDataDir;
    private PinotRecordSerialization _pinotRecordSerialization;
    private final FileHandler _handler;
    private long MAX_FILE_SIZE = 64000000;

    public PinotRecordWriter(SegmentGeneratorConfig segmentGeneratorConfig, TaskAttemptContext taskAttemptContext, Path path, PinotRecordSerialization pinotRecordSerialization) {
        this._segmentConfig = segmentGeneratorConfig;
        this._workDir = path;
        this._baseDataDir = PinotOutputFormat.getTempSegmentDir(taskAttemptContext) + "/data";
        try {
            this._handler = new FileHandler(this._baseDataDir, PinotOutputFormat.getTableName(taskAttemptContext), ".json", this.MAX_FILE_SIZE);
            this._handler.open(true);
            this._pinotRecordSerialization = pinotRecordSerialization;
            this._pinotRecordSerialization.init(taskAttemptContext.getConfiguration(), segmentGeneratorConfig.getSchema());
        } catch (Exception e) {
            throw new RuntimeException("Error initialize PinotRecordReader", e);
        }
    }

    public void write(K k, V v) throws IOException, InterruptedException {
        this._handler.write(this._pinotRecordSerialization.serialize(v).toBytes());
    }

    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this._pinotRecordSerialization.close();
        this._handler.close();
        File file = new File(this._baseDataDir);
        this._segmentConfig.setSegmentName(PinotOutputFormat.getSegmentName(taskAttemptContext));
        SegmentIndexCreationDriverImpl segmentIndexCreationDriverImpl = new SegmentIndexCreationDriverImpl();
        for (File file2 : file.listFiles()) {
            createSegment(file2.getPath(), segmentIndexCreationDriverImpl);
        }
        FileSystem fileSystem = FileSystem.get(new Configuration());
        String absolutePath = new File(this._segmentConfig.getOutDir(), this._segmentConfig.getSegmentName()).getAbsolutePath();
        String localTarFile = getLocalTarFile(PinotOutputFormat.getTempSegmentDir(taskAttemptContext));
        LOGGER.info("Trying to tar the segment to: {}", localTarFile);
        TarGzCompressionUtils.createTarGzOfDirectory(absolutePath, localTarFile);
        String str = this._workDir + "/segmentTar/" + this._segmentConfig.getSegmentName() + ".tar.gz";
        LOGGER.info("*********************************************************************");
        LOGGER.info("Copy from : {} to {}", localTarFile, str);
        LOGGER.info("*********************************************************************");
        fileSystem.copyFromLocalFile(true, true, new Path(localTarFile), new Path(str));
        clean(PinotOutputFormat.getTempSegmentDir(taskAttemptContext));
    }

    private void createSegment(String str, SegmentIndexCreationDriver segmentIndexCreationDriver) {
        try {
            this._segmentConfig.setInputFilePath(str);
            segmentIndexCreationDriver.init(this._segmentConfig);
            segmentIndexCreationDriver.build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String getLocalTarFile(String str) {
        String str2 = str + "/segmentTar";
        File file = new File(str2);
        if (!file.exists()) {
            file.mkdirs();
        }
        return str2 + "/" + this._segmentConfig.getSegmentName() + ".tar.gz";
    }

    private void clean(String str) {
        File file = new File(str);
        if (file.exists()) {
            file.delete();
        }
    }
}
