package org.apache.pinot.minion.executor;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
import org.apache.pinot.minion.executor.SegmentConversionResult;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.base.Preconditions;
import shaded.com.google.common.collect.Lists;

/* loaded from: input_file:org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.class */
/**
 * Minion executor that rewrites a time window of realtime segments into offline segments.
 *
 * <p>Lifecycle as implemented here:
 * <ul>
 *   <li>{@link #preProcess} checks that the watermark stored in the table's task ZNRecord equals the task's
 *       {@code windowStartMs}, and remembers the ZNRecord version.</li>
 *   <li>{@link #convert} copies the input segments locally and runs {@link SegmentProcessorFramework} over them with a
 *       time-window record filter plus optional transform / partition / collect / segment-size settings.</li>
 *   <li>{@link #postProcess} stores {@code windowEndMs} as the new watermark, passing the remembered version.</li>
 * </ul>
 *
 * <p>NOTE(review): per-task state ({@code _expectedVersion}, {@code _nextWatermark}) lives in instance fields, so an
 * instance presumably must not be shared between concurrently running tasks — confirm with the executor factory.
 */
public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
    private static final String INPUT_SEGMENTS_DIR = "input_segments";
    private static final String OUTPUT_SEGMENTS_DIR = "output_segments";

    private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
    // ZNRecord version observed in preProcess(); handed back to the metadata manager in postProcess().
    private int _expectedVersion = Integer.MIN_VALUE;
    // windowEndMs of the task being executed; written as the new watermark in postProcess().
    private long _nextWatermark;

    public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
        _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
    }

    /**
     * Verifies that the watermark persisted for the table still equals the task's window start and records the
     * ZNRecord version for use in {@link #postProcess}.
     *
     * @param pinotTaskConfig task config; reads the {@code tableName} and window-start keys
     * @throws IllegalStateException if the ZNRecord is missing or its watermark differs from the window start
     */
    @Override // org.apache.pinot.minion.executor.BaseMultipleSegmentsConversionExecutor
    public void preProcess(PinotTaskConfig pinotTaskConfig) {
        Map<String, String> configs = pinotTaskConfig.getConfigs();
        String tableName = configs.get("tableName");
        ZNRecord taskZNRecord = _minionTaskZkMetadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(tableName);
        Preconditions.checkState(taskZNRecord != null,
            "RealtimeToOfflineSegmentsTaskMetadata ZNRecord for table: %s should not be null. Exiting task.", tableName);

        RealtimeToOfflineSegmentsTaskMetadata taskMetadata = RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(taskZNRecord);
        long windowStartMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
        long watermarkMs = taskMetadata.getWatermarkMs();
        // Guava's Preconditions only substitutes %s placeholders; %d would be left verbatim and shift the arguments.
        Preconditions.checkState(watermarkMs == windowStartMs,
            "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s does not match windowStartMs: %s in task configs "
                + "for table: %s. ZNode may have been modified by another task", watermarkMs, windowStartMs, tableName);
        _expectedVersion = taskZNRecord.getVersion();
    }

    /**
     * Runs the segment processor framework over the given realtime segments and returns one conversion result per
     * produced offline segment.
     *
     * @param pinotTaskConfig task config (table name, window bounds, optional transform/collector/aggregation/size keys)
     * @param list local directories of the input segments; each is copied into {@code <file>/input_segments}
     * @param file working directory; {@code input_segments} and {@code output_segments} are created inside it
     * @return one {@link SegmentConversionResult} per entry of the output directory, targeting the OFFLINE table
     * @throws IllegalStateException on invalid configs or if the working directories cannot be created/listed
     */
    @Override // org.apache.pinot.minion.executor.BaseMultipleSegmentsConversionExecutor
    protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> list, File file)
        throws Exception {
        String taskType = pinotTaskConfig.getTaskType();
        Map<String, String> configs = pinotTaskConfig.getConfigs();
        LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
        long startTimeMs = System.currentTimeMillis();

        // Output segments always go to the OFFLINE counterpart of the configured table.
        String tableNameWithType =
            TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(configs.get("tableName")));
        TableConfig tableConfig = getTableConfig(tableNameWithType);
        Schema schema = getSchema(tableNameWithType);
        Set<String> physicalColumnNames = schema.getPhysicalColumnNames();

        String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
        Preconditions.checkState(timeFieldSpec != null, "No valid spec found for time column: %s in schema for table: %s",
            timeColumnName, tableNameWithType);

        long windowStartMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
        long windowEndMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
        _nextWatermark = windowEndMs;

        String timeColumnTransformFunction =
            configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
        String collectorType = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
        Map<String, String> aggregationTypes = extractAggregationTypes(configs);
        String maxNumRecordsPerSegment =
            configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);

        SegmentProcessorConfig.Builder configBuilder =
            new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
        if (timeColumnTransformFunction != null) {
            configBuilder.setRecordTransformerConfig(
                getRecordTransformerConfigForTime(timeColumnTransformFunction, timeColumnName));
        }
        configBuilder.setRecordFilterConfig(
            getRecordFilterConfigForWindow(windowStartMs, windowEndMs, timeFieldSpec, timeColumnName));
        if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
            Map<String, ColumnPartitionConfig> columnPartitionMap =
                tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
            configBuilder.setPartitionerConfigs(
                Lists.newArrayList(getPartitionerConfig(columnPartitionMap, tableNameWithType, physicalColumnNames)));
        }
        configBuilder.setCollectorConfig(getCollectorConfig(collectorType, aggregationTypes, physicalColumnNames,
            tableConfig.getIndexingConfig().getSortedColumn()));
        if (maxNumRecordsPerSegment != null) {
            configBuilder.setSegmentConfig(getSegmentConfig(maxNumRecordsPerSegment));
        }
        SegmentProcessorConfig segmentProcessorConfig = configBuilder.build();

        File inputSegmentsDir = new File(file, INPUT_SEGMENTS_DIR);
        Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
            inputSegmentsDir.getAbsolutePath(), taskType);
        for (File segmentDir : list) {
            FileUtils.copyDirectoryToDirectory(segmentDir, inputSegmentsDir);
        }

        File outputSegmentsDir = new File(file, OUTPUT_SEGMENTS_DIR);
        Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
            outputSegmentsDir.getAbsolutePath(), taskType);

        SegmentProcessorFramework framework =
            new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
        // cleanup() runs exactly once, whether or not processing succeeds.
        try {
            framework.processSegments();
        } finally {
            framework.cleanup();
        }
        LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs,
            System.currentTimeMillis() - startTimeMs);

        return toConversionResults(outputSegmentsDir, tableNameWithType, taskType);
    }

    /**
     * Persists the window end recorded by {@link #convert} as the table's new watermark, passing the ZNRecord version
     * captured in {@link #preProcess}. NOTE(review): presumably the metadata manager uses the version to reject the
     * write if another task modified the ZNode meanwhile — confirm in MinionTaskZkMetadataManager.
     */
    @Override // org.apache.pinot.minion.executor.BaseMultipleSegmentsConversionExecutor
    public void postProcess(PinotTaskConfig pinotTaskConfig) {
        String tableName = pinotTaskConfig.getConfigs().get("tableName");
        _minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(
            new RealtimeToOfflineSegmentsTaskMetadata(tableName, _nextWatermark), _expectedVersion);
    }

    /**
     * Collects per-column aggregation types from task configs whose key ends with the aggregation-type suffix.
     * The suffix is stripped literally; {@code String.split} would interpret it as a regex, which breaks for
     * column names that happen to match that pattern (and can yield an empty array).
     *
     * @return mutable map of column name to aggregation-type name (possibly empty)
     */
    private static Map<String, String> extractAggregationTypes(Map<String, String> configs) {
        String suffix = MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX;
        Map<String, String> aggregationTypes = new HashMap<>();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (key.endsWith(suffix)) {
                aggregationTypes.put(key.substring(0, key.length() - suffix.length()), entry.getValue());
            }
        }
        return aggregationTypes;
    }

    /**
     * Builds one conversion result per entry in the output directory, named after the entry.
     *
     * @throws IllegalStateException if the output directory cannot be listed
     */
    private static List<SegmentConversionResult> toConversionResults(File outputSegmentsDir, String tableNameWithType,
        String taskType) {
        File[] outputSegmentDirs = outputSegmentsDir.listFiles();
        Preconditions.checkState(outputSegmentDirs != null, "Failed to list output directory: %s for task: %s",
            outputSegmentsDir.getAbsolutePath(), taskType);
        List<SegmentConversionResult> results = new ArrayList<>(outputSegmentDirs.length);
        for (File outputSegmentDir : outputSegmentDirs) {
            results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir)
                .setSegmentName(outputSegmentDir.getName()).setTableNameWithType(tableNameWithType).build());
        }
        return results;
    }

    /** Builds a transformer config that applies {@code str} as the transform function for column {@code str2}. */
    private RecordTransformerConfig getRecordTransformerConfigForTime(String str, String str2) {
        Map<String, String> transformFunctionsMap = new HashMap<>();
        transformFunctionsMap.put(str2, str);
        return new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionsMap).build();
    }

    /**
     * Builds a FILTER_FUNCTION record filter over the window {@code [j, j2)} (epoch millis) on time column {@code str}.
     * Epoch-millis columns use the bounds directly. Otherwise the bounds are converted to the column's format and
     * compared numerically for numeric columns, or as strings for non-numeric ones.
     * NOTE(review): the Groovy expression is true for values OUTSIDE the window; presumably the filter drops such
     * records — confirm against RecordFilterFactory.
     */
    private RecordFilterConfig getRecordFilterConfigForWindow(long j, long j2, DateTimeFieldSpec dateTimeFieldSpec,
        String str) {
        String filterFunction;
        DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
        TimeUnit columnUnit = dateTimeFormatSpec.getColumnUnit();
        DateTimeFieldSpec.TimeFormat timeFormat = dateTimeFormatSpec.getTimeFormat();
        // Enum identity comparison: null-safe, unlike calling equals() on a possibly-null unit.
        if (columnUnit == TimeUnit.MILLISECONDS && timeFormat == DateTimeFieldSpec.TimeFormat.EPOCH) {
            filterFunction = getFilterFunctionLong(j, j2, str);
        } else {
            String windowStart = dateTimeFormatSpec.fromMillisToFormat(j);
            String windowEnd = dateTimeFormatSpec.fromMillisToFormat(j2);
            if (dateTimeFieldSpec.getDataType().isNumeric()) {
                filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), str);
            } else {
                filterFunction = getFilterFunctionString(windowStart, windowEnd, str);
            }
        }
        return new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
            .setFilterFunction(filterFunction).build();
    }

    /**
     * Builds a TABLE_PARTITION_CONFIG partitioner for the single configured partition column.
     *
     * @throws IllegalStateException if more than one (or zero) partition columns are configured, or the column is not
     *     a physical column
     */
    private PartitionerConfig getPartitionerConfig(Map<String, ColumnPartitionConfig> map, String str, Set<String> set) {
        Preconditions.checkState(map.size() == 1, "Cannot partition using more than 1 ColumnPartitionConfig for table: %s",
            str);
        String partitionColumn = map.keySet().iterator().next();
        Preconditions.checkState(set.contains(partitionColumn), "Partition column: %s is not a physical column in the schema",
            partitionColumn);
        return new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
            .setColumnName(partitionColumn).setColumnPartitionConfig(map.get(partitionColumn)).build();
    }

    /**
     * Builds the collector config. The collector type defaults to CONCAT when not configured.
     *
     * @param str collector type name (case-insensitive) or null
     * @param map column name to aggregation-type name (case-insensitive)
     * @param set physical column names used to validate aggregate and sorted columns
     * @param list sorted columns, possibly null
     * @throws IllegalStateException if an aggregate or sorted column is not a physical column
     * @throws IllegalArgumentException if a collector or aggregation type name is unknown
     */
    private CollectorConfig getCollectorConfig(String str, Map<String, String> map, Set<String> set, List<String> list) {
        // Locale.ROOT keeps enum-name casing independent of the JVM default locale (e.g. Turkish dotless i).
        CollectorFactory.CollectorType collectorType = str == null ? CollectorFactory.CollectorType.CONCAT
            : CollectorFactory.CollectorType.valueOf(str.toUpperCase(Locale.ROOT));
        Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = new HashMap<>();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String column = entry.getKey();
            Preconditions.checkState(set.contains(column), "Aggregate column: %s is not a physical column in the schema",
                column);
            aggregatorTypeMap.put(column,
                ValueAggregatorFactory.ValueAggregatorType.valueOf(entry.getValue().toUpperCase(Locale.ROOT)));
        }
        if (list != null) {
            for (String sortedColumn : list) {
                Preconditions.checkState(set.contains(sortedColumn),
                    "Sorted column: %s is not a physical column in the schema", sortedColumn);
            }
        }
        return new CollectorConfig.Builder().setCollectorType(collectorType).setAggregatorTypeMap(aggregatorTypeMap)
            .setSortOrder(list).build();
    }

    /** Builds a segment config capping records per output segment at {@code Integer.parseInt(str)}. */
    private SegmentConfig getSegmentConfig(String str) {
        return new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(str)).build();
    }

    /** Groovy expression that is true when numeric column {@code str} is outside {@code [j, j2)}. */
    private String getFilterFunctionLong(long j, long j2, String str) {
        return String.format("Groovy({%s < %d || %s >= %d}, %s)", str, j, str, j2, str);
    }

    /** Groovy expression that is true when string column {@code str3} is outside {@code ["str", "str2")}. */
    private String getFilterFunctionString(String str, String str2, String str3) {
        return String.format("Groovy({%s < \"%s\" || %s >= \"%s\"}, %s)", str3, str, str3, str2, str3);
    }
}
