package org.apache.fluo.core.worker.finder.hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/ScanTask.class */
public class ScanTask implements Runnable {
    private final NotificationFinder finder;
    private final PartitionManager partitionManager;
    private final NotificationProcessor proccessor;
    private final AtomicBoolean stopped;
    private final Environment env;
    private long minSleepTime;
    private long maxSleepTime;
    private static final Logger log = LoggerFactory.getLogger(ScanTask.class);
    private static final Map<String, String> SCAN_EXEC_HINTS = Collections.singletonMap("scan_type", "fluo-ntfy");
    private final Random rand = new Random();
    private final Map<TableRange, TabletData> rangeData = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/ScanTask$ScanCounts.class */
    public static class ScanCounts {
        int seen;
        int added;

        private ScanCounts() {
            this.seen = 0;
            this.added = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScanTask(NotificationFinder notificationFinder, NotificationProcessor notificationProcessor, PartitionManager partitionManager, Environment environment, AtomicBoolean atomicBoolean, long j, long j2) {
        this.finder = notificationFinder;
        this.env = environment;
        this.stopped = atomicBoolean;
        this.proccessor = notificationProcessor;
        this.partitionManager = partitionManager;
        this.minSleepTime = j;
        this.maxSleepTime = j2;
    }

    @Override // java.lang.Runnable
    public void run() {
        ArrayList<TableRange> arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        int size = this.proccessor.size();
        while (!this.stopped.get()) {
            try {
                arrayList.clear();
                hashSet.clear();
                PartitionInfo waitForPartitionInfo = this.partitionManager.waitForPartitionInfo();
                while (this.proccessor.size() > size / 2 && !this.stopped.get()) {
                    UtilWaitThread.sleep(50L, this.stopped);
                }
                waitForPartitionInfo.getMyGroupsRanges().forEach(tableRange -> {
                    arrayList.add(tableRange);
                    hashSet.add(tableRange);
                });
                Collections.shuffle(arrayList, this.rand);
                this.rangeData.keySet().retainAll(hashSet);
                long currentTimeMillis = this.maxSleepTime + System.currentTimeMillis();
                ScanCounts scanCounts = new ScanCounts();
                int i = 0;
                try {
                    for (TableRange tableRange2 : arrayList) {
                        TabletData computeIfAbsent = this.rangeData.computeIfAbsent(tableRange2, tableRange3 -> {
                            return new TabletData();
                        });
                        if (System.currentTimeMillis() >= computeIfAbsent.retryTime) {
                            if (!waitForPartitionInfo.equals(this.partitionManager.getPartitionInfo())) {
                                break;
                            }
                            NotificationProcessor.Session beginAddingNotifications = this.proccessor.beginAddingNotifications(rowColumn -> {
                                return tableRange2.contains(rowColumn.getRow());
                            });
                            try {
                                this.env.getSharedResources().getBatchWriter().waitForAsyncFlush();
                                ScanCounts scan = scan(beginAddingNotifications, waitForPartitionInfo, tableRange2.getRange());
                                i++;
                                if (beginAddingNotifications != null) {
                                    beginAddingNotifications.close();
                                }
                                computeIfAbsent.updateScanCount(scan.added, this.maxSleepTime);
                                scanCounts.added += scan.added;
                                scanCounts.seen += scan.seen;
                                if (this.stopped.get()) {
                                    break;
                                }
                            } catch (Throwable th) {
                                if (beginAddingNotifications != null) {
                                    try {
                                        beginAddingNotifications.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                                break;
                            }
                        }
                        currentTimeMillis = Math.min(computeIfAbsent.retryTime, currentTimeMillis);
                    }
                } catch (PartitionInfoChangedException e) {
                }
                long max = !waitForPartitionInfo.equals(this.partitionManager.getPartitionInfo()) ? this.minSleepTime : Math.max(this.minSleepTime, currentTimeMillis - System.currentTimeMillis());
                size = this.proccessor.size();
                log.debug("Scanned {} of {} tablets. Notifications added: {} seen: {} queued: {}", new Object[]{Integer.valueOf(i), Integer.valueOf(arrayList.size()), Integer.valueOf(scanCounts.added), Integer.valueOf(scanCounts.seen), Integer.valueOf(size)});
                if (!this.stopped.get()) {
                    UtilWaitThread.sleep(max, this.stopped);
                }
            } catch (Exception e2) {
                if (isInterruptedException(e2)) {
                    log.debug("Error while looking for notifications", e2);
                } else {
                    log.error("Error while looking for notifications", e2);
                }
            }
        }
    }

    private boolean isInterruptedException(Exception exc) {
        boolean z = false;
        Throwable th = exc;
        while (true) {
            Throwable th2 = th;
            if (th2 == null) {
                return z;
            }
            if (th2 instanceof InterruptedException) {
                z = true;
            }
            th = th2.getCause();
        }
    }

    private ScanCounts scan(NotificationProcessor.Session session, PartitionInfo partitionInfo, Range range) throws TableNotFoundException {
        Scanner<Map.Entry> createScanner = this.env.getAccumuloClient().createScanner(this.env.getTable(), this.env.getAuthorizations());
        try {
            createScanner.setRange(range);
            Notification.configureScanner(createScanner);
            IteratorSetting iteratorSetting = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
            NotificationHashFilter.setModulusParams(iteratorSetting, partitionInfo.getMyGroupSize(), partitionInfo.getMyIdInGroup());
            createScanner.addScanIterator(iteratorSetting);
            createScanner.setExecutionHints(SCAN_EXEC_HINTS);
            ScanCounts scanCounts = new ScanCounts();
            for (Map.Entry entry : createScanner) {
                if (!partitionInfo.equals(this.partitionManager.getPartitionInfo())) {
                    throw new PartitionInfoChangedException();
                }
                if (this.stopped.get()) {
                    if (createScanner != null) {
                        createScanner.close();
                    }
                    return scanCounts;
                }
                scanCounts.seen++;
                if (session.addNotification(this.finder, Notification.from((Key) entry.getKey()))) {
                    scanCounts.added++;
                }
            }
            if (createScanner != null) {
                createScanner.close();
            }
            return scanCounts;
        } catch (Throwable th) {
            if (createScanner != null) {
                try {
                    createScanner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
