package rapture.kernel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import rapture.batch.kernel.handler.HandlerConstants;
import rapture.common.CallingContext;
import rapture.common.CategoryQueueBindings;
import rapture.common.CategoryQueueBindingsStorage;
import rapture.common.ExchangeDomain;
import rapture.common.ExchangeDomainStorage;
import rapture.common.PipelineTaskStatus;
import rapture.common.RapturePipelineTask;
import rapture.common.RaptureURI;
import rapture.common.Scheme;
import rapture.common.ServerCategory;
import rapture.common.ServerCategoryStorage;
import rapture.common.TableQuery;
import rapture.common.api.PipelineApi;
import rapture.common.exception.RaptureException;
import rapture.common.exception.RaptureExceptionFactory;
import rapture.common.impl.jackson.JsonContent;
import rapture.common.model.RaptureExchange;
import rapture.common.model.RaptureExchangeQueue;
import rapture.common.model.RaptureExchangeStorage;
import rapture.common.model.RaptureExchangeType;
import rapture.exchange.ExchangeFactory;
import rapture.exchange.ExchangeHandler;
import rapture.exchange.QueueHandler;
import rapture.exchange.TopicMessageHandler;
import rapture.kernel.internalnotification.ExchangeChangeManager;
import rapture.kernel.pipeline.ExchangeConfigFactory;
import rapture.kernel.pipeline.PipelineIndexHelper;
import rapture.kernel.pipeline.PipelineTaskStatusManager;
import rapture.notification.NotificationMessage;
import rapture.notification.RaptureMessageListener;
import rapture.repo.RepoVisitor;
import rapture.series.children.PathConstants;

/* loaded from: input_file:rapture/kernel/PipelineApiImpl.class */
public class PipelineApiImpl extends KernelBase implements PipelineApi, RaptureMessageListener<NotificationMessage> {
    /** Log message template used by {@link #logError(String)}; {@code %s} receives the document name. */
    private static final String ERROR = "Could not load document %s, continuing anyway";
    /** Class-wide logger; final so it cannot be swapped out after class initialisation. */
    private static final Logger log = Logger.getLogger(PipelineApiImpl.class);
    /** Receives task creation notifications and answers status/query calls; assigned once in the constructor. */
    private final PipelineTaskStatusManager taskStatusManager;
    /** Exchange handlers keyed by domain name, filled on demand by {@code setupDomain}; assigned once in the constructor. */
    private final Map<String, ExchangeHandler> domainHandlers;

    /**
     * Builds the pipeline API implementation for the given kernel.
     * <p>
     * Calls {@link PipelineIndexHelper#ensureIndexExists()} before the task
     * status manager is constructed.
     *
     * @param kernel kernel handed to the {@link KernelBase} constructor
     */
    public PipelineApiImpl(Kernel kernel) {
        super(kernel);
        domainHandlers = new HashMap<String, ExchangeHandler>();

        log.info("Ensure index exists");
        PipelineIndexHelper.ensureIndexExists();

        taskStatusManager = new PipelineTaskStatusManager();
    }

    /**
     * Builds a {@link ServerCategory} from the supplied name and description
     * and hands it to {@link ServerCategoryStorage}.
     *
     * @param callingContext context whose user is recorded with the change
     * @param str            name of the category
     * @param str2           description of the category
     * @return always {@code true}
     */
    public Boolean registerServerCategory(CallingContext callingContext, String str, String str2) {
        final ServerCategory category = new ServerCategory();
        category.setDescription(str2);
        category.setName(str);

        final String user = callingContext.getUser();
        ServerCategoryStorage.add(category, user, "Created server category");
        return Boolean.TRUE;
    }

