package org.apache.pinot.core.data.manager.realtime;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.loader.LoaderUtils;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.spi.data.Schema;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.class */
public class RealtimeTableDataManager extends BaseTableDataManager {
    private SegmentBuildTimeLeaseExtender _leaseExtender;
    private RealtimeSegmentStatsHistory _statsHistory;
    private final Semaphore _segmentBuildSemaphore;
    private static final String STATS_FILE_NAME = "segment-stats.ser";
    private static final String CONSUMERS_DIR = "consumers";
    private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
    private final ExecutorService _segmentAsyncExecutorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("SegmentAsyncExecutorService"));
    private final Map<Integer, Semaphore> _partitionIdToSemaphoreMap = new ConcurrentHashMap();

    public RealtimeTableDataManager(Semaphore semaphore) {
        this._segmentBuildSemaphore = semaphore;
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doInit() {
        this._leaseExtender = SegmentBuildTimeLeaseExtender.create(this._instanceId, this._serverMetrics, this._tableNameWithType);
        File file = new File(this._tableDataDir, STATS_FILE_NAME);
        try {
            this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
        } catch (IOException | ClassNotFoundException e) {
            this._logger.error("Error reading history object for table {} from {}", new Object[]{this._tableNameWithType, file.getAbsolutePath(), e});
            File file2 = new File(this._tableDataDir, "segment-stats.ser." + UUID.randomUUID());
            try {
                FileUtils.moveFile(file, file2);
                this._logger.warn("Saved unreadable {} into {}. Creating a fresh instance", file.getAbsolutePath(), file2.getAbsolutePath());
                try {
                    this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
                } catch (Exception e2) {
                    Utils.rethrowException(e2);
                }
            } catch (IOException e3) {
                this._logger.error("Could not move {} to {}", new Object[]{file.getAbsolutePath(), file2.getAbsolutePath(), e3});
                throw new RuntimeException(e);
            }
        }
        this._statsHistory.setMinIntervalBetweenUpdatesMillis(TimeUnit.MILLISECONDS.convert(30L, TimeUnit.MINUTES));
        File file3 = new File(getConsumerDir());
        if (file3.exists()) {
            for (File file4 : file3.listFiles(new FilenameFilter() { // from class: org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.1
                @Override // java.io.FilenameFilter
                public boolean accept(File file5, String str) {
                    return !str.equals(RealtimeTableDataManager.STATS_FILE_NAME);
                }
            })) {
                if (file4.delete()) {
                    this._logger.info("Deleted old file {}", file4.getAbsolutePath());
                } else {
                    this._logger.error("Cannot delete file {}", file4.getAbsolutePath());
                }
            }
        }
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doStart() {
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doShutdown() {
        this._segmentAsyncExecutorService.shutdown();
        Iterator<SegmentDataManager> it = this._segmentDataManagerMap.values().iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        if (this._leaseExtender != null) {
            this._leaseExtender.shutDown();
        }
    }

    public RealtimeSegmentStatsHistory getStatsHistory() {
        return this._statsHistory;
    }

    public Semaphore getSegmentBuildSemaphore() {
        return this._segmentBuildSemaphore;
    }

    public String getConsumerDir() {
        String consumerDir = this._tableDataManagerConfig.getConsumerDir();
        File file = consumerDir != null ? new File(consumerDir, this._tableNameWithType) : new File(this._tableDataDir + File.separator + CONSUMERS_DIR);
        if (!file.exists() && !file.mkdirs()) {
            this._logger.error("Failed to create consumer directory {}", file.getAbsolutePath());
        }
        return file.getAbsolutePath();
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager, org.apache.pinot.core.data.manager.TableDataManager
    public void addSegment(String str, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) throws Exception {
        RealtimeSegmentDataManager lLRealtimeSegmentDataManager;
        SegmentDataManager segmentDataManager = this._segmentDataManagerMap.get(str);
        if (segmentDataManager != null) {
            this._logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", new Object[]{str, this._tableNameWithType, segmentDataManager.getClass().getSimpleName()});
            return;
        }
        RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider.getRealtimeSegmentZKMetadata(this._propertyStore, this._tableNameWithType, str);
        Preconditions.checkNotNull(realtimeSegmentZKMetadata);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
        Preconditions.checkNotNull(tableSchema);
        File file = new File(this._indexDir, str);
        LoaderUtils.reloadFailureRecovery(file);
        if (file.exists() && realtimeSegmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
            addSegment(ImmutableSegmentLoader.load(file, indexLoadingConfig, tableSchema));
            return;
        }
        if (!isValid(tableSchema, tableConfig.getIndexingConfig())) {
            this._logger.error("Not adding segment {}", str);
            throw new RuntimeException("Mismatching schema/table config for " + this._tableNameWithType);
        }
        VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(tableSchema, str);
        InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(this._propertyStore, this._instanceId);
        if (SegmentName.isHighLevelConsumerSegmentName(str)) {
            lLRealtimeSegmentDataManager = new HLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, instanceZKMetadata, this, this._indexDir.getAbsolutePath(), indexLoadingConfig, tableSchema, this._serverMetrics);
        } else {
            LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
            if (realtimeSegmentZKMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE)) {
                downloadAndReplaceSegment(str, lLCRealtimeSegmentZKMetadata, indexLoadingConfig);
                return;
            }
            LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
            int partitionId = lLCSegmentName.getPartitionId();
            this._partitionIdToSemaphoreMap.putIfAbsent(Integer.valueOf(partitionId), new Semaphore(1));
            lLRealtimeSegmentDataManager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, this._indexDir.getAbsolutePath(), indexLoadingConfig, tableSchema, lLCSegmentName, this._partitionIdToSemaphoreMap.get(Integer.valueOf(partitionId)), this._serverMetrics);
        }
        this._logger.info("Initialize RealtimeSegmentDataManager - " + str);
        this._segmentDataManagerMap.put(str, lLRealtimeSegmentDataManager);
    }

    public void downloadAndReplaceSegment(String str, LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata, IndexLoadingConfig indexLoadingConfig) {
        String downloadUrl = lLCRealtimeSegmentZKMetadata.getDownloadUrl();
        File file = new File(this._indexDir, "tmp-" + str + "." + System.currentTimeMillis());
        File file2 = new File(this._indexDir, str + ".tar.gz");
        File file3 = new File(this._indexDir, str);
        FileUtils.deleteQuietly(file3);
        try {
            try {
                SegmentFetcherFactory.fetchSegmentToLocal(downloadUrl, file2);
                this._logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", new Object[]{downloadUrl, file2, Long.valueOf(file2.length())});
                TarGzCompressionUtils.unTar(file2, file);
                this._logger.info("Uncompressed file {} into tmp dir {}", file2, file);
                FileUtils.moveDirectory(file.listFiles()[0], file3);
                this._logger.info("Replacing LLC Segment {}", str);
                replaceLLSegment(str, indexLoadingConfig);
                FileUtils.deleteQuietly(file2);
                FileUtils.deleteQuietly(file);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file2);
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    public void replaceHLSegment(RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception {
        ZKMetadataProvider.setRealtimeSegmentZKMetadata(this._propertyStore, this._tableNameWithType, realtimeSegmentZKMetadata);
        addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, realtimeSegmentZKMetadata.getSegmentName()), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
    }

    public void replaceLLSegment(String str, IndexLoadingConfig indexLoadingConfig) {
        try {
            addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, str), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String getServerInstance() {
        return this._instanceId;
    }

    private boolean isValid(Schema schema, IndexingConfig indexingConfig) {
        List sortedColumn = indexingConfig.getSortedColumn();
        boolean z = true;
        if (!sortedColumn.isEmpty()) {
            String str = (String) sortedColumn.get(0);
            if (sortedColumn.size() > 1) {
                this._logger.warn("More than one sorted column configured. Using {}", str);
            }
            if (!schema.getFieldSpecFor(str).isSingleValueField()) {
                this._logger.error("Cannot configure multi-valued column {} as sorted column", str);
                z = false;
            }
        }
        if (!schema.validate(this._logger)) {
            z = false;
        }
        return z;
    }
}
