package org.apache.pinot.server.starter.helix;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.loader.LoaderUtils;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.base.Preconditions;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/server/starter/helix/HelixInstanceDataManager.class */
public class HelixInstanceDataManager implements InstanceDataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) HelixInstanceDataManager.class);
    private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>();
    private HelixInstanceDataManagerConfig _instanceDataManagerConfig;
    private String _instanceId;
    private HelixManager _helixManager;
    private ServerMetrics _serverMetrics;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void init(Configuration configuration, HelixManager helixManager, ServerMetrics serverMetrics) throws ConfigurationException {
        LOGGER.info("Initializing Helix instance data manager");
        this._instanceDataManagerConfig = new HelixInstanceDataManagerConfig(configuration);
        LOGGER.info("HelixInstanceDataManagerConfig: {}", this._instanceDataManagerConfig);
        this._instanceId = this._instanceDataManagerConfig.getInstanceId();
        this._helixManager = helixManager;
        this._serverMetrics = serverMetrics;
        File file = new File(this._instanceDataManagerConfig.getInstanceDataDir());
        if (!file.exists()) {
            Preconditions.checkState(file.mkdirs());
        }
        File file2 = new File(this._instanceDataManagerConfig.getInstanceSegmentTarDir());
        if (!file2.exists()) {
            Preconditions.checkState(file2.mkdirs());
        }
        TableDataManagerProvider.init(this._instanceDataManagerConfig);
        LOGGER.info("Initialized Helix instance data manager");
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void start() {
        this._propertyStore = this._helixManager.getHelixPropertyStore();
        LOGGER.info("Helix instance data manager started");
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void shutDown() {
        Iterator<TableDataManager> it2 = this._tableDataManagerMap.values().iterator();
        while (it2.hasNext()) {
            it2.next().shutDown();
        }
        LOGGER.info("Helix instance data manager shut down");
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void addOfflineSegment(String str, String str2, File file) throws Exception {
        LOGGER.info("Adding segment: {} to table: {}", str2, str);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        this._tableDataManagerMap.computeIfAbsent(str, str3 -> {
            return createTableDataManager(str3, tableConfig);
        }).addSegment(file, new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig));
        LOGGER.info("Added segment: {} to table: {}", str2, str);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void addRealtimeSegment(String str, String str2) throws Exception {
        LOGGER.info("Adding segment: {} to table: {}", str2, str);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        this._tableDataManagerMap.computeIfAbsent(str, str3 -> {
            return createTableDataManager(str3, tableConfig);
        }).addSegment(str2, tableConfig, new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig));
        LOGGER.info("Added segment: {} to table: {}", str2, str);
    }

    private TableDataManager createTableDataManager(String str, TableConfig tableConfig) {
        LOGGER.info("Creating table data manager for table: {}", str);
        TableDataManagerConfig defaultHelixTableDataManagerConfig = TableDataManagerConfig.getDefaultHelixTableDataManagerConfig(this._instanceDataManagerConfig, str);
        defaultHelixTableDataManagerConfig.overrideConfigs(tableConfig);
        TableDataManager tableDataManager = TableDataManagerProvider.getTableDataManager(defaultHelixTableDataManagerConfig, this._instanceId, this._propertyStore, this._serverMetrics);
        tableDataManager.start();
        LOGGER.info("Created table data manager for table: {}", str);
        return tableDataManager;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void removeSegment(String str, String str2) {
        LOGGER.info("Removing segment: {} from table: {}", str2, str);
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager != null) {
            tableDataManager.removeSegment(str2);
            LOGGER.info("Removed segment: {} from table: {}", str2, str);
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void reloadSegment(String str, String str2) throws Exception {
        LOGGER.info("Reloading single segment: {} in table: {}", str2, str);
        SegmentMetadata segmentMetadata = getSegmentMetadata(str, str2);
        if (segmentMetadata == null) {
            return;
        }
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        reloadSegment(str, segmentMetadata, tableConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, str));
        LOGGER.info("Reloaded single segment: {} in table: {}", str2, str);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void reloadAllSegments(String str) throws Exception {
        LOGGER.info("Reloading all segments in table: {}", str);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, str);
        Iterator<SegmentMetadata> it2 = getAllSegmentsMetadata(str).iterator();
        while (it2.hasNext()) {
            reloadSegment(str, it2.next(), tableConfig, tableSchema);
        }
        LOGGER.info("Reloaded all segments in table: {}", str);
    }

    private void reloadSegment(String str, SegmentMetadata segmentMetadata, TableConfig tableConfig, @Nullable Schema schema) throws Exception {
        String name = segmentMetadata.getName();
        LOGGER.info("Reloading segment: {} in table: {}", name, str);
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find table data manager for table: {}, skipping reloading segment", str);
            return;
        }
        File indexDir = segmentMetadata.getIndexDir();
        if (indexDir == null) {
            if (!this._instanceDataManagerConfig.shouldReloadConsumingSegment()) {
                LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: {}", name, str);
                return;
            }
            Preconditions.checkState(schema != null, "Failed to find schema for table: {}", str);
            LOGGER.info("Try reloading REALTIME consuming segment: {} in table: {}", name, str);
            SegmentDataManager acquireSegment = tableDataManager.acquireSegment(name);
            if (acquireSegment != null) {
                try {
                    ((MutableSegmentImpl) acquireSegment.getSegment()).addExtraColumns(schema);
                    tableDataManager.releaseSegment(acquireSegment);
                    return;
                } catch (Throwable th) {
                    tableDataManager.releaseSegment(acquireSegment);
                    throw th;
                }
            }
            return;
        }
        Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
        File parentFile = indexDir.getParentFile();
        File file = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
        Lock segmentLock = SegmentLocks.getSegmentLock(str, name);
        try {
            try {
                segmentLock.lock();
                Preconditions.checkState(indexDir.renameTo(file), "Failed to rename index directory: %s to segment backup directory: %s", indexDir, file);
                FileUtils.copyDirectory(file, indexDir);
                tableDataManager.addSegment(ImmutableSegmentLoader.load(indexDir, new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig), schema));
                File file2 = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
                Preconditions.checkState(file.renameTo(file2), "Failed to rename segment backup directory: %s to segment temporary directory: %s", file, file2);
                LOGGER.info("Reloaded segment: {} in table: {}", name, str);
                FileUtils.deleteDirectory(file2);
                segmentLock.unlock();
            } catch (Throwable th2) {
                segmentLock.unlock();
                throw th2;
            }
        } catch (Exception e) {
            try {
                LoaderUtils.reloadFailureRecovery(indexDir);
            } catch (Exception e2) {
                LOGGER.error("Failed to recover after reload failure", (Throwable) e2);
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public Set<String> getAllTables() {
        return this._tableDataManagerMap.keySet();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    @Nullable
    public TableDataManager getTableDataManager(String str) {
        return this._tableDataManagerMap.get(str);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    @Nullable
    public SegmentMetadata getSegmentMetadata(String str, String str2) {
        SegmentDataManager acquireSegment;
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null || (acquireSegment = tableDataManager.acquireSegment(str2)) == null) {
            return null;
        }
        try {
            SegmentMetadata segmentMetadata = acquireSegment.getSegment().getSegmentMetadata();
            tableDataManager.releaseSegment(acquireSegment);
            return segmentMetadata;
        } catch (Throwable th) {
            tableDataManager.releaseSegment(acquireSegment);
            throw th;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public List<SegmentMetadata> getAllSegmentsMetadata(String str) {
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            return Collections.emptyList();
        }
        List<SegmentDataManager> acquireAllSegments = tableDataManager.acquireAllSegments();
        try {
            ArrayList arrayList = new ArrayList(acquireAllSegments.size());
            Iterator<SegmentDataManager> it2 = acquireAllSegments.iterator();
            while (it2.hasNext()) {
                arrayList.add(it2.next().getSegment().getSegmentMetadata());
            }
            return arrayList;
        } finally {
            Iterator<SegmentDataManager> it3 = acquireAllSegments.iterator();
            while (it3.hasNext()) {
                tableDataManager.releaseSegment(it3.next());
            }
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public String getSegmentDataDirectory() {
        return this._instanceDataManagerConfig.getInstanceDataDir();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public String getSegmentFileDirectory() {
        return this._instanceDataManagerConfig.getInstanceSegmentTarDir();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public int getMaxParallelRefreshThreads() {
        return this._instanceDataManagerConfig.getMaxParallelRefreshThreads();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return this._propertyStore;
    }
}