    /**
     * Collects the names of all exchanges visited in {@link RaptureExchangeStorage}.
     * <p>
     * Entries for which the visitor's boolean flag is set are skipped
     * (presumably folders; confirm against {@link RepoVisitor}). When an entry
     * has no content, the name passed to the visitor is used instead of the
     * name parsed from the document.
     *
     * @param callingContext not used by this method
     * @return the collected exchange names
     */
    public List<String> getExchanges(CallingContext callingContext) {
        final List<String> names = new ArrayList<String>();
        RaptureExchangeStorage.visitAll(new RepoVisitor() {
            public boolean visit(String docName, JsonContent content, boolean skipEntry) {
                if (!skipEntry) {
                    final boolean hasBody = content != null && content.getContent() != null;
                    if (hasBody) {
                        names.add(RaptureExchangeStorage.readFromJson(content).getName());
                    } else {
                        names.add(docName);
                    }
                }
                return true;
            }
        });
        return names;
    }

    /**
     * Deletes the queue bindings stored for a category, then the category itself.
     *
     * @param callingContext context whose user is recorded with both deletions
     * @param str            name of the category to remove
     */
    public void removeServerCategory(CallingContext callingContext, String str) {
        // Bindings are removed first, then the category record.
        CategoryQueueBindingsStorage.deleteByFields(
                str, callingContext.getUser(), "Removed category bindings");
        ServerCategoryStorage.deleteByFields(
                str, callingContext.getUser(), "Removed server category");
    }

    /**
     * Collects the names of all server categories visited in
     * {@link ServerCategoryStorage}. Entries for which the visitor's boolean
     * flag is set are skipped.
     *
     * @param callingContext not used by this method
     * @return the collected category names
     */
    public List<String> getServerCategories(CallingContext callingContext) {
        final List<String> names = new ArrayList<String>();
        ServerCategoryStorage.visitAll(new RepoVisitor() {
            public boolean visit(String docName, JsonContent content, boolean skipEntry) {
                if (!skipEntry) {
                    names.add(ServerCategoryStorage.readFromJson(content).getName());
                }
                return true;
            }
        });
        return names;
    }

    /**
     * Collects the names of all exchange domains visited in
     * {@link ExchangeDomainStorage}. Entries for which the visitor's boolean
     * flag is set are skipped.
     *
     * @param callingContext not used by this method
     * @return the collected domain names
     */
    public List<String> getExchangeDomains(CallingContext callingContext) {
        final List<String> names = new ArrayList<String>();
        ExchangeDomainStorage.visitAll(new RepoVisitor() {
            public boolean visit(String docName, JsonContent content, boolean skipEntry) {
                if (!skipEntry) {
                    names.add(ExchangeDomainStorage.readFromJson(content).getName());
                }
                return true;
            }
        });
        return names;
    }

    /**
     * Reads the queue bindings stored under the given category name.
     *
     * @param str category name
     * @return whatever {@link CategoryQueueBindingsStorage#readByFields} returns;
     *         callers in this class treat {@code null} as "no bindings stored"
     */
    private CategoryQueueBindings getQueueBindings(String str) {
        final CategoryQueueBindings stored = CategoryQueueBindingsStorage.readByFields(str);
        return stored;
    }

    /**
     * Adds a queue to the set bound to an exchange for the given category and
     * writes the bindings back to storage. A fresh bindings record is created
     * when none is stored for the category yet.
     *
     * @param callingContext context whose user is recorded with the change
     * @param str            category name
     * @param str2           key into the bindings map (the exchange)
     * @param str3           value added to that key's set (the queue)
     * @return always {@code true}
     */
    public Boolean bindPipeline(CallingContext callingContext, String str, String str2, String str3) {
        CategoryQueueBindings categoryBindings = getQueueBindings(str);
        if (categoryBindings == null) {
            categoryBindings = new CategoryQueueBindings();
            categoryBindings.setBindings(new HashMap());
            categoryBindings.setName(str);
        }

        final Map byExchange = categoryBindings.getBindings();
        if (!byExchange.containsKey(str2)) {
            byExchange.put(str2, new HashSet());
        }
        final Set queues = (Set) byExchange.get(str2);
        queues.add(str3);

        CategoryQueueBindingsStorage.add(categoryBindings, callingContext.getUser(), "Added binding");
        return Boolean.TRUE;
    }

