/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.frontend;

import java.io.Serializable;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.frontend.FrontEndSubscription;
import org.nustaq.kontraktor.services.rlclient.DataClient;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.ChangeStream;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.QueryDoneMessage;
import org.nustaq.reallive.query.QParseException;

public class SubsRegistry {
    AtomicInteger subsIdCount = new AtomicInteger(1);
    HashMap<Integer, FrontEndSubscription> subsMap = new HashMap();
    DataClient dclient;
    boolean terminated = false;
    Function<String, ChangeStream> tableFactory = tableName -> this.dclient.tbl((String)tableName);

    public SubsRegistry(DataClient dclient) {
        this.dclient = dclient;
    }

    public void query(String tableName, String filter, String[] reducedFields, Callback<ChangeMessage> changeReceiver) {
        RealLiveTable table;
        if (filter == null || filter.trim().length() == 0 || filter.trim().equals("true")) {
            filter = "1";
        }
        if ((table = (RealLiveTable)this.tableFactory.apply(tableName)) == null) {
            changeReceiver.complete(null, (Object)"unknown table");
        } else {
            try {
                table.query(filter, (Callback & Serializable)(record, error) -> {
                    if (record != null) {
                        if (reducedFields != null && reducedFields.length > 0) {
                            record = record.reduced(reducedFields);
                        }
                        changeReceiver.pipe((Object)new AddMessage(0, record));
                    } else if (Actors.isError((Object)error)) {
                        changeReceiver.reject(error);
                    } else {
                        changeReceiver.resolve((Object)new QueryDoneMessage());
                    }
                });
            }
            catch (QParseException e) {
                changeReceiver.reject((Object)("ParseException:" + e.getMessage()));
                Log.Error((Object)this, (Throwable)e);
                return;
            }
        }
    }

    public void subscribe(int subsId, String tableName, String filter, String[] reducedFields, Callback<ChangeMessage> changeReceiver) {
        ChangeStream table;
        if (filter == null || filter.trim().length() == 0 || filter.trim().equals("true")) {
            filter = "1";
        }
        if ((table = this.tableFactory.apply(tableName)) == null) {
            changeReceiver.complete(null, (Object)"unknown table");
        } else {
            Subscriber subscriber;
            try {
                subscriber = table.subscribeOn(filter, (ChangeReceiver & Serializable)change -> {
                    if (reducedFields != null && reducedFields.length > 0) {
                        change = change.reduced(reducedFields);
                    }
                    changeReceiver.pipe((Object)change);
                });
            }
            catch (QParseException e) {
                changeReceiver.reject((Object)("ParseException:" + e.getMessage()));
                Log.Error((Object)this, (Throwable)e);
                return;
            }
            this.subsMap.put(subsId, new FrontEndSubscription(subscriber, tableName, changeReceiver));
        }
    }

    public void unsubscribe(int subsId) {
        FrontEndSubscription feSubs = this.subsMap.get(subsId);
        if (feSubs != null) {
            RealLiveTable table = this.dclient.tbl(feSubs.getTableName());
            table.unsubscribe(feSubs.getSubscriber());
            this.subsMap.remove(subsId);
            feSubs.getFrontEndCallback().finish();
        }
    }

    public int getSubsId() {
        return this.subsIdCount.incrementAndGet();
    }

    public void setTableFactory(Function<String, ChangeStream> factory) {
        this.tableFactory = factory;
    }

    public DataClient getDataClient() {
        return this.dclient;
    }

    public boolean isTerminated() {
        return this.terminated;
    }

    public void setTerminated(boolean iAmDead) {
        this.terminated = iAmDead;
    }

    public void unsubscribeAll() {
        this.subsMap.keySet().stream().collect(Collectors.toList()).forEach(k -> this.unsubscribe((int)k));
    }
}

