package org.apache.pinot.core.segment.processing.framework;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.core.segment.processing.collector.Collector;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/segment/processing/framework/SegmentReducer.class */
/**
 * Reducer phase of the segment processing framework.
 *
 * <p>Reads every file found in the reducer input directory with the Avro record reader, feeds the
 * rows into a {@link Collector}, and writes the collected rows back out as Avro part files in the
 * reducer output directory. A new part file is started each time the collector holds
 * {@code numRecordsPerPart} rows; whatever remains after the last input file is written as a final
 * part.
 */
public class SegmentReducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);

    /** Record reader implementation used to read the reducer input files. */
    private static final String AVRO_RECORD_READER_CLASS = "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";

    private final File _reducerInputDir;
    private final File _reducerOutputDir;
    private final String _reducerId;
    private final Schema _pinotSchema;
    private final org.apache.avro.Schema _avroSchema;
    private final Collector _collector;
    private final int _numRecordsPerPart;

    /**
     * Creates a reducer.
     *
     * @param reducerId identifier embedded in the names of the output part files
     * @param reducerInputDir directory whose files are read as reducer input
     * @param reducerConfig supplies the Pinot schema, the collector config and the number of records per part
     * @param reducerOutputDir directory into which the output part files are written
     */
    public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig, File reducerOutputDir) {
        _reducerInputDir = reducerInputDir;
        _reducerOutputDir = reducerOutputDir;
        _reducerId = reducerId;
        _pinotSchema = reducerConfig.getPinotSchema();
        _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
        _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
        _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
        LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}", _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
    }

    /**
     * Runs the reduce step: collects the rows of every input file and flushes them to part files.
     *
     * <p>Part files are numbered from 0 in the order they are flushed. The collector is reset
     * after each flush.
     *
     * @throws IOException if the input directory cannot be listed or a part file cannot be written
     * @throws Exception if a record reader cannot be created or fails while reading
     */
    public void reduce() throws Exception {
        // listFiles() returns null (not an empty array) when the path is not a directory or
        // cannot be read; fail with a clear message instead of a NullPointerException.
        File[] inputFiles = _reducerInputDir.listFiles();
        if (inputFiles == null) {
            throw new IOException("Failed to list files in reducer input dir: " + _reducerInputDir);
        }

        int nextPartId = 0;
        for (File inputFile : inputFiles) {
            // try-with-resources so the reader (and its underlying file handle) is released
            // even when collecting or flushing throws.
            try (RecordReader recordReader = RecordReaderFactory.getRecordReaderByClass(AVRO_RECORD_READER_CLASS, inputFile, _pinotSchema.getColumnNames(), null)) {
                while (recordReader.hasNext()) {
                    _collector.collect(recordReader.next());
                    // Collector reached the per-part limit: write it out and start a new part.
                    if (_collector.size() == _numRecordsPerPart) {
                        flushRecords(_collector, createReducerOutputFileName(_reducerId, nextPartId++));
                        _collector.reset();
                    }
                }
            }
        }

        // Write whatever is left over as the last (possibly smaller) part.
        if (_collector.size() > 0) {
            flushRecords(_collector, createReducerOutputFileName(_reducerId, nextPartId));
            _collector.reset();
        }
    }

    /**
     * Writes all rows currently held by the collector to an Avro file in the reducer output directory.
     *
     * @param collector source of the rows to write
     * @param fileName name of the file to create inside the output directory
     * @throws IOException if the file cannot be created or written
     */
    private void flushRecords(Collector collector, String fileName) throws IOException {
        // A single Avro record instance is reused for every row.
        GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
        Iterator<GenericRow> rows = collector.iterator();
        // try-with-resources closes the writer on failure as well as on success.
        try (DataFileWriter<GenericData.Record> recordWriter = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(_avroSchema))) {
            recordWriter.create(_avroSchema, new File(_reducerOutputDir, fileName));
            while (rows.hasNext()) {
                SegmentProcessorUtils.convertGenericRowToAvroRecord(rows.next(), reusableRecord);
                recordWriter.append(reusableRecord);
            }
        }
    }

    /**
     * Builds the name of a reducer output part file.
     *
     * @param str reducer id
     * @param i part number
     * @return {@code reducer_<id>_<part>.avro}
     */
    public static String createReducerOutputFileName(String str, int i) {
        return "reducer_" + str + "_" + i + ".avro";
    }

    /**
     * Cleanup hook. Currently a no-op: readers and writers are closed as soon as they are done
     * with, and the collector is reset after every flush in {@link #reduce()}.
     */
    public void cleanup() {
    }
}