    /**
     * Removes a queue from the set bound to an exchange for the given category.
     * An exchange key whose set becomes empty is dropped from the map. The
     * bindings are written back to storage even when nothing was removed.
     *
     * @param callingContext context whose user is recorded with the change
     * @param str            category name
     * @param str2           key into the bindings map (the exchange)
     * @param str3           value removed from that key's set (the queue)
     * @return {@code false} when no bindings are stored for the category,
     *         {@code true} otherwise
     */
    public Boolean removeBoundPipeline(CallingContext callingContext, String str, String str2, String str3) {
        final CategoryQueueBindings categoryBindings = getQueueBindings(str);
        if (categoryBindings == null) {
            return Boolean.FALSE;
        }

        final Map byExchange = categoryBindings.getBindings();
        if (byExchange.containsKey(str2)) {
            final Set queues = (Set) byExchange.get(str2);
            queues.remove(str3);
            if (queues.isEmpty()) {
                byExchange.remove(str2);
            }
        }

        CategoryQueueBindingsStorage.add(categoryBindings, callingContext.getUser(), "Removed binding");
        return Boolean.TRUE;
    }

    /**
     * Publishes a task on the named exchange.
     * <p>
     * The routing key is the first entry of the task's category list, or the
     * empty string when the list is {@code null} or empty. The other
     * publish/broadcast methods of this class likewise treat a {@code null}
     * list as "no category".
     *
     * @param callingContext      not used by this method
     * @param str                 name of the exchange to publish on
     * @param rapturePipelineTask task to publish
     * @return {@code false} when no exchange with that name is stored,
     *         {@code true} once the task has been handed to the exchange handler
     */
    public Boolean publishPipelineMessage(CallingContext callingContext, String str, RapturePipelineTask rapturePipelineTask) {
        RaptureExchange exchange = getExchange(str);
        if (exchange == null) {
            log.error("No exchange found for " + str + ", cannot publish Pipeline message");
            return false;
        }
        ExchangeHandler exchangeHandler = getExchangeHandler(exchange);
        exchangeHandler.setupExchange(exchange);
        this.taskStatusManager.initialCreation(rapturePipelineTask);
        log.debug("Publishing pipeline task: " + rapturePipelineTask.getTaskId());
        // A null category list used to throw a NullPointerException here, after the
        // task status had already been created; treat it like an empty list instead.
        List<?> categories = rapturePipelineTask.getCategoryList();
        String routingKey = (categories == null || categories.isEmpty()) ? "" : (String) categories.get(0);
        exchangeHandler.putTaskOnExchange(str, rapturePipelineTask, routingKey);
        return true;
    }

    /**
     * Sets up and then tears down the named exchange through its handler, and
     * afterwards calls {@link Kernel#exchangeChanged(String)}. Does nothing
     * when no exchange with that name is stored.
     *
     * @param callingContext not used by this method
     * @param str            name of the exchange
     */
    public void drainPipeline(CallingContext callingContext, String str) {
        final RaptureExchange target = getExchange(str);
        if (target == null) {
            return;
        }
        final ExchangeHandler handler = getExchangeHandler(target);
        handler.setupExchange(target);
        handler.tearDownExchange(target);
        Kernel.exchangeChanged(str);
    }

    /**
     * Looks up the handler for the domain the given exchange belongs to.
     *
     * @param raptureExchange exchange whose domain selects the handler
     * @return the handler returned by the domain-name overload
     */
    private ExchangeHandler getExchangeHandler(RaptureExchange raptureExchange) {
        final String domain = raptureExchange.getDomain();
        return getExchangeHandler(domain);
    }

