package org.apache.pinot.server.starter.helix;

import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.class */
/**
 * Reports how many of a given set of consuming segments have not yet ingested up to the
 * stream offset returned by
 * {@link LLRealtimeSegmentDataManager#getLatestStreamOffsetAtStartupTime()}.
 *
 * <p>Segments found to be caught up are remembered in an internal set and are not
 * inspected again on later calls. That set is a plain {@link HashSet} mutated without
 * synchronization, so callers must not invoke this checker concurrently.
 */
public class OffsetBasedConsumptionStatusChecker {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class);

    private final InstanceDataManager _instanceDataManager;
    private final Set<String> _consumingSegments;
    /** Names of segments already observed as caught up (or no longer consuming). */
    private final Set<String> _caughtUpSegments = new HashSet<>();

    /**
     * @param instanceDataManager used to look up the table data manager of each segment
     * @param set names of the consuming segments to track; the reference is kept as-is (no copy)
     */
    public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> set) {
        _instanceDataManager = instanceDataManager;
        _consumingSegments = set;
    }

    /**
     * Re-checks every tracked segment that has not yet been marked as caught up and
     * records the ones that now are.
     *
     * @return number of tracked segments minus the number of segments marked as caught up
     */
    public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
        for (String segmentName : _consumingSegments) {
            if (_caughtUpSegments.contains(segmentName)) {
                continue;
            }
            TableDataManager tableDataManager = getTableDataManager(segmentName);
            if (tableDataManager == null) {
                LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segmentName);
                continue;
            }
            if (hasCaughtUp(tableDataManager, segmentName)) {
                _caughtUpSegments.add(segmentName);
            }
        }
        return _consumingSegments.size() - _caughtUpSegments.size();
    }

    /**
     * Decides whether a single segment should be marked as caught up.
     *
     * <p>The segment is acquired from {@code tableDataManager} and, if it was obtained,
     * released exactly once in the {@code finally} block regardless of the outcome.
     *
     * @return {@code true} if the segment is not an {@link LLRealtimeSegmentDataManager}
     *     (treated as already committed) or its current offset is not smaller than the
     *     latest stream offset at startup time; {@code false} if the segment is not
     *     available yet, an offset is {@code null}, or the segment is still behind
     */
    private boolean hasCaughtUp(TableDataManager tableDataManager, String segmentName) {
        SegmentDataManager segmentDataManager = null;
        try {
            segmentDataManager = tableDataManager.acquireSegment(segmentName);
            if (segmentDataManager == null) {
                LOGGER.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", segmentName);
                return false;
            }
            if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
                LOGGER.info("Segment {} is already committed and is considered caught up.", segmentName);
                return true;
            }
            LLRealtimeSegmentDataManager realtimeSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
            StreamPartitionMsgOffset currentOffset = realtimeSegmentDataManager.getCurrentOffset();
            StreamPartitionMsgOffset latestStreamOffset = realtimeSegmentDataManager.getLatestStreamOffsetAtStartupTime();
            if (latestStreamOffset == null || currentOffset == null) {
                LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. Will check consumption status later", segmentName, latestStreamOffset, currentOffset);
                return false;
            }
            if (currentOffset.compareTo(latestStreamOffset) < 0) {
                LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ", currentOffset, segmentName, latestStreamOffset);
                return false;
            }
            LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segmentName, currentOffset, latestStreamOffset);
            return true;
        } finally {
            // Null when acquireSegment returned null or threw; nothing to release in that case.
            if (segmentDataManager != null) {
                tableDataManager.releaseSegment(segmentDataManager);
            }
        }
    }

    /**
     * Looks up the REALTIME table data manager for the table name parsed out of the
     * given LLC segment name.
     *
     * @return whatever {@link InstanceDataManager#getTableDataManager} returns for that
     *     table; the caller treats {@code null} as "not set up yet"
     */
    private TableDataManager getTableDataManager(String str) {
        String rawTableName = new LLCSegmentName(str).getTableName();
        String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
        return _instanceDataManager.getTableDataManager(realtimeTableName);
    }
}
