package org.apache.pinot.core.data.manager.realtime;

import com.yammer.metrics.core.Meter;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.Message;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.CompletionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.annotations.VisibleForTesting;
import shaded.com.google.common.base.Preconditions;
import shaded.com.google.common.util.concurrent.Uninterruptibles;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.class */
public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    private static int MINIMUM_CONSUME_TIME_MINUTES = 10;
    private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1;
    private static final long TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS = 1;
    private static final int MSG_COUNT_THRESHOLD_FOR_LOG = 100000;
    private static final int BUILD_TIME_LEASE_SECONDS = 30;
    private static final int MAX_CONSECUTIVE_ERROR_COUNT = 5;
    private final LLCRealtimeSegmentZKMetadata _segmentZKMetadata;
    private final TableConfig _tableConfig;
    private final RealtimeTableDataManager _realtimeTableDataManager;
    private final StreamMessageDecoder _messageDecoder;
    private final int _segmentMaxRowCount;
    private final String _resourceDataDir;
    private final IndexLoadingConfig _indexLoadingConfig;
    private final Schema _schema;
    private final Semaphore _partitionConsumerSemaphore;
    private final AtomicBoolean _acquiredConsumerSemaphore;
    private final String _metricKeyName;
    private final ServerMetrics _serverMetrics;
    private final MutableSegmentImpl _realtimeSegment;
    private final StreamPartitionMsgOffset _currentOffset;
    private volatile State _state;
    private final String _segmentNameStr;
    private final SegmentVersion _segmentVersion;
    private final SegmentBuildTimeLeaseExtender _leaseExtender;
    private SegmentBuildDescriptor _segmentBuildDescriptor;
    private StreamConsumerFactory _streamConsumerFactory;
    private volatile long _consumeEndTime;
    private StreamPartitionMsgOffset _finalOffset;
    private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;
    private Thread _consumerThread;
    private final String _streamTopic;
    private final int _streamPartitionId;
    final String _clientId;
    private final LLCSegmentName _llcSegmentName;
    private final RecordTransformer _recordTransformer;
    private final File _resourceTmpDir;
    private final String _tableNameWithType;
    private final List<String> _invertedIndexColumns;
    private final List<String> _textIndexColumns;
    private final List<String> _noDictionaryColumns;
    private final List<String> _varLengthDictionaryColumns;
    private final String _sortedColumn;
    private Logger segmentLogger;
    private final String _tableStreamName;
    private final PinotDataBufferMemoryManager _memoryManager;
    private final String _instanceId;
    private final ServerSegmentCompletionProtocolHandler _protocolHandler;
    private final long _consumeStartTime;
    private final StreamPartitionMsgOffset _startOffset;
    private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
    private final Semaphore _segBuildSemaphore;
    private final boolean _isOffHeap;
    private final boolean _nullHandlingEnabled;
    private final SegmentCommitterFactory _segmentCommitterFactory;
    private volatile int _numRowsConsumed = 0;
    private volatile int _numRowsIndexed = 0;
    private volatile int _numRowsErrored = 0;
    private volatile int consecutiveErrorCount = 0;
    private long _startTimeMs = 0;
    private volatile boolean _shouldStop = false;
    private PartitionLevelConsumer _partitionLevelConsumer = null;
    private StreamMetadataProvider _streamMetadataProvider = null;
    private AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
    private long _lastLogTime = 0;
    private int _lastConsumedCount = 0;
    private String _stopReason = null;

    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager$PartitionConsumer.class */
    public class PartitionConsumer implements Runnable {
        public PartitionConsumer() {
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Removed duplicated region for block: B:63:0x031b  */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 852
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.PartitionConsumer.run():void");
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager$SegmentBuildDescriptor.class */
    public class SegmentBuildDescriptor {
        final String _segmentTarFilePath;
        final Map<String, File> _metadataFileMap;
        final StreamPartitionMsgOffset _offset;
        final long _waitTimeMillis;
        final long _buildTimeMillis;
        final String _segmentDirPath;
        final long _segmentSizeBytes;

        public SegmentBuildDescriptor(String str, Map<String, File> map, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2, long j, long j2, long j3) {
            this._segmentTarFilePath = str;
            this._metadataFileMap = map;
            this._offset = new StreamPartitionMsgOffset(streamPartitionMsgOffset.getOffset());
            this._buildTimeMillis = j;
            this._waitTimeMillis = j2;
            this._segmentDirPath = str2;
            this._segmentSizeBytes = j3;
        }

        public StreamPartitionMsgOffset getOffset() {
            return this._offset;
        }

        public long getBuildTimeMillis() {
            return this._buildTimeMillis;
        }

        public long getWaitTimeMillis() {
            return this._waitTimeMillis;
        }

        public String getSegmentTarFilePath() {
            return this._segmentTarFilePath;
        }

        public long getSegmentSizeBytes() {
            return this._segmentSizeBytes;
        }

        public void deleteSegmentFile() {
            if (this._segmentTarFilePath != null) {
                FileUtils.deleteQuietly(new File(this._segmentTarFilePath));
            }
        }

        public Map<String, File> getMetadataFiles() {
            return this._metadataFileMap;
        }
    }

    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager$State.class */
    public enum State {
        INITIAL_CONSUMING,
        CATCHING_UP,
        HOLDING,
        CONSUMING_TO_ONLINE,
        RETAINING,
        COMMITTING,
        DISCARDED,
        RETAINED,
        COMMITTED,
        ERROR;

        public boolean shouldConsume() {
            return equals(INITIAL_CONSUMING) || equals(CATCHING_UP) || equals(CONSUMING_TO_ONLINE);
        }

        public boolean isFinal() {
            return equals(ERROR) || equals(COMMITTED) || equals(RETAINED) || equals(DISCARDED);
        }
    }

    private boolean endCriteriaReached() {
        Preconditions.checkState(this._state.shouldConsume(), "Incorrect state %s", this._state);
        long now = now();
        switch (this._state) {
            case INITIAL_CONSUMING:
                if (now < this._consumeEndTime) {
                    if (this._numRowsIndexed < this._segmentMaxRowCount) {
                        return false;
                    }
                    this.segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}", Integer.valueOf(this._numRowsIndexed), Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._segmentMaxRowCount));
                    this._stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
                    return true;
                }
                if (this._realtimeSegment.getNumDocsIndexed() == 0) {
                    this.segmentLogger.info("No events came in, extending time by {} hours", (Object) 1L);
                    this._consumeEndTime += TimeUnit.HOURS.toMillis(1L);
                    return false;
                }
                this.segmentLogger.info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}", Long.valueOf(this._startTimeMs), Long.valueOf(now), Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed));
                this._stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT;
                return true;
            case CATCHING_UP:
                this._stopReason = null;
                if (this._currentOffset.compareTo(this._finalOffset) == 0) {
                    this.segmentLogger.info("Caught up to offset={}, state={}", this._finalOffset, this._state.toString());
                    return true;
                }
                if (this._currentOffset.compareTo(this._finalOffset) <= 0) {
                    return false;
                }
                this.segmentLogger.error("Offset higher in state={}, current={}, final={}", this._state.toString(), this._currentOffset, this._finalOffset);
                throw new RuntimeException("Past max offset");
            case CONSUMING_TO_ONLINE:
                if (this._currentOffset.compareTo(this._finalOffset) == 0) {
                    this.segmentLogger.info("Caught up to offset={}, state={}", this._finalOffset, this._state.toString());
                    return true;
                }
                if (now >= this._consumeEndTime) {
                    this.segmentLogger.info("Past max time budget: offset={}, state={}", this._currentOffset, this._state.toString());
                    return true;
                }
                if (this._currentOffset.compareTo(this._finalOffset) <= 0) {
                    return false;
                }
                this.segmentLogger.error("Offset higher in state={}, current={}, final={}", this._state.toString(), this._currentOffset, this._finalOffset);
                throw new RuntimeException("Past max offset");
            default:
                this.segmentLogger.error("Illegal state {}" + this._state.toString());
                throw new RuntimeException("Illegal state to consume");
        }
    }

    private void handleTransientStreamErrors(Exception exc) throws Exception {
        this.consecutiveErrorCount++;
        if (this.consecutiveErrorCount > 5) {
            this.segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts", Integer.valueOf(this.consecutiveErrorCount), exc);
            throw exc;
        }
        this.segmentLogger.warn("Stream transient exception when fetching messages, retrying (count={})", Integer.valueOf(this.consecutiveErrorCount), exc);
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        makeStreamConsumer("Too many transient errors");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v10, types: [int] */
    protected boolean consumeLoop() throws Exception {
        this._numRowsErrored = 0;
        long fetchTimeoutMillis = 180000 / (100 + this._partitionLevelStreamConfig.getFetchTimeoutMillis());
        StreamPartitionMsgOffset streamPartitionMsgOffset = new StreamPartitionMsgOffset(this._currentOffset.getOffset());
        long j = 0;
        removeSegmentFile();
        StreamPartitionMsgOffset streamPartitionMsgOffset2 = new StreamPartitionMsgOffset(Long.MAX_VALUE);
        this.segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", this._currentOffset, this._finalOffset);
        while (!this._shouldStop && !endCriteriaReached()) {
            try {
                PartitionLevelConsumer partitionLevelConsumer = this._partitionLevelConsumer;
                long offset = this._currentOffset.getOffset();
                long offset2 = streamPartitionMsgOffset2.getOffset();
                ?? fetchTimeoutMillis2 = this._partitionLevelStreamConfig.getFetchTimeoutMillis();
                MessageBatch fetchMessages = partitionLevelConsumer.fetchMessages(offset, offset2, fetchTimeoutMillis2);
                this.consecutiveErrorCount = 0;
                processStreamEvents(fetchMessages, 100L);
                if (this._currentOffset.compareTo(streamPartitionMsgOffset) != 0) {
                    j = 0;
                    this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, this._currentOffset.getOffset());
                    this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, this._currentOffset.getOffset());
                    this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1L);
                    streamPartitionMsgOffset.setOffset(this._currentOffset.getOffset());
                } else {
                    long j2 = j + 1;
                    j = fetchTimeoutMillis2;
                    if (j2 > fetchTimeoutMillis) {
                        this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1L);
                        j = 0;
                        makeStreamConsumer("Idle for too long");
                    }
                }
            } catch (TimeoutException e) {
                handleTransientStreamErrors(e);
            } catch (PermanentConsumerException e2) {
                this.segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", (Throwable) e2);
                throw e2;
            } catch (TransientConsumerException e3) {
                handleTransientStreamErrors(e3);
            } catch (Exception e4) {
                handleTransientStreamErrors(e4);
            }
        }
        if (this._numRowsErrored <= 0) {
            return true;
        }
        this._serverMetrics.addMeteredTableValue(this._metricKeyName, ServerMeter.ROWS_WITH_ERRORS, this._numRowsErrored);
        this._serverMetrics.addMeteredTableValue(this._tableStreamName, ServerMeter.ROWS_WITH_ERRORS, this._numRowsErrored);
        return true;
    }

    private void processStreamEvents(MessageBatch messageBatch, long j) {
        Meter meter = null;
        Meter meter2 = null;
        int i = 0;
        int i2 = 0;
        boolean z = true;
        GenericRow genericRow = new GenericRow();
        for (int i3 = 0; i3 < messageBatch.getMessageCount() && !this._shouldStop && !endCriteriaReached(); i3++) {
            if (!z) {
                this.segmentLogger.error("Buffer full with {} rows consumed (row limit {}, indexed {})", Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed), Integer.valueOf(this._segmentMaxRowCount));
                throw new RuntimeException("Realtime segment full");
            }
            genericRow.clear();
            RowMetadata metadataAtIndex = messageBatch.getMetadataAtIndex(i3);
            GenericRow decode = this._messageDecoder.decode(messageBatch.getMessageAtIndex(i3), messageBatch.getMessageOffsetAtIndex(i3), messageBatch.getMessageLengthAtIndex(i3), genericRow);
            if (decode != null) {
                try {
                    GenericRow transform = this._recordTransformer.transform(decode);
                    if (transform != null) {
                        meter = this._serverMetrics.addMeteredTableValue(this._metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, meter);
                        i++;
                    } else {
                        meter2 = this._serverMetrics.addMeteredTableValue(this._metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1L, meter2);
                    }
                    z = this._realtimeSegment.index(transform, metadataAtIndex);
                } catch (Exception e) {
                    this.segmentLogger.error("Caught exception while transforming the record: {}", decode, e);
                    this._numRowsErrored++;
                }
            } else {
                meter2 = this._serverMetrics.addMeteredTableValue(this._metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1L, meter2);
            }
            this._currentOffset.setOffset(messageBatch.getNextStreamMessageOffsetAtIndex(i3));
            this._numRowsIndexed = this._realtimeSegment.getNumDocsIndexed();
            this._numRowsConsumed++;
            i2++;
        }
        updateCurrentDocumentCountMetrics();
        if (i2 != 0) {
            this.segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", Integer.valueOf(i), Integer.valueOf(i2), this._currentOffset);
        } else {
            Uninterruptibles.sleepUninterruptibly(j, TimeUnit.MILLISECONDS);
        }
    }

    public CommonConstants.Segment.Realtime.CompletionMode getSegmentCompletionMode() {
        CompletionConfig completionConfig = this._tableConfig.getValidationConfig().getCompletionConfig();
        return (completionConfig == null || !CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) ? CommonConstants.Segment.Realtime.CompletionMode.DEFAULT : CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD;
    }

    private File makeSegmentDirPath() {
        return new File(this._resourceDataDir, this._segmentZKMetadata.getSegmentName());
    }

    protected void buildSegmentForCommit(long j) {
        try {
            if (this._segmentBuildDescriptor != null && this._segmentBuildDescriptor.getOffset().compareTo(this._currentOffset) == 0 && new File(this._segmentBuildDescriptor.getSegmentTarFilePath()).exists()) {
                return;
            }
            removeSegmentFile();
            if (j <= 0) {
                j = this._segBuildSemaphore == null ? SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000 : 30000L;
            }
            this._leaseExtender.addSegment(this._segmentNameStr, j, this._currentOffset);
            this._segmentBuildDescriptor = buildSegmentInternal(true);
            this._leaseExtender.removeSegment(this._segmentNameStr);
        } finally {
            this._leaseExtender.removeSegment(this._segmentNameStr);
        }
    }

    @VisibleForTesting
    protected StreamPartitionMsgOffset getCurrentOffset() {
        return this._currentOffset;
    }

    @VisibleForTesting
    protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
        return this._segmentBuildDescriptor;
    }

    @VisibleForTesting
    protected Semaphore getPartitionConsumerSemaphore() {
        return this._partitionConsumerSemaphore;
    }

    @VisibleForTesting
    protected AtomicBoolean getAcquiredConsumerSemaphore() {
        return this._acquiredConsumerSemaphore;
    }

    protected SegmentBuildDescriptor buildSegmentInternal(boolean z) {
        closeKafkaConsumers();
        try {
            try {
                long now = now();
                if (this._segBuildSemaphore != null) {
                    this.segmentLogger.info("Waiting to acquire semaphore for building segment");
                    this._segBuildSemaphore.acquire();
                }
                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L);
                long now2 = now();
                File file = new File(this._resourceTmpDir, "tmp-" + this._segmentNameStr + "-" + now());
                RealtimeSegmentConverter realtimeSegmentConverter = new RealtimeSegmentConverter(this._realtimeSegment, file.getAbsolutePath(), this._schema, this._tableNameWithType, this._tableConfig, this._segmentZKMetadata.getSegmentName(), this._sortedColumn, this._invertedIndexColumns, this._textIndexColumns, this._noDictionaryColumns, this._varLengthDictionaryColumns, this._nullHandlingEnabled);
                this.segmentLogger.info("Trying to build segment");
                try {
                    realtimeSegmentConverter.build(this._segmentVersion, this._serverMetrics);
                    long now3 = now() - now2;
                    long j = now2 - now;
                    this.segmentLogger.info("Successfully built segment in {} ms, after lockWaitTime {} ms", Long.valueOf(now3), Long.valueOf(j));
                    File makeSegmentDirPath = makeSegmentDirPath();
                    FileUtils.deleteQuietly(makeSegmentDirPath);
                    try {
                        FileUtils.moveDirectory(file.listFiles()[0], makeSegmentDirPath);
                        if (z) {
                            TarGzCompressionUtils.createTarGzOfDirectory(makeSegmentDirPath.getAbsolutePath());
                        }
                        long sizeOfDirectory = FileUtils.sizeOfDirectory(makeSegmentDirPath);
                        FileUtils.deleteQuietly(file);
                        this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS, TimeUnit.MILLISECONDS.toSeconds(now3));
                        this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS.toSeconds(j));
                        if (!z) {
                            SegmentBuildDescriptor segmentBuildDescriptor = new SegmentBuildDescriptor(null, null, this._currentOffset, makeSegmentDirPath.getAbsolutePath(), now3, j, sizeOfDirectory);
                            if (this._segBuildSemaphore != null) {
                                this._segBuildSemaphore.release();
                            }
                            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                            return segmentBuildDescriptor;
                        }
                        File[] listFiles = makeSegmentDirPath.listFiles();
                        if (listFiles == null || listFiles.length == 0) {
                            this.segmentLogger.error("The index dir is empty: {}", makeSegmentDirPath);
                            if (this._segBuildSemaphore != null) {
                                this._segBuildSemaphore.release();
                            }
                            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                            return null;
                        }
                        File file2 = new File(listFiles[0], V1Constants.MetadataKeys.METADATA_FILE_NAME);
                        if (!file2.exists()) {
                            this.segmentLogger.error("File does not exist in {} for {}.", makeSegmentDirPath, V1Constants.MetadataKeys.METADATA_FILE_NAME);
                            if (this._segBuildSemaphore != null) {
                                this._segBuildSemaphore.release();
                            }
                            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                            return null;
                        }
                        File file3 = new File(listFiles[0], V1Constants.SEGMENT_CREATION_META);
                        if (!file3.exists()) {
                            this.segmentLogger.error("File does not exist in {} for {}.", makeSegmentDirPath, V1Constants.SEGMENT_CREATION_META);
                            if (this._segBuildSemaphore != null) {
                                this._segBuildSemaphore.release();
                            }
                            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                            return null;
                        }
                        HashMap hashMap = new HashMap();
                        hashMap.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, file2);
                        hashMap.put(V1Constants.SEGMENT_CREATION_META, file3);
                        SegmentBuildDescriptor segmentBuildDescriptor2 = new SegmentBuildDescriptor(makeSegmentDirPath.getAbsolutePath() + ".tar.gz", hashMap, this._currentOffset, null, now3, j, sizeOfDirectory);
                        if (this._segBuildSemaphore != null) {
                            this._segBuildSemaphore.release();
                        }
                        this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                        return segmentBuildDescriptor2;
                    } catch (IOException e) {
                        this.segmentLogger.error("Exception during move/tar segment", (Throwable) e);
                        FileUtils.deleteQuietly(file);
                        if (this._segBuildSemaphore != null) {
                            this._segBuildSemaphore.release();
                        }
                        this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                        return null;
                    }
                } catch (Exception e2) {
                    this.segmentLogger.error("Could not build segment", (Throwable) e2);
                    FileUtils.deleteQuietly(file);
                    if (this._segBuildSemaphore != null) {
                        this._segBuildSemaphore.release();
                    }
                    this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                    return null;
                }
            } catch (Throwable th) {
                if (this._segBuildSemaphore != null) {
                    this._segBuildSemaphore.release();
                }
                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                throw th;
            }
        } catch (InterruptedException e3) {
            this.segmentLogger.error("Interrupted while waiting for semaphore");
            if (this._segBuildSemaphore != null) {
                this._segBuildSemaphore.release();
            }
            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
            return null;
        }
    }

    protected boolean commitSegment(String str, boolean z) {
        String segmentTarFilePath = this._segmentBuildDescriptor.getSegmentTarFilePath();
        if (!new File(segmentTarFilePath).exists()) {
            throw new RuntimeException("Segment file does not exist:" + segmentTarFilePath);
        }
        if (!commit(str, z).getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
            return false;
        }
        this._realtimeTableDataManager.replaceLLSegment(this._segmentNameStr, this._indexLoadingConfig);
        removeSegmentFile();
        return true;
    }

    protected SegmentCompletionProtocol.Response commit(String str, boolean z) {
        SegmentCommitter createSplitSegmentCommitter;
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withSegmentName(this._segmentNameStr).withStreamPartitionMsgOffset(this._currentOffset).withNumRows(this._numRowsConsumed).withInstanceId(this._instanceId).withBuildTimeMillis(this._segmentBuildDescriptor.getBuildTimeMillis()).withSegmentSizeBytes(this._segmentBuildDescriptor.getSegmentSizeBytes()).withWaitTimeMillis(this._segmentBuildDescriptor.getWaitTimeMillis());
        if (this._isOffHeap) {
            params.withMemoryUsedBytes(this._memoryManager.getTotalAllocatedBytes());
        }
        if (z) {
            try {
                createSplitSegmentCommitter = this._segmentCommitterFactory.createSplitSegmentCommitter(params, new Server2ControllerSegmentUploader(this.segmentLogger, this._protocolHandler.getFileUploadDownloadClient(), this._protocolHandler.getSegmentCommitUploadURL(params, str), this._segmentNameStr, ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), this._serverMetrics));
            } catch (URISyntaxException e) {
                this.segmentLogger.error("Segment commit upload url error: ", (Throwable) e);
                return SegmentCompletionProtocol.RESP_NOT_SENT;
            }
        } else {
            createSplitSegmentCommitter = this._segmentCommitterFactory.createDefaultSegmentCommitter(params);
        }
        return createSplitSegmentCommitter.commit(this._segmentBuildDescriptor);
    }

    protected boolean buildSegmentAndReplace() {
        if (buildSegmentInternal(false) == null) {
            return false;
        }
        this._realtimeTableDataManager.replaceLLSegment(this._segmentNameStr, this._indexLoadingConfig);
        return true;
    }

    private void closeKafkaConsumers() {
        closePartitionLevelConsumer();
        closeStreamMetadataProvider();
        if (this._acquiredConsumerSemaphore.compareAndSet(true, false)) {
            this._partitionConsumerSemaphore.release();
        }
    }

    private void closePartitionLevelConsumer() {
        try {
            this._partitionLevelConsumer.close();
        } catch (Exception e) {
            this.segmentLogger.warn("Could not close stream consumer", (Throwable) e);
        }
    }

    private void closeStreamMetadataProvider() {
        try {
            this._streamMetadataProvider.close();
        } catch (Exception e) {
            this.segmentLogger.warn("Could not close stream metadata provider", (Throwable) e);
        }
    }

    protected void hold() {
        try {
            Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS);
        } catch (InterruptedException e) {
            this.segmentLogger.warn("Interrupted while holding");
        }
    }

    protected void postStopConsumedMsg(String str) {
        do {
            SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
            params.withStreamPartitionMsgOffset(this._currentOffset).withReason(str).withSegmentName(this._segmentNameStr).withInstanceId(this._instanceId);
            SegmentCompletionProtocol.Response segmentStoppedConsuming = this._protocolHandler.segmentStoppedConsuming(params);
            if (segmentStoppedConsuming.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
                this.segmentLogger.info("Got response {}", segmentStoppedConsuming.toJsonString());
                return;
            } else {
                Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.SECONDS);
                this.segmentLogger.info("Retrying after response {}", segmentStoppedConsuming.toJsonString());
            }
        } while (!this._shouldStop);
    }

    protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withStreamPartitionMsgOffset(this._currentOffset).withSegmentName(this._segmentNameStr).withReason(this._stopReason).withNumRows(this._numRowsConsumed).withInstanceId(this._instanceId);
        if (this._isOffHeap) {
            params.withMemoryUsedBytes(this._memoryManager.getTotalAllocatedBytes());
        }
        return this._protocolHandler.segmentConsumed(params);
    }

    public void removeSegmentFile() {
        if (this._segmentBuildDescriptor != null) {
            this._segmentBuildDescriptor.deleteSegmentFile();
            this._segmentBuildDescriptor = null;
        }
    }

    public void goOnlineFromConsuming(RealtimeSegmentZKMetadata realtimeSegmentZKMetadata) throws InterruptedException {
        this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
        try {
            try {
                LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
                removeSegmentFile();
                this._leaseExtender.removeSegment(this._segmentNameStr);
                StreamPartitionMsgOffset streamPartitionMsgOffset = new StreamPartitionMsgOffset(lLCRealtimeSegmentZKMetadata.getEndOffset());
                this.segmentLogger.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", this._state.toString(), this._startOffset, streamPartitionMsgOffset);
                stop();
                this.segmentLogger.info("Consumer thread stopped in state {}", this._state.toString());
                switch (this._state) {
                    case INITIAL_CONSUMING:
                    case CATCHING_UP:
                    case HOLDING:
                        CommonConstants.Segment.Realtime.CompletionMode segmentCompletionMode = getSegmentCompletionMode();
                        switch (segmentCompletionMode) {
                            case DOWNLOAD:
                                this.segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", this._state.toString(), segmentCompletionMode);
                                downloadSegmentAndReplace(lLCRealtimeSegmentZKMetadata);
                                break;
                            case DEFAULT:
                                if (this._currentOffset.compareTo(streamPartitionMsgOffset) <= 0) {
                                    if (this._currentOffset.compareTo(streamPartitionMsgOffset) != 0) {
                                        this.segmentLogger.info("Attempting to catch up from offset {} to {} ", this._currentOffset, streamPartitionMsgOffset);
                                        if (!catchupToFinalOffset(streamPartitionMsgOffset, TimeUnit.MILLISECONDS.convert(31L, TimeUnit.SECONDS))) {
                                            this.segmentLogger.info("Could not catch up to offset (current = {}). Downloading to replace", this._currentOffset);
                                            downloadSegmentAndReplace(lLCRealtimeSegmentZKMetadata);
                                            break;
                                        } else {
                                            this.segmentLogger.info("Caught up to offset {}", this._currentOffset);
                                            buildSegmentAndReplace();
                                            break;
                                        }
                                    } else {
                                        this.segmentLogger.info("Current offset {} matches offset in zk {}. Replacing segment", this._currentOffset, streamPartitionMsgOffset);
                                        buildSegmentAndReplace();
                                        break;
                                    }
                                } else {
                                    this.segmentLogger.warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", this._currentOffset, streamPartitionMsgOffset);
                                    downloadSegmentAndReplace(lLCRealtimeSegmentZKMetadata);
                                    break;
                                }
                        }
                        break;
                    case CONSUMING_TO_ONLINE:
                    default:
                        this.segmentLogger.info("Downloading to replace segment while in state {}", this._state.toString());
                        downloadSegmentAndReplace(lLCRealtimeSegmentZKMetadata);
                        break;
                    case COMMITTED:
                    case RETAINED:
                        this.segmentLogger.info("State {}. Nothing to do", this._state.toString());
                        break;
                    case DISCARDED:
                    case ERROR:
                        this.segmentLogger.info("State {}. Downloading to replace", this._state.toString());
                        downloadSegmentAndReplace(lLCRealtimeSegmentZKMetadata);
                        break;
                }
                this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            } catch (Exception e) {
                Utils.rethrowException(e);
                this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            }
        } catch (Throwable th) {
            this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            throw th;
        }
    }

    protected void downloadSegmentAndReplace(LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata) {
        closeKafkaConsumers();
        this._realtimeTableDataManager.downloadAndReplaceSegment(this._segmentNameStr, lLCRealtimeSegmentZKMetadata, this._indexLoadingConfig);
    }

    protected long now() {
        return System.currentTimeMillis();
    }

    private boolean catchupToFinalOffset(StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
        this._finalOffset = streamPartitionMsgOffset;
        this._consumeEndTime = now() + j;
        this._state = State.CONSUMING_TO_ONLINE;
        this._shouldStop = false;
        try {
            try {
                consumeLoop();
                this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
                if (this._currentOffset.compareTo(streamPartitionMsgOffset) == 0) {
                    return true;
                }
                this.segmentLogger.error("Could not consume up to {} (current offset {})", streamPartitionMsgOffset, this._currentOffset);
                return false;
            } catch (Exception e) {
                this.segmentLogger.warn("Exception when catching up to final offset", (Throwable) e);
                this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
                return false;
            }
        } catch (Throwable th) {
            this._serverMetrics.setValueOfTableGauge(this._metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            throw th;
        }
    }

    @Override // org.apache.pinot.core.data.manager.SegmentDataManager
    public void destroy() {
        try {
            stop();
        } catch (InterruptedException e) {
            this.segmentLogger.error("Could not stop consumer thread");
        }
        this._realtimeSegment.destroy();
        closeKafkaConsumers();
    }

    protected void start() {
        this._consumerThread = new Thread(new PartitionConsumer(), this._segmentNameStr);
        this.segmentLogger.info("Created new consumer thread {} for {}", this._consumerThread, toString());
        this._consumerThread.start();
    }

    public void stop() throws InterruptedException {
        this._shouldStop = true;
        if (Thread.currentThread() != this._consumerThread) {
            Uninterruptibles.joinUninterruptibly(this._consumerThread, 10L, TimeUnit.MINUTES);
            if (this._consumerThread.isAlive()) {
                this.segmentLogger.warn("Failed to stop consumer thread within 10 minutes");
            }
        }
    }

    public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String str, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName lLCSegmentName, Semaphore semaphore, ServerMetrics serverMetrics) {
        this._consumeEndTime = 0L;
        this._segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
        this._segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
        this._tableConfig = tableConfig;
        this._tableNameWithType = this._tableConfig.getTableName();
        this._realtimeTableDataManager = realtimeTableDataManager;
        this._resourceDataDir = str;
        this._indexLoadingConfig = indexLoadingConfig;
        this._schema = schema;
        this._serverMetrics = serverMetrics;
        this._segmentVersion = indexLoadingConfig.getSegmentVersion();
        this._instanceId = this._realtimeTableDataManager.getServerInstance();
        this._leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(this._instanceId);
        this._protocolHandler = new ServerSegmentCompletionProtocolHandler(this._serverMetrics, this._tableNameWithType);
        String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        IndexingConfig indexingConfig = this._tableConfig.getIndexingConfig();
        this._partitionLevelStreamConfig = new PartitionLevelStreamConfig(this._tableNameWithType, indexingConfig.getStreamConfigs());
        this._streamConsumerFactory = StreamConsumerFactoryProvider.create(this._partitionLevelStreamConfig);
        this._streamTopic = this._partitionLevelStreamConfig.getTopicName();
        this._segmentNameStr = this._segmentZKMetadata.getSegmentName();
        this._llcSegmentName = lLCSegmentName;
        this._streamPartitionId = this._llcSegmentName.getPartitionId();
        this._partitionConsumerSemaphore = semaphore;
        this._acquiredConsumerSemaphore = new AtomicBoolean(false);
        this._metricKeyName = this._tableNameWithType + "-" + this._streamTopic + "-" + this._streamPartitionId;
        this.segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + ShingleFilter.DEFAULT_FILLER_TOKEN + this._segmentNameStr);
        this._tableStreamName = this._tableNameWithType + ShingleFilter.DEFAULT_FILLER_TOKEN + this._streamTopic;
        this._memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), this._segmentNameStr, indexLoadingConfig.isRealtimeOffheapAllocation(), indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics);
        List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
        if (sortedColumns.isEmpty()) {
            this.segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}", this._llcSegmentName);
            this._sortedColumn = null;
        } else {
            String str2 = sortedColumns.get(0);
            if (this._schema.hasColumn(str2)) {
                this.segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}", str2, this._llcSegmentName);
                this._sortedColumn = str2;
            } else {
                this.segmentLogger.warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.", str2, this._llcSegmentName);
                this._sortedColumn = null;
            }
        }
        Set<String> invertedIndexColumns = indexLoadingConfig.getInvertedIndexColumns();
        if (this._sortedColumn != null) {
            invertedIndexColumns.add(this._sortedColumn);
        }
        this._invertedIndexColumns = new ArrayList(invertedIndexColumns);
        this._noDictionaryColumns = new ArrayList(indexLoadingConfig.getNoDictionaryColumns());
        this._varLengthDictionaryColumns = new ArrayList(indexLoadingConfig.getVarLengthDictionaryColumns());
        this._segmentMaxRowCount = 0 < realtimeSegmentZKMetadata.getSizeThresholdToFlushSegment() ? realtimeSegmentZKMetadata.getSizeThresholdToFlushSegment() : this._partitionLevelStreamConfig.getFlushThresholdRows();
        this._isOffHeap = indexLoadingConfig.isRealtimeOffheapAllocation();
        this._nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
        Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns();
        this._textIndexColumns = new ArrayList(textIndexColumns);
        RealtimeSegmentConfig.Builder consumerDir = new RealtimeSegmentConfig.Builder().setSegmentName(this._segmentNameStr).setStreamName(this._streamTopic).setSchema(this._schema).setTimeColumnName(timeColumnName).setCapacity(this._segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()).setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns()).setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns()).setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns).setRealtimeSegmentZKMetadata(realtimeSegmentZKMetadata).setOffHeap(this._isOffHeap).setMemoryManager(this._memoryManager).setStatsHistory(realtimeTableDataManager.getStatsHistory()).setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(this._nullHandlingEnabled).setConsumerDir(realtimeTableDataManager.getConsumerDir());
        this._messageDecoder = StreamDecoderProvider.create(this._partitionLevelStreamConfig, SchemaUtils.extractSourceFields(this._schema));
        this._clientId = this._streamTopic + "-" + this._streamPartitionId;
        this._recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
        try {
            this._partitionConsumerSemaphore.acquire();
            this._acquiredConsumerSemaphore.set(true);
            makeStreamConsumer("Starting");
            makeStreamMetadataProvider("Starting");
            SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
            if (segmentPartitionConfig != null) {
                Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
                if (columnPartitionMap.size() == 1) {
                    Map.Entry<String, ColumnPartitionConfig> next = columnPartitionMap.entrySet().iterator().next();
                    String key = next.getKey();
                    ColumnPartitionConfig value = next.getValue();
                    String functionName = value.getFunctionName();
                    int numPartitions = value.getNumPartitions();
                    try {
                        int fetchPartitionCount = this._streamMetadataProvider.fetchPartitionCount(Message.RELAY_MESSAGE_DEFAULT_EXPIRY);
                        if (fetchPartitionCount != numPartitions) {
                            this.segmentLogger.warn("Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", Integer.valueOf(fetchPartitionCount), Integer.valueOf(numPartitions));
                            numPartitions = fetchPartitionCount;
                        }
                    } catch (Exception e) {
                        this.segmentLogger.warn("Failed to get number of stream partitions in 5s, using number of partitions in the partition config: {}", Integer.valueOf(numPartitions), e);
                        makeStreamMetadataProvider("Timeout getting number of stream partitions");
                    }
                    consumerDir.setPartitionColumn(key);
                    consumerDir.setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions));
                    consumerDir.setPartitionId(this._streamPartitionId);
                } else {
                    this.segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
                }
            }
            this._realtimeSegment = new MutableSegmentImpl(consumerDir.build());
            this._startOffset = new StreamPartitionMsgOffset(this._segmentZKMetadata.getStartOffset());
            this._currentOffset = new StreamPartitionMsgOffset(this._startOffset.getOffset());
            this._resourceTmpDir = new File(str, "_tmp");
            if (!this._resourceTmpDir.exists()) {
                this._resourceTmpDir.mkdirs();
            }
            this._state = State.INITIAL_CONSUMING;
            long now = now();
            this._consumeStartTime = now;
            long flushThresholdTimeMillis = this._partitionLevelStreamConfig.getFlushThresholdTimeMillis();
            this._consumeEndTime = realtimeSegmentZKMetadata.getCreationTime() + flushThresholdTimeMillis;
            long min = Math.min(flushThresholdTimeMillis, TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
            if (this._consumeEndTime - now < min) {
                this._consumeEndTime = now + min;
            }
            this._segmentCommitterFactory = new SegmentCommitterFactory(this.segmentLogger, this._protocolHandler);
            this.segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", this._llcSegmentName, Integer.valueOf(this._segmentMaxRowCount), new DateTime(this._consumeEndTime, DateTimeZone.UTC).toString());
            start();
        } catch (InterruptedException e2) {
            this.segmentLogger.error("InterruptedException when acquiring the partitionConsumerSemaphore");
            throw new RuntimeException("InterruptedException when acquiring the partitionConsumerSemaphore for segment: " + this._segmentNameStr);
        }
    }

    private void makeStreamConsumer(String str) {
        if (this._partitionLevelConsumer != null) {
            closePartitionLevelConsumer();
        }
        this.segmentLogger.info("Creating new stream consumer, reason: {}", str);
        this._partitionLevelConsumer = this._streamConsumerFactory.createPartitionLevelConsumer(this._clientId, this._streamPartitionId);
    }

    private void makeStreamMetadataProvider(String str) {
        if (this._streamMetadataProvider != null) {
            closeStreamMetadataProvider();
        }
        this.segmentLogger.info("Creating new stream metadata provider, reason: {}", str);
        this._streamMetadataProvider = this._streamConsumerFactory.createPartitionMetadataProvider(this._clientId, this._streamPartitionId);
    }

    private void updateCurrentDocumentCountMetrics() {
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.DOCUMENT_COUNT, this._numRowsIndexed - this._lastUpdatedRowsIndexed.get());
        this._lastUpdatedRowsIndexed.set(this._numRowsIndexed);
        long now = now();
        int i = this._numRowsConsumed - this._lastConsumedCount;
        long j = this._lastConsumedCount == 0 ? this._consumeStartTime : this._lastLogTime;
        if (now - j > TimeUnit.MINUTES.toMillis(1L) || i >= 100000) {
            this.segmentLogger.info("Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}", Integer.valueOf(i), Float.valueOf((i * 1000.0f) / ((float) (now - j))), this._currentOffset, Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed));
            this._lastConsumedCount = this._numRowsConsumed;
            this._lastLogTime = now;
        }
    }

    @Override // org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager, org.apache.pinot.core.data.manager.SegmentDataManager
    public MutableSegment getSegment() {
        return this._realtimeSegment;
    }

    @Override // org.apache.pinot.core.data.manager.SegmentDataManager
    public String getSegmentName() {
        return this._segmentNameStr;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.access$002(org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$002(org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0._startTimeMs = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.access$002(org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager, long):long");
    }

    static {
    }
}