    /**
     * Returns the cached handler for a domain, setting the domain up on first use.
     * <p>
     * Every caller in this class dereferences the result immediately, so a
     * domain that cannot be set up used to surface as an anonymous
     * {@link NullPointerException} at the call site. It now fails here with a
     * message naming the domain.
     *
     * @param str domain name
     * @return the handler for the domain, never {@code null}
     * @throws RaptureException when no handler could be obtained for the domain
     */
    private ExchangeHandler getExchangeHandler(String str) {
        ExchangeHandler handler = this.domainHandlers.get(str);
        if (handler == null) {
            log.info("Domain " + str + " not found, setting up");
            setupDomain(str);
            handler = this.domainHandlers.get(str);
        }
        if (handler == null) {
            throw RaptureExceptionFactory.create("No exchange handler available for domain " + str);
        }
        return handler;
    }

    /**
     * Creates and caches the handler for a domain, if the domain is stored.
     * <p>
     * The caller checks the cache without holding this object's monitor, so
     * two threads can both decide a domain is missing. The cache is therefore
     * checked again here, under the monitor, so the second thread does not
     * build another handler and overwrite the one already cached.
     *
     * @param str domain name
     */
    private synchronized void setupDomain(String str) {
        if (this.domainHandlers.containsKey(str)) {
            // Another caller set this domain up while we waited for the monitor.
            return;
        }
        ExchangeDomain exchangeDomain = getExchangeDomain(str);
        if (exchangeDomain != null) {
            this.domainHandlers.put(str, ExchangeFactory.getHandler(str, exchangeDomain.getConfig()));
        }
    }

    /**
     * Reads the exchange stored under the given name.
     *
     * @param str exchange name
     * @return whatever {@link RaptureExchangeStorage#readByFields} returns;
     *         callers in this class treat {@code null} as "not found"
     */
    private RaptureExchange getExchange(String str) {
        final RaptureExchange stored = RaptureExchangeStorage.readByFields(str);
        return stored;
    }

    /**
     * Reads the exchange domain stored at {@code //<name>}, using
     * {@link Scheme#EXCHANGE_DOMAIN} as the scheme passed to the URI.
     *
     * @param str domain name
     * @return whatever {@link ExchangeDomainStorage#readByAddress} returns;
     *         the caller treats {@code null} as "not found"
     */
    private ExchangeDomain getExchangeDomain(String str) {
        log.info("Getting domain for " + str);
        final RaptureURI domainUri = new RaptureURI("//" + str, Scheme.EXCHANGE_DOMAIN);
        return ExchangeDomainStorage.readByAddress(domainUri);
    }

    /**
     * Collects every stored {@link CategoryQueueBindings} whose name equals the
     * given category. Entries for which the visitor's boolean flag is set are
     * skipped. A document that raises a {@link RaptureException} while being
     * read is logged through {@link #logError(String)} and skipped.
     *
     * @param callingContext not used by this method
     * @param str            category name to match
     * @return the matching bindings
     */
    public List<CategoryQueueBindings> getBoundExchanges(CallingContext callingContext, final String str) {
        final List<CategoryQueueBindings> matches = new ArrayList<CategoryQueueBindings>();
        CategoryQueueBindingsStorage.visitAll(new RepoVisitor() {
            public boolean visit(String docName, JsonContent content, boolean skipEntry) {
                if (!skipEntry) {
                    try {
                        final CategoryQueueBindings candidate = CategoryQueueBindingsStorage.readFromJson(content);
                        if (candidate.getName().equals(str)) {
                            matches.add(candidate);
                        }
                    } catch (RaptureException e) {
                        PipelineApiImpl.this.logError(docName);
                    }
                }
                return true;
            }
        });
        return matches;
    }

    /**
     * Logs, at error level, that a document could not be loaded.
     * <p>
     * The decompiler reports that this method was originally {@code private}
     * and was widened so the anonymous visitor can call it.
     *
     * @param str name of the document that failed to load
     */
    public void logError(String str) {
        final String message = String.format(ERROR, str);
        log.error(message);
    }

