package org.janusgraph.diskstorage.keycolumnvalue.scan;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScannerExecutor;
import org.janusgraph.diskstorage.util.EntryArrayList;
import org.janusgraph.diskstorage.util.RecordIterator;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/janusgraph-core-0.6.0.jar:org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.class */
class MultiThreadsRowsCollector extends RowsCollector {
    // Presumably the key-length argument for KCVSUtil.getKeys: the literal 128 passed there
    // matches this value - TODO confirm against the un-decompiled source.
    private static final int MAX_KEY_LENGTH = 128;
    // Assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    private final StoreFeatures storeFeatures;
    private final StoreTransaction storeTx;
    // One SliceQuery per puller; index i here corresponds to pullThreads[i] and dataQueues[i].
    private final List<SliceQuery> queries;
    private final Predicate<StaticBuffer> keyFilter;
    private final Configuration graphConfiguration;
    private final DataPuller[] pullThreads;
    private final BlockingQueue<SliceResult>[] dataQueues;
    // Set by interrupt() and polled by the run() loop. run() blocks its calling thread, so the
    // flag can only usefully be raised from a different thread; volatile guarantees that the
    // write becomes visible to the loop.
    private volatile boolean interrupted;
    // Decompiled form of the compiler-generated assertion switch; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/janusgraph-core-0.6.0.jar:org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector$DataPuller.class */
    public static class DataPuller extends Thread {
        private final BlockingQueue<SliceResult> queue;
        private final KeyIterator keyIterator;
        private final SliceQuery query;
        private final Predicate<StaticBuffer> keyFilter;
        private volatile boolean finished;

        private DataPuller(SliceQuery sliceQuery, BlockingQueue<SliceResult> blockingQueue, KeyIterator keyIterator, Predicate<StaticBuffer> predicate) {
            this.query = sliceQuery;
            this.queue = blockingQueue;
            this.keyIterator = keyIterator;
            this.keyFilter = predicate;
            this.finished = false;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.keyIterator.hasNext()) {
                try {
                    try {
                        StaticBuffer staticBuffer = (StaticBuffer) this.keyIterator.next();
                        RecordIterator<Entry> entries = this.keyIterator.getEntries();
                        if (this.keyFilter.test(staticBuffer)) {
                            this.queue.put(new SliceResult(this.query, staticBuffer, EntryArrayList.of((Iterator<? extends Entry>) entries)));
                        }
                    } catch (Throwable th) {
                        try {
                            this.keyIterator.close();
                        } catch (IOException e) {
                            MultiThreadsRowsCollector.log.warn("Could not close storage iterator ", (Throwable) e);
                        }
                        this.finished = true;
                        throw th;
                    }
                } catch (InterruptedException e2) {
                    MultiThreadsRowsCollector.log.error("Data-pulling thread interrupted while waiting on queue or data", (Throwable) e2);
                    try {
                        this.keyIterator.close();
                    } catch (IOException e3) {
                        MultiThreadsRowsCollector.log.warn("Could not close storage iterator ", (Throwable) e3);
                    }
                    this.finished = true;
                    return;
                } catch (Throwable th2) {
                    MultiThreadsRowsCollector.log.error("Could not load data from storage", th2);
                    try {
                        this.keyIterator.close();
                    } catch (IOException e4) {
                        MultiThreadsRowsCollector.log.warn("Could not close storage iterator ", (Throwable) e4);
                    }
                    this.finished = true;
                    return;
                }
            }
            try {
                this.keyIterator.close();
            } catch (IOException e5) {
                MultiThreadsRowsCollector.log.warn("Could not close storage iterator ", (Throwable) e5);
            }
            this.finished = true;
        }

