/*
 * Decompiled with CFR 0.152.
 */
package jmind.core.dubbo.callback;

import com.alibaba.dubbo.rpc.RpcException;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import jmind.core.dubbo.callback.Callback;
import jmind.core.dubbo.callback.IBusService;
import jmind.core.dubbo.pojo.BusEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-process event bus: events handed to {@link #notify(BusEvent)} are queued and
 * delivered by a single daemon thread to the callbacks subscribed to the event's topic.
 *
 * <p>Thread-safety: the listener registry is a plain {@code HashMultimap}, so every
 * access to it is guarded by {@code synchronized (listeners)}. Callbacks are invoked
 * outside that lock so a slow or re-entrant callback cannot block {@link #subscribe}.
 *
 * <p>Events whose topic has no subscriber at dispatch time are put back on the queue and
 * retried later; they are never dropped by this class.
 */
public class BusServiceImpl
implements IBusService {
    /** Pause applied when every queued event is currently undeliverable, to avoid a busy spin. */
    private static final long RETRY_DELAY_MILLIS = 100L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final BlockingQueue<BusEvent> bq = new LinkedBlockingQueue<BusEvent>();
    /** topic -> callbacks; guarded by {@code synchronized (listeners)}. */
    private final Multimap<String, Callback> listeners = HashMultimap.create();

    /**
     * Creates the bus and starts its daemon dispatcher thread.
     */
    public BusServiceImpl() {
        this.logger.info("init BusServiceImpl");
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                BusServiceImpl.this.dispatchLoop();
            }
        }, "BusServiceImpl-dispatcher");
        t.setDaemon(true);
        t.start();
    }

    /**
     * Queues {@code event} for asynchronous delivery.
     *
     * @param event the event to publish; the backing queue rejects {@code null}
     */
    @Override
    public void notify(BusEvent event) {
        this.logger.debug("event={}", event);
        this.bq.offer(event);
    }

    /**
     * Registers {@code callback} for {@code topic}.
     *
     * @param topic    topic to listen on
     * @param all      when {@code true} the callback is always added; when {@code false}
     *                 it is added only if the topic has no callback registered yet
     * @param callback the callback to invoke for events on {@code topic}
     */
    @Override
    public void subscribe(String topic, boolean all, Callback callback) {
        this.logger.debug("{}===={}", topic, all);
        // The containsKey/put pair must be atomic with respect to other subscribers
        // and to the dispatcher removing failed callbacks.
        synchronized (this.listeners) {
            if (all || !this.listeners.containsKey(topic)) {
                this.listeners.put(topic, callback);
            }
        }
    }

    /**
     * Body of the dispatcher thread: takes events off the queue and delivers them until
     * the thread is interrupted.
     */
    private void dispatchLoop() {
        // Number of consecutive events that had to be re-queued for lack of subscribers.
        int undelivered = 0;
        try {
            while (true) {
                BusEvent event = this.bq.take();
                if (this.dispatch(event)) {
                    undelivered = 0;
                    continue;
                }
                // Nobody is subscribed to this topic yet: keep the event for a later retry.
                this.bq.add(event);
                // Once a full pass over the queue delivered nothing, wait before retrying
                // instead of spinning on the same undeliverable events.
                if (++undelivered >= this.bq.size()) {
                    undelivered = 0;
                    Thread.sleep(RETRY_DELAY_MILLIS);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status and let the dispatcher thread end.
            Thread.currentThread().interrupt();
            this.logger.error("bus dispatcher interrupted, stopping", (Throwable)e);
        }
    }

    /**
     * Delivers {@code event} to every callback currently subscribed to its topic.
     *
     * <p>A callback that throws {@link RpcException} is unsubscribed from the topic; any
     * other runtime failure is logged and the callback stays registered, so one faulty
     * callback cannot terminate the dispatcher thread.
     *
     * @param event the event to deliver
     * @return {@code true} if the topic had at least one subscriber when delivery started
     *         (even if every call failed), {@code false} if there was none
     */
    private boolean dispatch(BusEvent event) {
        String topic = event.getTopic();
        Callback[] targets;
        // Snapshot under the lock, then call out without holding it.
        synchronized (this.listeners) {
            targets = this.listeners.get(topic).toArray(new Callback[0]);
        }
        if (targets.length == 0) {
            return false;
        }
        for (Callback callback : targets) {
            try {
                callback.doIt(event);
            }
            catch (RpcException e) {
                this.logger.error("doit event=", (Throwable)e);
                synchronized (this.listeners) {
                    this.listeners.remove(topic, callback);
                }
            }
            catch (RuntimeException e) {
                this.logger.error("doit event=", (Throwable)e);
            }
        }
        return true;
    }
}