    /**
     * Hands the given exchange definition to {@link RaptureExchangeStorage}.
     *
     * @param callingContext  context whose user is recorded with the change
     * @param str             not used by this method
     * @param raptureExchange exchange definition to store
     * @return always {@code true}
     */
    public Boolean registerPipelineExchange(CallingContext callingContext, String str, RaptureExchange raptureExchange) {
        final String user = callingContext.getUser();
        RaptureExchangeStorage.add(raptureExchange, user, "Created pipeline exchange");
        return Boolean.TRUE;
    }

    /**
     * Deletes the exchange stored under the given name.
     *
     * @param callingContext context whose user is recorded with the deletion
     * @param str            exchange name
     */
    public void deregisterPipelineExchange(CallingContext callingContext, String str) {
        final String user = callingContext.getUser();
        RaptureExchangeStorage.deleteByFields(str, user, "Removed pipeline exchange");
    }

    /**
     * Reads the exchange stored under the given name.
     *
     * @param callingContext not used by this method
     * @param str            exchange name
     * @return the result of {@link RaptureExchangeStorage#readByFields}
     */
    public RaptureExchange getExchange(CallingContext callingContext, String str) {
        final RaptureExchange stored = RaptureExchangeStorage.readByFields(str);
        return stored;
    }

    /**
     * Stores an exchange domain. The domain name is the full path of the given
     * URI, with one trailing {@link PathConstants#PATH_SEPARATOR} removed if
     * present.
     *
     * @param callingContext context whose user is recorded with the change
     * @param str            domain URI; {@link Scheme#EXCHANGE_DOMAIN} is passed
     *                       as the scheme when building the {@link RaptureURI}
     * @param str2           configuration string stored on the domain
     */
    public void registerExchangeDomain(CallingContext callingContext, String str, String str2) {
        final RaptureURI domainUri = new RaptureURI(str, Scheme.EXCHANGE_DOMAIN);
        final ExchangeDomain domain = new ExchangeDomain();

        final String rawPath = domainUri.getFullPath();
        final String name = rawPath.endsWith(PathConstants.PATH_SEPARATOR)
                ? rawPath.substring(0, rawPath.length() - 1)
                : rawPath;

        domain.setName(name);
        domain.setConfig(str2);
        ExchangeDomainStorage.add(domain, callingContext.getUser(), "Created exchange domain");
    }

    /**
     * Deletes the exchange domain stored at the given URI.
     * <p>
     * The URI is built with {@link Scheme#EXCHANGE_DOMAIN}, the same scheme
     * used when the domain is registered and when it is looked up. It was
     * previously built with {@code Scheme.SCRIPT}, so the delete was issued
     * against an address of the wrong scheme.
     *
     * @param callingContext context whose user is recorded with the deletion
     * @param str            domain URI
     */
    public void deregisterExchangeDomain(CallingContext callingContext, String str) {
        RaptureURI domainUri = new RaptureURI(str, Scheme.EXCHANGE_DOMAIN);
        ExchangeDomainStorage.deleteByAddress(domainUri, callingContext.getUser(), "Removed exchange domain");
    }

    /**
     * Starts consuming from every queue bound to the standard fanout exchange
     * and then from every queue bound to the standard direct exchange created
     * for the given category.
     *
     * @param str          argument passed to
     *                     {@link ExchangeConfigFactory#createStandardDirect}
     * @param queueHandler handler registered as the consumer on each queue
     */
    public void lowRegisterStandard(String str, QueueHandler queueHandler) {
        consumeAllQueues(ExchangeConfigFactory.createStandardFanout(), queueHandler);
        consumeAllQueues(ExchangeConfigFactory.createStandardDirect(str), queueHandler);
    }

    /** Sets up one exchange and starts consuming from each of its queue bindings. */
    private void consumeAllQueues(RaptureExchange exchange, QueueHandler queueHandler) {
        final ExchangeHandler handler = getExchangeHandler(exchange);
        handler.setupExchange(exchange);
        for (Object binding : exchange.getQueueBindings()) {
            handler.startConsuming(exchange.getName(), ((RaptureExchangeQueue) binding).getName(), queueHandler);
        }
    }

