package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.derby.iapi.store.raw.RowLock;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.FeedDefiningMonitor;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BatchAppenderator.class */
public class BatchAppenderator implements Appenderator {
    public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
    public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
    private static final EmittingLogger log = new EmittingLogger(BatchAppenderator.class);
    private static final String IDENTIFIER_FILE_NAME = "identifier.json";
    private final String myId;
    private final DataSchema schema;
    private final AppenderatorConfig tuningConfig;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentPusher dataSegmentPusher;
    private final ObjectMapper objectMapper;
    private final IndexIO indexIO;
    private final IndexMerger indexMerger;
    private final long maxBytesTuningConfig;
    private final boolean skipBytesInMemoryOverheadCheck;
    private final int maxPendingPersists;
    private static final int PERSIST_WARN_DELAY = 1000;
    private volatile Throwable persistError;
    private final RowIngestionMeters rowIngestionMeters;
    private final ParseExceptionHandler parseExceptionHandler;
    private volatile ListeningExecutorService persistExecutor = null;
    private volatile ListeningExecutorService pushExecutor = null;
    private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap();
    private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();
    private int rowsCurrentlyInMemory = 0;
    private int totalRows = 0;
    private long bytesCurrentlyInMemory = 0;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile FileLock basePersistDirLock = null;
    private volatile FileChannel basePersistDirLockChannel = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/BatchAppenderator$SinkMetadata.class */
    public static class SinkMetadata {
        private int numRowsInSegment;
        private int numHydrants;
        File persistedFileDir;

        public SinkMetadata() {
            this(0, 0);
        }

        public SinkMetadata(int i, int i2) {
            this.numRowsInSegment = i;
            this.numHydrants = i2;
        }

        public void addRows(int i) {
            this.numRowsInSegment += i;
        }

        public void addHydrants(int i) {
            this.numHydrants += i;
        }

        public int getNumRowsInSegment() {
            return this.numRowsInSegment;
        }

        public int getNumHydrants() {
            return this.numHydrants;
        }

        public void setPersistedFileDir(File file) {
            this.persistedFileDir = file;
        }

