package org.apache.fluo.core.worker.finder.hash;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.utils.ZKPaths;
import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.apache.fluo.core.worker.TxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.class */
public class HashNotificationFinder implements NotificationFinder {
    private NotificationProcessor notificationProcessor;
    private CuratorFramework curator;
    private ModulusParams modParams;
    private Environment env;
    private Thread scanThread;
    private PathChildrenCache childrenCache;
    private PersistentEphemeralNode myESNode;
    private static final Logger log = LoggerFactory.getLogger(HashNotificationFinder.class);
    private List<String> finders = Collections.emptyList();
    private int updates = 0;
    private AtomicBoolean stopped = new AtomicBoolean(false);

    /* renamed from: org.apache.fluo.core.worker.finder.hash.HashNotificationFinder$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/HashNotificationFinder$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/HashNotificationFinder$FindersListener.class */
    private class FindersListener implements PathChildrenCacheListener {
        private FindersListener() {
        }

        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case 1:
                case 2:
                    if (HashNotificationFinder.this.stopped.get()) {
                        return;
                    }
                    HashNotificationFinder.this.updateFinders();
                    return;
                case 3:
                    HashNotificationFinder.log.warn("unexpected event " + pathChildrenCacheEvent);
                    return;
                default:
                    return;
            }
        }

        /* synthetic */ FindersListener(HashNotificationFinder hashNotificationFinder, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/fluo/core/worker/finder/hash/HashNotificationFinder$ModParamsChangedException.class */
    static class ModParamsChangedException extends RuntimeException {
        private static final long serialVersionUID = 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void updateFinders() {
        String str;
        String actualPath = this.myESNode.getActualPath();
        while (true) {
            str = actualPath;
            if (str != null) {
                break;
            }
            UtilWaitThread.sleep(100L);
            actualPath = this.myESNode.getActualPath();
        }
        String nodeFromPath = ZKPaths.getNodeFromPath(str);
        ArrayList arrayList = new ArrayList();
        Iterator it = this.childrenCache.getCurrentData().iterator();
        while (it.hasNext()) {
            arrayList.add(ZKPaths.getNodeFromPath(((ChildData) it.next()).getPath()));
        }
        Collections.sort(arrayList);
        if (this.finders.equals(arrayList)) {
            return;
        }
        if (arrayList.indexOf(nodeFromPath) == -1) {
            this.modParams = null;
            this.finders = Collections.emptyList();
            log.debug("Did not find self in list of finders " + nodeFromPath);
        } else {
            this.updates++;
            this.modParams = new ModulusParams(arrayList.indexOf(nodeFromPath), arrayList.size(), this.updates);
            this.finders = arrayList;
            log.debug("updated modulus params " + this.modParams.remainder + " " + this.modParams.divisor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized ModulusParams getModulusParams() {
        return this.modParams;
    }

    @Override // org.apache.fluo.core.worker.NotificationFinder
    public void init(Environment environment, NotificationProcessor notificationProcessor) {
        Preconditions.checkState(this.notificationProcessor == null);
        this.notificationProcessor = notificationProcessor;
        this.env = environment;
        this.curator = environment.getSharedResources().getCurator();
        try {
            this.myESNode = new PersistentEphemeralNode(this.curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, "/finders/f-", new byte[0]);
            this.myESNode.start();
            this.myESNode.waitForInitialCreate(1L, TimeUnit.MINUTES);
            this.childrenCache = new PathChildrenCache(environment.getSharedResources().getCurator(), "/finders", false);
            this.childrenCache.getListenable().addListener(new FindersListener(this, null));
            this.childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            updateFinders();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.fluo.core.worker.NotificationFinder
    public void start() {
        this.scanThread = new Thread(new ScanTask(this, this.env, this.stopped));
        this.scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
        this.scanThread.setDaemon(true);
        this.scanThread.start();
    }

    @Override // org.apache.fluo.core.worker.NotificationFinder
    public void stop() {
        this.stopped.set(true);
        try {
            this.childrenCache.close();
        } catch (IOException e) {
            log.warn("Failed to close children cache", e);
        }
        try {
            this.myESNode.close();
        } catch (IOException e2) {
            log.warn("Failed to close ephemeral node", e2);
        }
        this.scanThread.interrupt();
        try {
            this.scanThread.join();
        } catch (InterruptedException e3) {
            throw new RuntimeException(e3);
        }
    }

    @Override // org.apache.fluo.core.worker.NotificationFinder
    public void failedToProcess(Notification notification, TxResult txResult) {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NotificationProcessor getWorkerQueue() {
        return this.notificationProcessor;
    }

    @VisibleForTesting
    static boolean shouldProcess(Notification notification, int i, int i2) {
        return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()), new ArrayByteSequence(NotificationUtil.encodeCol(notification.getColumn())), i, i2);
    }

    @Override // org.apache.fluo.core.worker.NotificationFinder
    public boolean shouldProcess(Notification notification) {
        ModulusParams modulusParams = getModulusParams();
        return shouldProcess(notification, modulusParams.divisor, modulusParams.remainder);
    }
}