    /**
     * Starts consuming from a queue on the named exchange. Logs an error and
     * does nothing else when no exchange with that name is stored.
     *
     * @param str          exchange name
     * @param str2         queue name
     * @param queueHandler handler registered as the consumer
     */
    public void lowRegisterListener(String str, String str2, QueueHandler queueHandler) {
        log.info("Looking for exchange " + str);
        final RaptureExchange found = getExchange(str);
        if (found != null) {
            log.info("Retrieving handler for exchange");
            final ExchangeHandler handler = getExchangeHandler(found);
            handler.setupExchange(found);
            handler.startConsuming(str, str2, queueHandler);
        } else {
            log.error("No exchange found for " + str + ", cannot start consuming Pipeline messages");
        }
    }

    /**
     * Currently a no-op: the body is empty, so calling this has no effect.
     * NOTE(review): by name this looks like the counterpart of
     * {@code lowRegisterListener}; confirm whether consumers are expected to
     * be stopped here.
     *
     * @param str  unused
     * @param str2 unused
     */
    public void lowDeregisterListener(String str, String str2) {
    }

    /**
     * Handles a notification. Messages that {@link ExchangeChangeManager} does
     * not recognise as exchange messages are ignored; for the rest, the
     * exchange name is extracted and passed to
     * {@link #handleExchangeChanged(String)}.
     *
     * @param notificationMessage incoming notification
     */
    public void signalMessage(NotificationMessage notificationMessage) {
        if (!ExchangeChangeManager.isExchangeMessage(notificationMessage)) {
            return;
        }
        final String exchangeName = ExchangeChangeManager.getExchangeFromMessage(notificationMessage);
        handleExchangeChanged(exchangeName);
    }

    /**
     * Calls {@code ensureExchangeUnAvailable} on the handler of the named
     * exchange. Does nothing when the name is {@code null} or no exchange with
     * that name is stored.
     * <p>
     * The decompiler reports that this method was originally package-private.
     *
     * @param str name of the exchange that changed
     */
    public void handleExchangeChanged(String str) {
        if (str != null) {
            final RaptureExchange changed = getExchange(str);
            if (changed != null) {
                final ExchangeHandler handler = getExchangeHandler(changed);
                handler.ensureExchangeUnAvailable(changed);
            }
        }
    }

    /**
     * Delegates to the task status manager.
     *
     * @param callingContext not used by this method
     * @param str            argument forwarded to the status manager's
     *                       {@code getStatus}
     * @return the status returned by the status manager
     */
    public PipelineTaskStatus getStatus(CallingContext callingContext, String str) {
        final PipelineTaskStatus status = taskStatusManager.getStatus(str);
        return status;
    }

    /**
     * Delegates to the task status manager.
     *
     * @param callingContext not used by this method
     * @param str            query forwarded to the status manager's
     *                       {@code queryTasks}
     * @return the tasks returned by the status manager
     */
    public List<RapturePipelineTask> queryTasks(CallingContext callingContext, String str) {
        final List<RapturePipelineTask> tasks = taskStatusManager.queryTasks(str);
        return tasks;
    }

    /**
     * Delegates to the task status manager.
     *
     * @param callingContext not used by this method
     * @param tableQuery     query forwarded to the status manager's
     *                       {@code queryTasksOld}
     * @return the tasks returned by the status manager
     */
    public List<RapturePipelineTask> queryTasksOld(CallingContext callingContext, TableQuery tableQuery) {
        final List<RapturePipelineTask> tasks = taskStatusManager.queryTasksOld(tableQuery);
        return tasks;
    }

    /**
     * Delegates to the task status manager.
     *
     * @param callingContext not used by this method
     * @return the value returned by the status manager's {@code getLatestEpoch}
     */
    public Long getLatestTaskEpoch(CallingContext callingContext) {
        final Long latestEpoch = taskStatusManager.getLatestEpoch();
        return latestEpoch;
    }