        public File getPersistedFileDir() {
            return this.persistedFileDir;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchAppenderator(String str, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler) {
        this.myId = str;
        this.schema = (DataSchema) Preconditions.checkNotNull(dataSchema, "schema");
        this.tuningConfig = (AppenderatorConfig) Preconditions.checkNotNull(appenderatorConfig, "tuningConfig");
        this.metrics = (FireDepartmentMetrics) Preconditions.checkNotNull(fireDepartmentMetrics, FeedDefiningMonitor.DEFAULT_METRICS_FEED);
        this.dataSegmentPusher = (DataSegmentPusher) Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
        this.objectMapper = (ObjectMapper) Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.indexIO = (IndexIO) Preconditions.checkNotNull(indexIO, "indexIO");
        this.indexMerger = (IndexMerger) Preconditions.checkNotNull(indexMerger, "indexMerger");
        this.rowIngestionMeters = (RowIngestionMeters) Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
        this.parseExceptionHandler = (ParseExceptionHandler) Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
        this.maxBytesTuningConfig = appenderatorConfig.getMaxBytesInMemoryOrDefault();
        this.skipBytesInMemoryOverheadCheck = appenderatorConfig.isSkipBytesInMemoryOverheadCheck();
        this.maxPendingPersists = appenderatorConfig.getMaxPendingPersists();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public String getId() {
        return this.myId;
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public String getDataSource() {
        return this.schema.getDataSource();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public Object startJob() {
        this.tuningConfig.getBasePersistDirectory().mkdirs();
        lockBasePersistDirectory();
        initializeExecutors();
        return null;
    }

    private void throwPersistErrorIfExists() {
        if (this.persistError != null) {
            throw new RE(this.persistError, "Error while persisting", new Object[0]);
        }
    }

    private void initializeExecutors() {
        log.debug("There will be up to[%d] pending persists", Integer.valueOf(this.maxPendingPersists));
        if (this.persistExecutor == null) {
            this.persistExecutor = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(this.myId) + "]-batch-appenderator-persist", this.maxPendingPersists));
        }
        if (this.pushExecutor == null) {
            this.pushExecutor = MoreExecutors.listeningDecorator(Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(this.myId) + "]-batch-appenderator-push", 1));
        }
    }

    private void shutdownExecutors() {
        if (this.persistExecutor != null) {
            this.persistExecutor.shutdownNow();
        }
        if (this.pushExecutor != null) {
            this.pushExecutor.shutdownNow();
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public Appenderator.AppenderatorAddResult add(SegmentIdWithShardSpec segmentIdWithShardSpec, InputRow inputRow, @Nullable Supplier<Committer> supplier, boolean z) throws IndexSizeExceededException, SegmentNotWritableException {
        throwPersistErrorIfExists();
        Preconditions.checkArgument(supplier == null, "Batch appenderator does not need a committer!");
        Preconditions.checkArgument(z, "Batch appenderator should always allow incremental persists!");
        if (!segmentIdWithShardSpec.getDataSource().equals(this.schema.getDataSource())) {
            throw new IAE("Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", this.schema.getDataSource(), segmentIdWithShardSpec.getDataSource());
        }
        Sink orCreateSink = getOrCreateSink(segmentIdWithShardSpec);
        this.metrics.reportMessageMaxTimestamp(inputRow.getTimestampFromEpoch());
        int numRowsInMemory = orCreateSink.getNumRowsInMemory();
        long bytesInMemory = orCreateSink.getBytesInMemory();
        try {
            IncrementalIndexAddResult add = orCreateSink.add(inputRow, false);
            int rowCount = add.getRowCount();
            long bytesInMemory2 = add.getBytesInMemory();
            if (rowCount < 0) {
                throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", segmentIdWithShardSpec);
            }
            if (add.isRowAdded()) {
                this.rowIngestionMeters.incrementProcessed();
            } else if (add.hasParseException()) {
                this.parseExceptionHandler.handle(add.getParseException());
            }
            int i = rowCount - numRowsInMemory;
            this.rowsCurrentlyInMemory += i;
            this.bytesCurrentlyInMemory += bytesInMemory2 - bytesInMemory;
            this.totalRows += i;
            this.sinksMetadata.computeIfAbsent(segmentIdWithShardSpec, segmentIdWithShardSpec2 -> {
                return new SinkMetadata();
            }).addRows(i);
            boolean z2 = false;
            ArrayList arrayList = new ArrayList();
            if (!orCreateSink.canAppendRow()) {
                z2 = true;
                arrayList.add("No more rows can be appended to sink");
            }
            if (this.rowsCurrentlyInMemory >= this.tuningConfig.getMaxRowsInMemory()) {
                z2 = true;
                arrayList.add(StringUtils.format("rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]", Integer.valueOf(this.rowsCurrentlyInMemory), Integer.valueOf(this.tuningConfig.getMaxRowsInMemory())));
            }
            if (this.bytesCurrentlyInMemory >= this.maxBytesTuningConfig) {
                z2 = true;
                arrayList.add(StringUtils.format("bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", Long.valueOf(this.bytesCurrentlyInMemory), Long.valueOf(this.maxBytesTuningConfig)));
            }
            if (z2) {
                log.info("Incremental persist to disk because %s.", String.join(com.amazonaws.util.StringUtils.COMMA_SEPARATOR, arrayList));
                long j = 0;
                Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> it2 = this.sinks.entrySet().iterator();
                while (it2.hasNext()) {
                    Sink value = it2.next().getValue();
                    if (value != null) {
                        j += value.getBytesInMemory();
                        if (value.swappable()) {
                            this.bytesCurrentlyInMemory += calculateMemoryUsedByHydrant();
                        }
                    }
                }
                if (!this.skipBytesInMemoryOverheadCheck && this.bytesCurrentlyInMemory - j > this.maxBytesTuningConfig) {
                    String format = StringUtils.format("Task has exceeded safe estimated heap usage limits, failing (numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", Integer.valueOf(this.sinks.size()), Integer.valueOf(this.sinks.values().stream().mapToInt((v0) -> {
                        return Iterables.size(v0);
                    }).sum()), Integer.valueOf(getTotalRowCount()), Long.valueOf(this.bytesCurrentlyInMemory), Long.valueOf(j), Long.valueOf(this.maxBytesTuningConfig));
                    String format2 = StringUtils.format("%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to great to have enough space to process additional input rows. This check, along with metering the overhead of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting 'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an increase in heap footprint, but will allow for more intermediary segment persists to occur before reaching this condition.", format);
                    log.makeAlert(format, new Object[0]).addData("dataSource", this.schema.getDataSource()).emit();
                    throw new RuntimeException(format2);
                }
                Futures.addCallback(persistAll(null), new FutureCallback<Object>() { // from class: org.apache.druid.segment.realtime.appenderator.BatchAppenderator.1
                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onSuccess(@Nullable Object obj) {
                    }

                    @Override // com.google.common.util.concurrent.FutureCallback
                    public void onFailure(Throwable th) {
                        BatchAppenderator.this.persistError = th;
                    }
                });
            }
            return new Appenderator.AppenderatorAddResult(segmentIdWithShardSpec, this.sinksMetadata.get(segmentIdWithShardSpec).numRowsInSegment, false);
        } catch (IndexSizeExceededException e) {
            log.error(e, "Sink for segment[%s] was unexpectedly full!", segmentIdWithShardSpec);
            throw e;
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public List<SegmentIdWithShardSpec> getSegments() {
        return ImmutableList.copyOf((Collection) this.sinksMetadata.keySet());
    }

    @VisibleForTesting
    public List<SegmentIdWithShardSpec> getInMemorySegments() {
        return ImmutableList.copyOf((Collection) this.sinks.keySet());
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public int getRowCount(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return this.sinksMetadata.get(segmentIdWithShardSpec).getNumRowsInSegment();
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public int getTotalRowCount() {
        return this.totalRows;
    }

    @VisibleForTesting
    public int getRowsInMemory() {
        return this.rowsCurrentlyInMemory;
    }

    @VisibleForTesting
    public long getBytesCurrentlyInMemory() {
        return this.bytesCurrentlyInMemory;
    }

    @VisibleForTesting
    public long getBytesInMemory(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        if (sink == null) {
            return 0L;
        }
        return sink.getBytesInMemory();
    }

    private Sink getOrCreateSink(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        if (sink == null) {
            sink = new Sink(segmentIdWithShardSpec.getInterval(), this.schema, segmentIdWithShardSpec.getShardSpec(), segmentIdWithShardSpec.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, null);
            this.bytesCurrentlyInMemory += calculateSinkMemoryInUsed();
            this.sinks.put(segmentIdWithShardSpec, sink);
            this.metrics.setSinkCount(this.sinks.size());
        }
        return sink;
    }

    @Override // org.apache.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        throw new UnsupportedOperationException("No query runner for batch appenderator");
    }

    @Override // org.apache.druid.query.QuerySegmentWalker
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> iterable) {
        throw new UnsupportedOperationException("No query runner for batch appenderator");
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void clear() {
        throwPersistErrorIfExists();
        clear(this.sinks, true);
    }

    private void clear(Map<SegmentIdWithShardSpec, Sink> map, boolean z) {
        log.info("Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]", Integer.valueOf(map.size()), Boolean.valueOf(z));
        Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> it2 = map.entrySet().iterator();
        it2.forEachRemaining(entry -> {
            clearSinkMemoryCountersAndDiskStoredData((SegmentIdWithShardSpec) entry.getKey(), (Sink) entry.getValue(), z);
            it2.remove();
        });
        this.metrics.setSinkCount(map.size());
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<?> drop(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        Sink sink = this.sinks.get(segmentIdWithShardSpec);
        SinkMetadata remove = this.sinksMetadata.remove(segmentIdWithShardSpec);
        if (remove != null) {
            int totalRowCount = getTotalRowCount();
            int numRowsInSegment = remove.getNumRowsInSegment();
            int i = totalRowCount - numRowsInSegment;
            if (i < 0) {
                log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", Integer.valueOf(i), segmentIdWithShardSpec, Integer.valueOf(numRowsInSegment));
            }
            this.totalRows = Math.max(i, 0);
        }
        if (sink != null) {
            clearSinkMemoryCountersAndDiskStoredData(segmentIdWithShardSpec, sink, true);
            if (this.sinks.remove(segmentIdWithShardSpec) == null) {
                log.warn("Sink for identifier[%s] not found, skipping", segmentIdWithShardSpec);
            }
        }
        return Futures.immediateFuture(null);
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<Object> persistAll(@Nullable Committer committer) {
        throwPersistErrorIfExists();
        if (committer != null) {
            throw new ISE("committer must be null for BatchAppenderator", new Object[0]);
        }
        Map<SegmentIdWithShardSpec, Sink> swapSinks = swapSinks();
        Stopwatch createStarted = Stopwatch.createStarted();
        ListenableFuture<Object> submit = this.persistExecutor.submit(() -> {
            log.info("Spawning intermediate persist", new Object[0]);
            ArrayList<Pair> arrayList = new ArrayList();
            int i = 0;
            long j = 0;
            int i2 = 0;
            long size = swapSinks.size();
            for (Map.Entry entry : swapSinks.entrySet()) {
                SegmentIdWithShardSpec segmentIdWithShardSpec = (SegmentIdWithShardSpec) entry.getKey();
                Sink sink = (Sink) entry.getValue();
                if (sink == null) {
                    throw new ISE("No sink for identifier: %s", segmentIdWithShardSpec);
                }
                int size2 = Lists.newArrayList(sink).size();
                if (size2 != 1) {
                    throw new ISE("There should be only one hydrant for identifier[%s] but there are[%s]", segmentIdWithShardSpec, Integer.valueOf(size2));
                }
                i2++;
                i += sink.getNumRowsInMemory();
                j += sink.getBytesInMemory();
                if (!sink.swappable()) {
                    throw new ISE("Sink is not swappable![%s]", segmentIdWithShardSpec);
                }
                arrayList.add(Pair.of(sink.swap(), segmentIdWithShardSpec));
            }
            if (arrayList.isEmpty()) {
                log.info("No indexes will be persisted", new Object[0]);
            }
            Stopwatch createStarted2 = Stopwatch.createStarted();
            try {
                try {
                    for (Pair pair : arrayList) {
                        this.metrics.incrementRowOutputCount(persistHydrant((FireHydrant) pair.lhs, (SegmentIdWithShardSpec) pair.rhs));
                    }
                    log.info("Persisted in-memory data for segments: %s", arrayList.stream().filter(pair2 -> {
                        return pair2.rhs != 0;
                    }).map(pair3 -> {
                        return ((SegmentIdWithShardSpec) pair3.rhs).asSegmentId().toString();
                    }).distinct().collect(Collectors.joining(", ")));
                    log.info("Persisted stats: processed rows: [%d], persisted rows[%d], persisted sinks: [%d], persisted fireHydrants (across sinks): [%d]", Long.valueOf(this.rowIngestionMeters.getProcessed()), Integer.valueOf(i), Long.valueOf(size), Integer.valueOf(i2));
                    this.metrics.incrementNumPersists();
                    long elapsed = createStarted2.elapsed(TimeUnit.MILLISECONDS);
                    this.metrics.incrementPersistTimeMillis(elapsed);
                    createStarted2.stop();
                    log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory in[%d] millis", Integer.valueOf(i), Long.valueOf(j), Long.valueOf(elapsed));
                    log.info("Persist is done.", new Object[0]);
                    return null;
                } catch (Exception e) {
                    this.metrics.incrementFailedPersists();
                    throw e;
                }
            } catch (Throwable th) {
                this.metrics.incrementNumPersists();
                long elapsed2 = createStarted2.elapsed(TimeUnit.MILLISECONDS);
                this.metrics.incrementPersistTimeMillis(elapsed2);
                createStarted2.stop();
                log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory in[%d] millis", Integer.valueOf(i), Long.valueOf(j), Long.valueOf(elapsed2));
                log.info("Persist is done.", new Object[0]);
                throw th;
            }
        });
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(elapsed);
        if (elapsed > 1000) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", Long.valueOf(elapsed));
        }
        createStarted.stop();
        return submit;
    }

    Map<SegmentIdWithShardSpec, Sink> swapSinks() {
        ImmutableMap copyOf = ImmutableMap.copyOf((Map) this.sinks);
        this.sinks.clear();
        resetSinkMetadata();
        return copyOf;
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public ListenableFuture<SegmentsAndCommitMetadata> push(Collection<SegmentIdWithShardSpec> collection, @Nullable Committer committer, boolean z) {
        if (committer != null) {
            throw new ISE("There should be no committer for batch ingestion", new Object[0]);
        }
        if (z) {
            throw new ISE("Batch ingestion does not require uniquePath", new Object[0]);
        }
        ArrayList arrayList = new ArrayList();
        return Futures.transform(persistAll(null), obj -> {
            log.info("Push started, processsing[%d] sinks", Integer.valueOf(collection.size()));
            int i = 0;
            Iterator it2 = collection.iterator();
            while (it2.hasNext()) {
                SegmentIdWithShardSpec segmentIdWithShardSpec = (SegmentIdWithShardSpec) it2.next();
                SinkMetadata sinkMetadata = this.sinksMetadata.get(segmentIdWithShardSpec);
                if (sinkMetadata == null) {
                    throw new ISE("No sink has been processed for identifier[%s]", segmentIdWithShardSpec);
                }
                File persistedFileDir = sinkMetadata.getPersistedFileDir();
                if (persistedFileDir == null) {
                    throw new ISE("Persisted directory for identifier[%s] is null in sink metadata", segmentIdWithShardSpec);
                }
                i += sinkMetadata.getNumHydrants();
                try {
                    DataSegment mergeAndPush = mergeAndPush(segmentIdWithShardSpec, getSinkForIdentifierPath(segmentIdWithShardSpec, persistedFileDir));
                    if (mergeAndPush != null) {
                        arrayList.add(mergeAndPush);
                    } else {
                        log.warn("mergeAndPush[%s] returned null, skipping.", segmentIdWithShardSpec);
                    }
                } catch (IOException e) {
                    throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", segmentIdWithShardSpec);
                }
            }
            log.info("Push done: total sinks merged[%d], total hydrants merged[%d]", Integer.valueOf(collection.size()), Integer.valueOf(i));
            return new SegmentsAndCommitMetadata(arrayList, obj);
        }, this.pushExecutor);
    }

    @Nullable
    private DataSegment mergeAndPush(SegmentIdWithShardSpec segmentIdWithShardSpec, Sink sink) {
        RuntimeException rethrow;
        File file = new File(computePersistDir(segmentIdWithShardSpec), "merged");
        File computeDescriptorFile = computeDescriptorFile(segmentIdWithShardSpec);
        if (sink.isWritable()) {
            throw new ISE("Expected sink to be no longer writable before mergeAndPush for segment[%s].", segmentIdWithShardSpec);
        }
        int i = 0;
        Iterator<FireHydrant> it2 = sink.iterator();
        while (it2.hasNext()) {
            if (!it2.next().hasSwapped()) {
                throw new ISE("Expected sink to be fully persisted before mergeAndPush for segment[%s].", segmentIdWithShardSpec);
            }
            i++;
        }
        SinkMetadata sinkMetadata = this.sinksMetadata.get(segmentIdWithShardSpec);
        if (sinkMetadata == null) {
            log.warn("Sink metadata not found just before merge for identifier [%s]", segmentIdWithShardSpec);
        } else if (i != sinkMetadata.getNumHydrants()) {
            throw new ISE("Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]", Integer.valueOf(i), segmentIdWithShardSpec, Integer.valueOf(sinkMetadata.getNumHydrants()));
        }
        try {
            if (computeDescriptorFile.exists()) {
                log.info("Segment[%s] already pushed, skipping.", segmentIdWithShardSpec);
                return (DataSegment) this.objectMapper.readValue(computeDescriptorFile, DataSegment.class);
            }
            removeDirectory(file);
            if (file.exists()) {
                throw new ISE("Merged target[%s] exists after removing?!", file);
            }
            long nanoTime = System.nanoTime();
            ArrayList arrayList = new ArrayList();
            Closer create = Closer.create();
            try {
                try {
                    Iterator<FireHydrant> it3 = sink.iterator();
                    while (it3.hasNext()) {
                        FireHydrant next = it3.next();
                        Pair<ReferenceCountingSegment, Closeable> andIncrementSegment = next.getAndIncrementSegment();
                        QueryableIndex asQueryableIndex = andIncrementSegment.lhs.asQueryableIndex();
                        log.debug("Segment[%s] adding hydrant[%s]", segmentIdWithShardSpec, next);
                        arrayList.add(asQueryableIndex);
                        create.register(andIncrementSegment.rhs);
                    }
                    File mergeQueryableIndex = this.indexMerger.mergeQueryableIndex(arrayList, this.schema.getGranularitySpec().isRollup(), this.schema.getAggregators(), this.schema.getDimensionsSpec(), file, this.tuningConfig.getIndexSpec(), this.tuningConfig.getSegmentWriteOutMediumFactory(), this.tuningConfig.getMaxColumnsToMerge());
                    long nanoTime2 = System.nanoTime();
                    log.debug("Segment[%s] built in %,dms.", segmentIdWithShardSpec, Long.valueOf((nanoTime2 - nanoTime) / 1000000));
                    create.close();
                    DataSegment dataSegment = (DataSegment) RetryUtils.retry(() -> {
                        return this.dataSegmentPusher.push(mergeQueryableIndex, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(arrayList, this.schema.getDimensionsSpec())), false);
                    }, th -> {
                        return th instanceof Exception;
                    }, 5);
                    Iterator<FireHydrant> it4 = sink.iterator();
                    while (it4.hasNext()) {
                        it4.next().swapSegment(null);
                    }
                    removeDirectory(computePersistDir(segmentIdWithShardSpec));
                    log.info("Segment[%s] of %,d bytes built from %d incremental persist(s) in %,dms; pushed to deep storage in %,dms. Load spec is: %s", segmentIdWithShardSpec, Long.valueOf(dataSegment.getSize()), Integer.valueOf(arrayList.size()), Long.valueOf((nanoTime2 - nanoTime) / 1000000), Long.valueOf((System.nanoTime() - nanoTime2) / 1000000), this.objectMapper.writeValueAsString(dataSegment.getLoadSpec()));
                    return dataSegment;
                } finally {
                }
            } catch (Throwable th2) {
                create.close();
                throw th2;
            }
        } catch (Exception e) {
            this.metrics.incrementFailedHandoffs();
            log.warn(e, "Failed to push merged index for segment[%s].", segmentIdWithShardSpec);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping close() call.", new Object[0]);
            return;
        }
        log.debug("Shutting down...", new Object[0]);
        try {
            log.debug("Shutdown & wait for persistExecutor", new Object[0]);
            if (this.persistExecutor != null) {
                this.persistExecutor.shutdown();
                if (!this.persistExecutor.awaitTermination(365L, TimeUnit.DAYS)) {
                    log.warn("persistExecutor not terminated", new Object[0]);
                }
                this.persistExecutor = null;
            }
            log.debug("Shutdown & wait for pushExecutor", new Object[0]);
            if (this.pushExecutor != null) {
                this.pushExecutor.shutdown();
                if (!this.pushExecutor.awaitTermination(365L, TimeUnit.DAYS)) {
                    log.warn("pushExecutor not terminated", new Object[0]);
                }
                this.pushExecutor = null;
            }
            log.debug("Waited for and shutdown executors...", new Object[0]);
            clear(this.sinks, false);
            unlockBasePersistDirectory();
            List<File> persistedidentifierPaths = getPersistedidentifierPaths();
            if (persistedidentifierPaths != null) {
                Iterator<File> it2 = persistedidentifierPaths.iterator();
                while (it2.hasNext()) {
                    removeDirectory(it2.next());
                }
            }
            this.totalRows = 0;
            this.sinksMetadata.clear();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ISE("Failed to wait & shutdown executors during close()", new Object[0]);
        }
    }

    @Override // org.apache.druid.segment.realtime.appenderator.Appenderator
    public void closeNow() {
        if (!this.closed.compareAndSet(false, true)) {
            log.debug("Appenderator already closed, skipping closeNow() call.", new Object[0]);
        } else {
            log.debug("Shutting down immediately...", new Object[0]);
            shutdownExecutors();
        }
    }

    private void lockBasePersistDirectory() {
        if (this.basePersistDirLock == null) {
            try {
                this.basePersistDirLockChannel = FileChannel.open(computeLockFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                this.basePersistDirLock = this.basePersistDirLockChannel.tryLock();
                if (this.basePersistDirLock == null) {
                    throw new ISE("Cannot acquire lock on basePersistDir: %s", computeLockFile());
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void unlockBasePersistDirectory() {
        try {
            if (this.basePersistDirLock != null) {
                this.basePersistDirLock.release();
                this.basePersistDirLockChannel.close();
                this.basePersistDirLock = null;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    @Nullable
    public List<File> getPersistedidentifierPaths() {
        File[] listFiles;
        ArrayList arrayList = new ArrayList();
        File basePersistDirectory = this.tuningConfig.getBasePersistDirectory();
        if (!basePersistDirectory.exists() || (listFiles = basePersistDirectory.listFiles()) == null) {
            return null;
        }
        for (File file : listFiles) {
            if (new File(file, IDENTIFIER_FILE_NAME).isFile()) {
                arrayList.add(file);
            }
        }
        return arrayList;
    }

    private Sink getSinkForIdentifierPath(SegmentIdWithShardSpec segmentIdWithShardSpec, File file) throws IOException {
        File[] listFiles = file.listFiles((file2, str) -> {
            return Ints.tryParse(str) != null;
        });
        if (listFiles == null) {
            throw new ISE("Problem reading persisted sinks in path[%s]", file);
        }
        Arrays.sort(listFiles, (file3, file4) -> {
            return Ints.compare(Integer.parseInt(file3.getName()), Integer.parseInt(file4.getName()));
        });
        ArrayList arrayList = new ArrayList();
        for (File file5 : listFiles) {
            int parseInt = Integer.parseInt(file5.getName());
            log.debug("Loading previously persisted partial segment at [%s]", file5);
            if (parseInt != arrayList.size()) {
                throw new ISE("Missing hydrant [%,d] in identifier [%s].", Integer.valueOf(arrayList.size()), segmentIdWithShardSpec);
            }
            arrayList.add(new FireHydrant(new QueryableIndexSegment(this.indexIO.loadIndex(file5), segmentIdWithShardSpec.asSegmentId()), parseInt));
        }
        Sink sink = new Sink(segmentIdWithShardSpec.getInterval(), this.schema, segmentIdWithShardSpec.getShardSpec(), segmentIdWithShardSpec.getVersion(), this.tuningConfig.getAppendableIndexSpec(), this.tuningConfig.getMaxRowsInMemory(), this.maxBytesTuningConfig, null, arrayList);
        sink.finishWriting();
        return sink;
    }

    private void resetSinkMetadata() {
        this.rowsCurrentlyInMemory = 0;
        this.bytesCurrentlyInMemory = 0L;
        this.metrics.setSinkCount(0L);
    }

    private void clearSinkMemoryCountersAndDiskStoredData(SegmentIdWithShardSpec segmentIdWithShardSpec, Sink sink, boolean z) {
        if (sink.finishWriting()) {
            this.rowsCurrentlyInMemory -= sink.getNumRowsInMemory();
            this.bytesCurrentlyInMemory -= sink.getBytesInMemory();
            this.bytesCurrentlyInMemory -= calculateSinkMemoryInUsed();
            Iterator<FireHydrant> it2 = sink.iterator();
            while (it2.hasNext()) {
                if (!it2.next().equals(sink.getCurrHydrant())) {
                    this.bytesCurrentlyInMemory -= calculateMemoryUsedByHydrant();
                }
            }
        }
        if (z) {
            removeDirectory(computePersistDir(segmentIdWithShardSpec));
        }
        log.info("Removed sink for segment[%s].", segmentIdWithShardSpec);
    }

    private File computeLockFile() {
        return new File(this.tuningConfig.getBasePersistDirectory(), ".lock");
    }

    private File computePersistDir(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(this.tuningConfig.getBasePersistDirectory(), segmentIdWithShardSpec.toString());
    }

    private File computeIdentifierFile(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(computePersistDir(segmentIdWithShardSpec), IDENTIFIER_FILE_NAME);
    }

    private File computeDescriptorFile(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        return new File(computePersistDir(segmentIdWithShardSpec), "descriptor.json");
    }

    private File createPersistDirIfNeeded(SegmentIdWithShardSpec segmentIdWithShardSpec) throws IOException {
        File computePersistDir = computePersistDir(segmentIdWithShardSpec);
        FileUtils.forceMkdir(computePersistDir);
        this.objectMapper.writeValue(computeIdentifierFile(segmentIdWithShardSpec), segmentIdWithShardSpec);
        return computePersistDir;
    }

    private int persistHydrant(FireHydrant fireHydrant, SegmentIdWithShardSpec segmentIdWithShardSpec) {
        if (fireHydrant.hasSwapped()) {
            throw new ISE("Segment[%s] hydrant[%s] already swapped. This cannot happen.", segmentIdWithShardSpec, fireHydrant);
        }
        log.debug("Segment[%s], persisting Hydrant[%s]", segmentIdWithShardSpec, fireHydrant);
        try {
            long nanoTime = System.nanoTime();
            int size = fireHydrant.getIndex().size();
            SinkMetadata sinkMetadata = this.sinksMetadata.get(segmentIdWithShardSpec);
            if (sinkMetadata == null) {
                throw new ISE("Sink must not be null for identifier when persisting hydrant[%s]", segmentIdWithShardSpec);
            }
            File createPersistDirIfNeeded = createPersistDirIfNeeded(segmentIdWithShardSpec);
            this.indexMerger.persist(fireHydrant.getIndex(), segmentIdWithShardSpec.getInterval(), new File(createPersistDirIfNeeded, String.valueOf(sinkMetadata.getNumHydrants())), this.tuningConfig.getIndexSpecForIntermediatePersists(), this.tuningConfig.getSegmentWriteOutMediumFactory());
            sinkMetadata.setPersistedFileDir(createPersistDirIfNeeded);
            log.info("About to persist in-memory data for segment[%s] spill[%s] to disk in [%,d] ms (%,d rows).", fireHydrant.getSegmentId(), Integer.valueOf(fireHydrant.getCount()), Long.valueOf((System.nanoTime() - nanoTime) / 1000000), Integer.valueOf(size));
            fireHydrant.swapSegment(null);
            sinkMetadata.addHydrants(1);
            return size;
        } catch (IOException e) {
            log.makeAlert("Incremental persist failed", new Object[0]).addData("segment", segmentIdWithShardSpec.toString()).addData("dataSource", this.schema.getDataSource()).addData(RowLock.DIAG_COUNT, Integer.valueOf(fireHydrant.getCount())).emit();
            throw new RuntimeException(e);
        }
    }

    private void removeDirectory(File file) {
        if (file.exists()) {
            try {
                org.apache.druid.java.util.common.FileUtils.deleteDirectory(file);
                log.info("Removed directory [%s]", file);
            } catch (Exception e) {
                log.makeAlert(e, "Failed to remove directory[%s]", this.schema.getDataSource()).addData("file", file).emit();
            }
        }
    }

    private int calculateMemoryUsedByHydrant() {
        return this.skipBytesInMemoryOverheadCheck ? 0 : 1012;
    }

    private int calculateSinkMemoryInUsed() {
        return this.skipBytesInMemoryOverheadCheck ? 0 : 5000;
    }
}