        public boolean isFinished() {
            return this.finished;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/janusgraph-core-0.6.0.jar:org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector$SliceResult.class */
    /**
     * Immutable holder for what one puller read for one key: the query that produced it,
     * the key itself, and the entries returned for that key.
     */
    public static class SliceResult {
        final SliceQuery query;
        final StaticBuffer key;
        final EntryList entries;

        private SliceResult(SliceQuery sourceQuery, StaticBuffer rowKey, EntryList rowEntries) {
            query = sourceQuery;
            key = rowKey;
            entries = rowEntries;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiThreadsRowsCollector(KeyColumnValueStore keyColumnValueStore, StoreFeatures storeFeatures, StoreTransaction storeTransaction, List<SliceQuery> list, Predicate<StaticBuffer> predicate, BlockingQueue<StandardScannerExecutor.Row> blockingQueue, Configuration configuration) throws BackendException {
        super(keyColumnValueStore, blockingQueue);
        this.interrupted = false;
        this.storeFeatures = storeFeatures;
        this.storeTx = storeTransaction;
        this.queries = list;
        this.keyFilter = predicate;
        this.graphConfiguration = configuration;
        this.dataQueues = new BlockingQueue[list.size()];
        this.pullThreads = new DataPuller[list.size()];
        setUp(list);
    }

    /**
     * Starts one data puller per query, using the query's position in {@code list} as its slot.
     *
     * <p>If creating a puller fails, the pullers already started for earlier queries are
     * stopped before the failure is rethrown. This method runs from the constructor, so on
     * failure the caller never receives an instance it could call {@link #cleanup()} on and
     * those threads would otherwise keep running with open key iterators.
     *
     * @param list the slice queries to start pullers for
     * @throws BackendException if a puller could not be set up
     */
    private void setUp(List<SliceQuery> list) throws BackendException {
        int started = 0;
        try {
            for (SliceQuery query : list) {
                addDataPuller(query, this.storeTx, started);
                started++;
            }
        } catch (BackendException | RuntimeException e) {
            // Only slots below "started" hold a thread that was actually started.
            for (int slot = 0; slot < started; slot++) {
                DataPuller puller = this.pullThreads[slot];
                if (puller == null || !puller.isAlive()) {
                    continue;
                }
                // Same policy as cleanup(): interrupt only when the store tolerates it.
                if (this.storeFeatures.supportsInterruption()) {
                    puller.interrupt();
                } else {
                    log.warn("Store does not support interruption, so data pulling thread [{}] cannot be interrupted", Integer.valueOf(slot));
                }
            }
            throw e;
        }
    }

    /**
     * Main collection loop: refills the pending result of every query, then emits one row
     * keyed by the first query's pending result. Stops when the interrupt flag is set or
     * when the first query has no pending result left.
     */
    @Override
    void run() throws InterruptedException, TemporaryBackendException {
        final int queryCount = this.queries.size();
        // pending[n] is the not-yet-consumed result of query n, or null if none is held.
        final SliceResult[] pending = new SliceResult[queryCount];
        while (true) {
            if (this.interrupted) {
                break;
            }
            collectDataFromPullers(pending, queryCount);
            final SliceResult primary = pending[0];
            if (primary == null) {
                // The first query drives the scan; nothing pending for it means we are done.
                break;
            }
            final StaticBuffer rowKey = primary.key;
            final StandardScannerExecutor.Row row = buildRow(queryCount, pending, rowKey);
            this.rowQueue.put(row);
        }
    }

    /**
     * Fills every empty slot of {@code sliceResultArr} with the next result of the
     * corresponding puller, waiting in 10 ms steps until either a result arrives or the
     * puller reports that it has finished. A slot stays {@code null} only when its puller
     * has finished and its queue is empty.
     *
     * @param sliceResultArr pending results, one slot per query; non-null slots are left untouched
     * @param i              number of slots to process
     * @throws InterruptedException      if interrupted while waiting on a queue
     * @throws TemporaryBackendException declared for signature compatibility; no longer thrown here
     */
    private void collectDataFromPullers(SliceResult[] sliceResultArr, int i) throws InterruptedException, TemporaryBackendException {
        for (int slot = 0; slot < i; slot++) {
            if (sliceResultArr[slot] != null) {
                continue; // previous result for this query has not been consumed yet
            }
            BlockingQueue<SliceResult> queue = this.dataQueues[slot];
            DataPuller puller = this.pullThreads[slot];

            SliceResult next = queue.poll(10L, TimeUnit.MILLISECONDS);
            while (next == null && !puller.isFinished()) {
                next = queue.poll(10L, TimeUnit.MILLISECONDS);
            }
            if (next == null) {
                // The puller is finished, but it may have enqueued its last result after the
                // timed poll above gave up and before the finished flag was read. The flag is
                // volatile and set after the final put, so one more non-blocking poll is
                // guaranteed to see that result instead of silently dropping it.
                next = queue.poll();
            }
            sliceResultArr[slot] = next;
        }
    }

    /**
     * Builds the row for {@code staticBuffer} from the pending per-query results.
     *
     * <p>For each query, a pending result whose key equals {@code staticBuffer} contributes
     * its entries and is consumed (its slot is reset to {@code null}); any other query is
     * mapped to {@link EntryList#EMPTY_LIST} and its slot is left as is.
     *
     * @param i              initial capacity for the result map
     * @param sliceResultArr pending results, indexed like {@code queries}
     * @param staticBuffer   key of the row being built
     * @return the row holding one entry list per query
     */
    private StandardScannerExecutor.Row buildRow(int i, SliceResult[] sliceResultArr, StaticBuffer staticBuffer) {
        HashMap<SliceQuery, EntryList> entriesByQuery = new HashMap<>(i);
        for (int slot = 0; slot < sliceResultArr.length; slot++) {
            SliceQuery query = this.queries.get(slot);
            SliceResult pending = sliceResultArr[slot];
            // Typed as EntryList (not EntryList.EmptyList) so real entries can be assigned below.
            EntryList entries = EntryList.EMPTY_LIST;
            if (pending != null && pending.key.equals(staticBuffer)) {
                // Decompiled assertion: a slot must hold a result of the query at the same index.
                if (!$assertionsDisabled && !query.equals(pending.query)) {
                    throw new AssertionError();
                }
                entries = pending.entries;
                sliceResultArr[slot] = null;
            }
            entriesByQuery.put(query, entries);
        }
        return new StandardScannerExecutor.Row(staticBuffer, entriesByQuery);
    }

    /**
     * Gives every puller thread up to 10 ms to terminate. A thread that is still alive
     * afterwards is interrupted if the store supports interruption; otherwise it is only
     * marked as finished.
     */
    @Override
    void join() throws InterruptedException {
        for (int index = 0; index < this.pullThreads.length; index++) {
            final DataPuller puller = this.pullThreads[index];
            puller.join(10L);
            if (!puller.isAlive()) {
                continue;
            }
            log.warn("Data pulling thread [{}] did not terminate. Forcing termination", Integer.valueOf(index));
            if (!this.storeFeatures.supportsInterruption()) {
                log.warn("Store does not support interruption, so data pulling thread [{}] cannot be interrupted", Integer.valueOf(index));
                // Flag it as finished so consumers stop waiting for it.
                puller.finished = true;
            } else {
                puller.interrupt();
            }
        }
    }

    /**
     * Requests that the {@code run()} loop stop; the flag is checked at the start of each iteration.
     */
    @Override
    void interrupt() {
        interrupted = true;
    }

    /**
     * Stops every puller thread that is still alive: interrupts it when the store supports
     * interruption, otherwise logs a warning and marks the puller as finished.
     */
    @Override
    void cleanup() {
        if (this.pullThreads == null) {
            return;
        }
        for (int index = 0; index < this.pullThreads.length; index++) {
            final DataPuller puller = this.pullThreads[index];
            if (!puller.isAlive()) {
                continue;
            }
            if (!this.storeFeatures.supportsInterruption()) {
                log.warn("Store does not support interruption, so data pulling thread cannot be interrupted");
                puller.finished = true;
            } else {
                puller.interrupt();
            }
        }
    }

    /**
     * Creates the bounded queue and the puller thread for one query, stores both in slot
     * {@code i}, and starts the thread under the name {@code data-puller-<i>}.
     *
     * @param sliceQuery       query whose keys the puller iterates
     * @param storeTransaction transaction used to open the key iterator
     * @param i                slot index in {@code dataQueues} and {@code pullThreads}
     * @throws BackendException if the key iterator cannot be opened
     */
    private void addDataPuller(SliceQuery sliceQuery, StoreTransaction storeTransaction, int i) throws BackendException {
        // The queue capacity is taken from the PAGE_SIZE configuration option.
        final int queueCapacity = this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE);
        final BlockingQueue<SliceResult> queue = new LinkedBlockingQueue<>(queueCapacity);
        this.dataQueues[i] = queue;

        final KeyIterator keys = KCVSUtil.getKeys(this.store, sliceQuery, this.storeFeatures, MAX_KEY_LENGTH, storeTransaction);
        final DataPuller puller = new DataPuller(sliceQuery, queue, keys, this.keyFilter);
        this.pullThreads[i] = puller;

        puller.setName("data-puller-" + i);
        puller.start();
    }

    // Initializes the two static finals declared without an initializer at the top of the class.
    static {
        log = LoggerFactory.getLogger(MultiThreadsRowsCollector.class);
        // Assertions are "disabled" unless the JVM enables them for this class.
        $assertionsDisabled = !MultiThreadsRowsCollector.class.desiredAssertionStatus();
    }
}