    /**
     * Registers a server category with the standard description and binds it
     * to {@code "$defaultExchange"} with an empty queue name.
     *
     * @param callingContext context forwarded to both calls
     * @param str            category name
     * @throws RaptureException when registering the category reports failure
     */
    public void setupStandardCategory(CallingContext callingContext, String str) {
        final Boolean registered = registerServerCategory(callingContext, str, HandlerConstants.DESCRIPTION);
        if (registered.booleanValue()) {
            bindPipeline(callingContext, str, "$defaultExchange", "");
            return;
        }
        throw RaptureExceptionFactory.create("Error registering server category " + str);
    }

    /**
     * Publishes a task on the standard direct exchange of its first category,
     * using the load-balancing routing key for that category.
     *
     * @param callingContext      not used by this method
     * @param rapturePipelineTask task to publish; must carry at least one category
     * @throws RaptureException with code 400 when the category list is
     *                          {@code null} or empty
     */
    public void publishMessageToCategory(CallingContext callingContext, RapturePipelineTask rapturePipelineTask) {
        final boolean noCategory = rapturePipelineTask.getCategoryList() == null
                || rapturePipelineTask.getCategoryList().isEmpty();
        if (noCategory) {
            log.error("Attempt to publish message without a target category");
            throw RaptureExceptionFactory.create(400, "Bad message, a category must be specified for load balanced messages!");
        }

        taskStatusManager.initialCreation(rapturePipelineTask);
        log.debug("Publishing pipeline task: " + rapturePipelineTask.getTaskId());

        final String category = (String) rapturePipelineTask.getCategoryList().get(0);
        final RaptureExchange direct = ExchangeConfigFactory.createStandardDirect(category);
        final ExchangeHandler handler = getExchangeHandler(direct);
        handler.putTaskOnExchange(direct.getName(), rapturePipelineTask,
                ExchangeConfigFactory.createLoadBalancingRoutingKey(category));
        log.debug("Publishing complete");
    }

    /**
     * Publishes a task on the standard direct exchange of its first category,
     * using the broadcast routing key for that category.
     *
     * @param callingContext      not used by this method
     * @param rapturePipelineTask task to publish; must carry at least one category
     * @throws RaptureException with code 400 when the category list is
     *                          {@code null} or empty
     */
    public void broadcastMessageToCategory(CallingContext callingContext, RapturePipelineTask rapturePipelineTask) {
        final boolean noCategory = rapturePipelineTask.getCategoryList() == null
                || rapturePipelineTask.getCategoryList().isEmpty();
        if (noCategory) {
            throw RaptureExceptionFactory.create(400, "Bad message, a category must be specified for broadcast to category!");
        }

        taskStatusManager.initialCreation(rapturePipelineTask);
        log.debug("Publishing pipeline task: " + rapturePipelineTask.getTaskId());

        final String category = (String) rapturePipelineTask.getCategoryList().get(0);
        final RaptureExchange direct = ExchangeConfigFactory.createStandardDirect(category);
        final ExchangeHandler handler = getExchangeHandler(direct);
        handler.putTaskOnExchange(direct.getName(), rapturePipelineTask,
                ExchangeConfigFactory.createBroadcastRoutingKey(category));
    }

    /**
     * Publishes a task on the standard fanout exchange with an empty routing key.
     *
     * @param callingContext      not used by this method
     * @param rapturePipelineTask task to publish; must not carry any category
     * @throws RaptureException with code 400 when the category list is
     *                          non-empty
     */
    public void broadcastMessageToAll(CallingContext callingContext, RapturePipelineTask rapturePipelineTask) {
        final boolean noCategory = rapturePipelineTask.getCategoryList() == null
                || rapturePipelineTask.getCategoryList().isEmpty();
        if (!noCategory) {
            throw RaptureExceptionFactory.create(400, "Bad message, category cannot be specified for broadcast to all messages!");
        }

        taskStatusManager.initialCreation(rapturePipelineTask);
        log.debug("Publishing pipeline task: " + rapturePipelineTask.getTaskId());

        final RaptureExchange fanout = ExchangeConfigFactory.createStandardFanout();
        final ExchangeHandler handler = getExchangeHandler(fanout);
        handler.putTaskOnExchange(fanout.getName(), rapturePipelineTask, "");
    }

    /**
     * Forwards an RPC call to the handler of the {@code "main"} domain.
     *
     * @param callingContext not used by this method
     * @param str            first argument forwarded to the handler's {@code makeRPC}
     * @param str2           second argument forwarded to the handler's {@code makeRPC}
     * @param map            third argument forwarded to the handler's {@code makeRPC}
     * @param l              unboxed and forwarded as the fourth argument; must
     *                       not be {@code null}
     * @return the map returned by the handler
     */
    public Map<String, Object> makeRPC(CallingContext callingContext, String str, String str2, Map<String, Object> map, Long l) {
        // Resolve the handler first, then unbox, matching the original evaluation order.
        final ExchangeHandler mainHandler = getExchangeHandler("main");
        return mainHandler.makeRPC(str, str2, map, l.longValue());
    }

    /**
     * Builds an exchange of type {@link RaptureExchangeType#TOPIC} and
     * registers it through {@code registerPipelineExchange}.
     *
     * @param callingContext context forwarded to the registration
     * @param str            domain set on the exchange
     * @param str2           name set on the exchange
     */
    public void createTopicExchange(CallingContext callingContext, String str, String str2) {
        final RaptureExchange topicExchange = new RaptureExchange();
        topicExchange.setDomain(str);
        topicExchange.setExchangeType(RaptureExchangeType.TOPIC);
        topicExchange.setName(str2);

        registerPipelineExchange(callingContext, str2, topicExchange);
    }

    /**
     * Publishes a message on a topic of the named exchange. Does nothing when
     * no exchange with that name is stored.
     *
     * @param callingContext not used by this method
     * @param str            not used by this method
     * @param str2           exchange name
     * @param str3           second argument forwarded to the handler (the topic)
     * @param str4           third argument forwarded to the handler (the message)
     */
    public void publishTopicMessage(CallingContext callingContext, String str, String str2, String str3, String str4) {
        final RaptureExchange target = getExchange(str2);
        if (target == null) {
            return;
        }
        final ExchangeHandler handler = getExchangeHandler(target);
        handler.setupExchange(target);
        handler.publishTopicMessage(str2, str3, str4);
    }

    /**
     * Subscribes a handler to a topic on the named exchange.
     *
     * @param str                 only used in the log message (logged as {@code d=})
     * @param str2                exchange name
     * @param str3                topic forwarded to the handler
     * @param topicMessageHandler handler forwarded to the subscription
     * @return the value returned by the exchange handler's {@code subscribeTopic},
     *         or {@code -1} when no exchange with that name is stored
     */
    public long subscribeTopic(String str, String str2, String str3, TopicMessageHandler topicMessageHandler) {
        log.info("Subscribing attempt to d=" + str + ",e=" + str2 + ",t=" + str3);
        final RaptureExchange target = getExchange(str2);
        if (target != null) {
            final ExchangeHandler handler = getExchangeHandler(target);
            handler.setupExchange(target);
            return handler.subscribeTopic(str2, str3, topicMessageHandler);
        }
        log.error("No config found for " + str2);
        return -1L;
    }

    /**
     * Cancels a topic subscription through the handler of the named exchange.
     * Does nothing when no exchange with that name is stored.
     *
     * @param str  not used by this method
     * @param str2 exchange name
     * @param j    value forwarded to the handler's {@code unsubscribeTopic}
     */
    public void unsubscribeTopic(String str, String str2, long j) {
        final RaptureExchange target = getExchange(str2);
        if (target == null) {
            return;
        }
        final ExchangeHandler handler = getExchangeHandler(target);
        handler.setupExchange(target);
        handler.unsubscribeTopic(j);
    }
}
