package net.segoia.distributed.framework;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import net.segoia.commons.exceptions.ContextAwareException;
import net.segoia.eventbus.Event;
import net.segoia.eventbus.EventBus;
import net.segoia.eventbus.EventListener;
import net.segoia.eventbus.SimpleEventBus;
import net.segoia.log.Log;
import net.segoia.util.logging.Logger;
import net.segoia.util.logging.MasterLogManager;
import net.segoia.util.processing.Destination;
import net.segoia.util.processing.GenericSource;
import net.segoia.util.processing.RunnableProcessor;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.jgroups.util.Util;

/* loaded from: input_file:net/segoia/distributed/framework/ProcessingNode.class */
public class ProcessingNode implements Receiver, Destination<Task, TaskProcessingResponse> {
    /** Class-wide logger obtained from the master log manager. */
    private static final Logger logger = MasterLogManager.getLogger(ProcessingNode.class.getName());
    /** Name of the classpath resource read by {@code loadProperties()}; public and mutable. */
    public static String DISTRIBUTED_PROCESSOR_PROPERTIES_FILE = "node.properties";
    /** Property key holding the group name. */
    private static final String GROUP_NAME = "group.name";
    /** Prefix of service-related property keys. */
    private static final String SERVICE_PREFIX = "service.";
    /** Suffix appended to a service key to form its class property key. */
    private static final String CLASS_SUFIX = ".class";
    /** Suffix appended to a service key to form its count property key. */
    private static final String COUNT_SUFIX = ".count";
    /** Properties loaded from {@link #DISTRIBUTED_PROCESSOR_PROPERTIES_FILE}. */
    private Properties props;
    /** Name of the group passed to {@code channel.connect(...)}. */
    private String groupName;
    /** Only exposed through its getter/setter in this class. */
    private ClassLoader resourcesLoader;
    /** Services hosted by this node, keyed by their description. */
    private Map<DistributedServiceDescription, DistributedService> localServices;
    /** For every advertised service, the addresses that advertised it. */
    private Map<DistributedServiceDescription, List<Address>> globalServices;
    /** Receivers waiting for a response, keyed by task id. */
    private Map<Long, ProcessingResponseReceiver> pendingRequests;
    /** Aggregated responses of in-flight broadcast tasks, keyed by task id. */
    private Map<Long, BroadcastTaskProcessingResponse> pendigBroadcastRequests;
    /** For every task being processed locally, the address the response must be sent to. */
    private Map<Task, Address> pendingResponses;
    /** One task queue per locally processed service. */
    private Map<DistributedServiceDescription, GenericSource<Task>> servicesQueues;
    /** Channel used for all group communication; created lazily by {@code connect()}. */
    private JChannel channel;
    /** Member list taken from the most recently accepted view. */
    private List<Address> members;
    /** Id handed to the next submitted task. */
    private Long nextTaskId;
    /** Initialised to 100 by the constructor; not read anywhere in this class. */
    private int maxConcurentTasksToProcess;
    /** Next round-robin position per service, used when picking a processing address. */
    private Map<DistributedServiceDescription, Integer> addressesIndexes;
    /** Executor running the local task processors. */
    private Executor executor;
    /** Set to true once {@code start()} has completed. */
    private boolean started;
    /** Bus on which events received from the group are re-posted locally. */
    private EventBus internalEventBus;

    /**
     * Creates a node with empty service/request registries, loads the node
     * properties (which may set the group name) and prepares a cached thread
     * pool for local task processing.
     */
    public ProcessingNode() {
        // localServices is assigned exactly once: the previous code first stored a
        // Hashtable and then silently replaced it with a HashMap a few lines later.
        this.localServices = new HashMap<>();
        this.globalServices = new Hashtable<>();
        this.pendingRequests = new Hashtable<>();
        this.pendigBroadcastRequests = new Hashtable<>();
        this.pendingResponses = new Hashtable<>();
        this.servicesQueues = new HashMap<>();
        this.members = new ArrayList<>();
        this.nextTaskId = 1L;
        this.maxConcurentTasksToProcess = 100;
        this.addressesIndexes = new HashMap<>();
        this.started = false;
        this.internalEventBus = new SimpleEventBus();
        loadProperties();
        this.executor = Executors.newCachedThreadPool();
    }

    /**
     * Creates a node like the no-arg constructor and then overrides the group
     * name, unless the supplied name is {@code null}.
     *
     * @param str group name to use; {@code null} keeps the value loaded from the properties
     */
    public ProcessingNode(String str) {
        this();
        if (str == null) {
            return;
        }
        this.groupName = str;
    }

    /**
     * Loads {@link #DISTRIBUTED_PROCESSOR_PROPERTIES_FILE} from the context class
     * loader into {@code props} and reads the group name from it.
     * <p>
     * A missing or unreadable file is not fatal: a warning is logged and the
     * properties stay empty, leaving the group name {@code null}.
     */
    private void loadProperties() {
        this.props = new Properties();
        // try-with-resources closes the stream, which was previously leaked.
        try (InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(DISTRIBUTED_PROCESSOR_PROPERTIES_FILE)) {
            if (in == null) {
                // Resource not on the classpath; report it explicitly instead of via an NPE.
                Log.warn(this, "Could not load props file " + DISTRIBUTED_PROCESSOR_PROPERTIES_FILE);
            } else {
                this.props.load(in);
            }
        } catch (Exception e) {
            // Keep best-effort semantics, but do not lose the cause.
            Log.warn(this, "Could not load props file " + DISTRIBUTED_PROCESSOR_PROPERTIES_FILE + ": " + e);
        }
        // The former loop over "service.*" keys only looked up the ".class"/".count"
        // properties and discarded the results, so it had no effect and was removed.
        this.groupName = this.props.getProperty(GROUP_NAME);
    }

    /**
     * Registers a local service under its description, logs the registration
     * and re-advertises the local services to the group.
     *
     * @param distributedService the service to host on this node
     */
    public void addService(DistributedService distributedService) {
        final DistributedServiceDescription key = distributedService.getServiceDescription();
        this.localServices.put(key, distributedService);
        final String logLine = "SERVICE ADDED: " + distributedService.getServiceDescription();
        logger.info(logLine);
        advertise();
    }

    /**
     * Connects this node to its group, creating the channel on first use, and
     * then requests the current state from the group.
     *
     * @throws IllegalStateException if no group name has been configured
     * @throws RuntimeException      if the existing channel is already connected
     * @throws Exception             if the channel cannot be created, connected or the state fetched
     */
    public void connect() throws Exception {
        if (this.groupName == null) {
            throw new IllegalStateException("The groupName cannot be null");
        }
        if (this.channel != null) {
            if (this.channel.isConnected()) {
                throw new RuntimeException("Channel is already connected");
            }
        } else {
            this.channel = new JChannel();
        }
        logger.debug("Starting node with groupName '" + this.groupName + "'");
        // Register as receiver before connecting so callbacks reach this node.
        this.channel.setReceiver(this);
        this.channel.connect(this.groupName);
        this.channel.getState((Address) null, 0L);
        logger.info("Node successfuly connected on group " + this.groupName);
    }

    /**
     * @return {@code true} when a channel exists and reports itself as connected
     */
    public boolean isConnected() {
        return this.channel != null && this.channel.isConnected();
    }

    /**
     * Starts the node once: connects if necessary, starts the local services and
     * advertises them. Subsequent calls after a successful start do nothing.
     *
     * @throws Exception if connecting or starting a service fails
     */
    public synchronized void start() throws Exception {
        if (!this.started) {
            if (!isConnected()) {
                connect();
            }
            startServices();
            advertise();
            this.started = true;
        }
    }

    /**
     * Calls {@code start()} on every locally registered service.
     *
     * @throws ContextAwareException propagated from a service's {@code start()}
     */
    private void startServices() throws ContextAwareException {
        logger.debug("Starting services");
        for (DistributedService service : this.localServices.values()) {
            service.start();
        }
    }

    /**
     * Sends the descriptions of all local services to the group. Does nothing
     * while the channel is missing or disconnected; send failures are logged.
     */
    public void advertise() {
        boolean connected = this.channel != null && this.channel.isConnected();
        if (!connected) {
            return;
        }
        Message advertisement = new Message();
        DistributedServiceDescription[] descriptions = this.localServices.keySet().toArray(new DistributedServiceDescription[0]);
        advertisement.setObject(descriptions);
        try {
            sendMessage(advertisement);
        } catch (Exception e) {
            Log.error(this, "Error sending advertise message", e);
        }
    }

    /**
     * Sends a message through the channel.
     *
     * @param message the message to send
     * @throws Exception if there is no connected channel, or if the channel fails to send
     */
    protected void sendMessage(Message message) throws Exception {
        boolean usable = this.channel != null && this.channel.isConnected();
        if (!usable) {
            throw new Exception("The channel is null or not connected");
        }
        this.channel.send(message);
    }

    /**
     * Assigns the next task id to the task and dispatches it either as a
     * broadcast or as a single-destination task.
     *
     * @param task                       the task to submit; its id is overwritten
     * @param processingResponseReceiver receiver notified when the response arrives
     * @return the outcome of the submission attempt
     */
    public SubmitTaskResponse submitTask(Task task, ProcessingResponseReceiver processingResponseReceiver) {
        // Id allocation is the only part guarded by the node's monitor.
        synchronized (this) {
            Long assignedId = this.nextTaskId;
            this.nextTaskId = Long.valueOf(assignedId.longValue() + 1);
            task.setTaskId(assignedId);
        }
        if (task.isBroadcastTask()) {
            return submitBroadcastTask(task, processingResponseReceiver);
        }
        return submitSimpleTask(task, processingResponseReceiver);
    }

    /**
     * Sends the task to one address offering the target service and registers
     * the receiver for the eventual response.
     * <p>
     * If sending fails, the receiver registration is rolled back so that failed
     * submissions do not accumulate in {@code pendingRequests}.
     *
     * @param task                       the task to send; its id must already be set
     * @param processingResponseReceiver receiver to notify when the response arrives
     * @return a response flagged successful when the message was handed to the channel,
     *         otherwise unsuccessful with the error attached
     */
    private SubmitTaskResponse submitSimpleTask(Task task, ProcessingResponseReceiver processingResponseReceiver) {
        DistributedServiceDescription targetService = task.getTargetService();
        SubmitTaskResponse submitTaskResponse = new SubmitTaskResponse();
        Address processingAddressForService = getProcessingAddressForService(targetService);
        if (processingAddressForService == null) {
            submitTaskResponse.setSuccessfull(false);
            submitTaskResponse.setError(new Exception("No address registered for service " + targetService));
            return submitTaskResponse;
        }
        Message message = new Message();
        message.setDest(processingAddressForService);
        Long taskId = Long.valueOf(task.getTaskId().longValue());
        message.setObject(task);
        try {
            // Register before sending: the response may arrive before send() returns.
            this.pendingRequests.put(taskId, processingResponseReceiver);
            sendMessage(message);
            submitTaskResponse.setSuccessfull(true);
        } catch (Exception e) {
            // Nothing was sent, so no response will ever come back for this id.
            this.pendingRequests.remove(taskId);
            submitTaskResponse.setError(e);
            submitTaskResponse.setSuccessfull(false);
        }
        return submitTaskResponse;
    }

    /**
     * Sends the task to every address currently offering the target service and
     * registers an aggregate that collects one response per targeted address.
     * <p>
     * The provider list is copied before use because it can be modified by
     * advertisement and view-change handling while this method iterates. An
     * empty provider list is reported as "no address registered" instead of
     * registering a request that could never complete. If not a single message
     * could be sent, the pending registrations are removed again.
     * <p>
     * NOTE(review): when only some sends fail, the aggregate still expects a
     * response from every targeted address and therefore never completes; the
     * failure is reported through the returned response.
     *
     * @param task                       the task to broadcast; its id must already be set
     * @param processingResponseReceiver receiver to notify with the aggregated response
     * @return successful only if every message was handed to the channel
     */
    private SubmitTaskResponse submitBroadcastTask(Task task, ProcessingResponseReceiver processingResponseReceiver) {
        DistributedServiceDescription targetService = task.getTargetService();
        SubmitTaskResponse submitTaskResponse = new SubmitTaskResponse();
        List<Address> registered = this.globalServices.get(targetService);
        // Snapshot so the expected-response count and the iteration agree.
        List<Address> targets = registered == null ? new ArrayList<Address>() : new ArrayList<Address>(registered);
        if (targets.isEmpty()) {
            submitTaskResponse.setSuccessfull(false);
            submitTaskResponse.setError(new Exception("No address registered for service " + targetService));
            return submitTaskResponse;
        }
        submitTaskResponse.setSuccessfull(true);
        Long taskId = Long.valueOf(task.getTaskId().longValue());
        BroadcastTaskProcessingResponse broadcastTaskProcessingResponse = new BroadcastTaskProcessingResponse(taskId, targets.size());
        this.pendingRequests.put(taskId, processingResponseReceiver);
        this.pendigBroadcastRequests.put(taskId, broadcastTaskProcessingResponse);
        int sent = 0;
        for (Address address : targets) {
            Message message = new Message();
            message.setDest(address);
            message.setObject(task);
            try {
                sendMessage(message);
                sent++;
            } catch (Exception e) {
                submitTaskResponse.setError(e);
                submitTaskResponse.setSuccessfull(false);
            }
        }
        if (sent == 0) {
            // No node received the task, so no response can ever arrive.
            this.pendigBroadcastRequests.remove(taskId);
            this.pendingRequests.remove(taskId);
        }
        return submitTaskResponse;
    }

    /**
     * Sends an event to the group.
     *
     * @param event the event to send
     * @return {@code true} if the message was handed to the channel, {@code false} if sending failed
     */
    public boolean postEvent(Event event) {
        Message carrier = new Message();
        carrier.setObject(event);
        boolean sent = false;
        try {
            sendMessage(carrier);
            sent = true;
        } catch (Exception e) {
            logger.error("Failed to send event " + event, e);
        }
        return sent;
    }

    /**
     * Picks the next address offering the given service in round-robin order.
     * <p>
     * The stored position is reduced modulo the current list size before use:
     * the provider list shrinks when members leave, and a position saved against
     * the larger list would otherwise index past the end. Selection is
     * serialised on the index map because tasks can be submitted from several
     * threads (task id allocation in {@code submitTask} is synchronized for the
     * same reason).
     *
     * @param distributedServiceDescription the service to route to
     * @return an address offering the service, or {@code null} if none is known
     */
    private Address getProcessingAddressForService(DistributedServiceDescription distributedServiceDescription) {
        List<Address> candidates = this.globalServices.get(distributedServiceDescription);
        if (candidates == null) {
            return null;
        }
        synchronized (this.addressesIndexes) {
            int size = candidates.size();
            if (size <= 0) {
                return null;
            }
            Integer stored = this.addressesIndexes.get(distributedServiceDescription);
            int index = stored == null ? 0 : stored.intValue() % size;
            this.addressesIndexes.put(distributedServiceDescription, Integer.valueOf((index + 1) % size));
            return candidates.get(index);
        }
    }

    /**
     * Hook invoked after every accepted view; currently has an empty body.
     */
    private void checkMandatoryServices() {
    }

    /**
     * Drops every address that left the group from the global service registry
     * and records the new member list.
     * <p>
     * Addresses are matched by their string form, and at most one matching entry
     * is removed from each service's address list.
     *
     * @param list  the previous member list
     * @param list2 the new member list, stored as the current members
     */
    private void updateMemberAndServices(List<Address> list, List<Address> list2) {
        for (Address previous : list) {
            if (list2.contains(previous)) {
                continue;
            }
            // 'previous' is no longer a member: purge it from every service.
            for (List<Address> providers : this.globalServices.values()) {
                for (int idx = 0; idx < providers.size(); idx++) {
                    if (providers.get(idx).toString().equals(previous.toString())) {
                        providers.remove(idx);
                        break;
                    }
                }
            }
        }
        this.members = list2;
        Log.info(this, "Members updated: " + this.members);
    }

    /**
     * Records that the given address offers the given service, creating the
     * service's address list on first use and ignoring duplicates.
     *
     * @param distributedServiceDescription the advertised service
     * @param address                       the address that advertised it
     */
    private void addServiceToGlobalServices(DistributedServiceDescription distributedServiceDescription, Address address) {
        List<Address> providers = this.globalServices.get(distributedServiceDescription);
        if (providers == null) {
            providers = new Vector<Address>();
            this.globalServices.put(distributedServiceDescription, providers);
        }
        if (!providers.contains(address)) {
            providers.add(address);
        }
    }

    /**
     * Dispatches a received message according to the type of its payload:
     * <ul>
     * <li>{@code Task}: processed locally on behalf of the sender;</li>
     * <li>{@code TaskProcessingResponse}: delivered to the receiver registered for
     * its task id; responses to a broadcast task are first collected and only the
     * aggregate is delivered once all responses have arrived;</li>
     * <li>{@code Event}: re-posted on the internal event bus;</li>
     * <li>{@code DistributedServiceDescription[]}: treated as a service advertisement
     * from the sender;</li>
     * <li>anything else is logged as unknown.</li>
     * </ul>
     * A completed broadcast aggregate is now removed from
     * {@code pendigBroadcastRequests}; previously every finished broadcast stayed
     * in that map forever.
     *
     * @param message the received message
     * @throws Exception if a response arrives for which no receiver is registered
     */
    private void processMessage(Message message) throws Exception {
        Object object = message.getObject();
        if (object instanceof Task) {
            processTask((Task) object, message.getSrc());
            return;
        }
        if (object instanceof TaskProcessingResponse) {
            TaskProcessingResponse taskProcessingResponse = (TaskProcessingResponse) object;
            Long broadcastKey = Long.valueOf(taskProcessingResponse.getTaskId().longValue());
            BroadcastTaskProcessingResponse broadcastTaskProcessingResponse = this.pendigBroadcastRequests.get(broadcastKey);
            if (broadcastTaskProcessingResponse != null) {
                broadcastTaskProcessingResponse.addNewResponse(taskProcessingResponse);
                if (!broadcastTaskProcessingResponse.allResponsesReceived()) {
                    // Still waiting for other nodes.
                    return;
                }
                // Broadcast finished: release the aggregate's registration.
                this.pendigBroadcastRequests.remove(broadcastKey);
                taskProcessingResponse = broadcastTaskProcessingResponse;
            }
            ProcessingResponseReceiver remove = this.pendingRequests.remove(taskProcessingResponse.getTaskId());
            if (remove == null) {
                throw new Exception("No receiver found for taskId " + taskProcessingResponse.getTaskId());
            }
            remove.receiveProcessingResponse(taskProcessingResponse);
            return;
        }
        if (object instanceof Event) {
            this.internalEventBus.postEvent((Event) object);
            return;
        }
        if (!(object instanceof DistributedServiceDescription[])) {
            Log.error(this, "Received unknown message " + object);
            return;
        }
        DistributedServiceDescription[] distributedServiceDescriptionArr = (DistributedServiceDescription[]) object;
        Log.info(this, "Receivd advertise message: " + Arrays.asList(distributedServiceDescriptionArr));
        for (DistributedServiceDescription distributedServiceDescription : distributedServiceDescriptionArr) {
            addServiceToGlobalServices(distributedServiceDescription, message.getSrc());
        }
        Log.info(this, "New state after advertise: " + this.globalServices);
    }

    /**
     * Queues a task received from another node for the matching local service
     * and schedules a processor for it.
     * <p>
     * When this node does not host the requested service, an error response is
     * sent back to the requester. Previously such a task was dropped silently,
     * leaving the requester's pending request unanswered.
     *
     * @param task    the task to process
     * @param address the address of the node that sent the task; the response goes there
     */
    private void processTask(Task task, Address address) {
        DistributedServiceDescription targetService = task.getTargetService();
        DistributedService distributedService = this.localServices.get(targetService);
        if (distributedService == null) {
            // Record the requester so addError() can route the failure back to it.
            this.pendingResponses.put(task, address);
            addError(task, new Exception("No local service registered for " + targetService));
            return;
        }
        GenericSource<Task> addTaskToProcessingQueue = addTaskToProcessingQueue(targetService, task);
        this.pendingResponses.put(task, address);
        this.executor.execute(new RunnableProcessor(new LocalTaskProcessor(distributedService), addTaskToProcessingQueue, this));
    }

    /**
     * @param distributedServiceDescription the service to look up
     * @return the number of addresses currently registered for the service, or 0 if it is unknown
     */
    public int getNumberOfInstancesForService(DistributedServiceDescription distributedServiceDescription) {
        List<Address> providers = this.globalServices.get(distributedServiceDescription);
        return providers == null ? 0 : providers.size();
    }

    /**
     * Writes the global service registry to the given stream while holding the
     * registry's monitor. Failures are logged, not propagated.
     *
     * @param outputStream the stream the state is written to
     */
    public void getState(OutputStream outputStream) {
        try {
            synchronized (this.globalServices) {
                DataOutputStream stateOut = new DataOutputStream(outputStream);
                Util.objectToStream(this.globalServices, stateOut);
            }
        } catch (Exception e) {
            Log.error(this, "Error when getState called", e);
        }
    }

    /**
     * Entry point for messages delivered by the channel. Processing errors are
     * logged and never propagated to the caller.
     *
     * @param message the received message
     */
    public void receive(Message message) {
        try {
            processMessage(message);
        } catch (Exception e) {
            // Log only: the stack trace is already part of the logged error, so the
            // additional printStackTrace() to stderr was dropped.
            Log.error(getClass(), "Error receiving message " + message, e);
        }
    }

    /**
     * No-op. Presumably a JGroups membership callback required by {@code Receiver} - TODO confirm.
     */
    public void block() {
    }

    /**
     * No-op; the suspected address is ignored.
     * Presumably a JGroups membership callback required by {@code Receiver} - TODO confirm.
     */
    public void suspect(Address address) {
    }

    /**
     * Handles a new group view: removes departed members from the service
     * registry, stores the new member list and runs the mandatory-services hook.
     *
     * @param view the newly accepted view
     */
    public void viewAccepted(View view) {
        // The former call to channel.getAddress() discarded its result and was removed.
        updateMemberAndServices(this.members, view.getMembers());
        checkMandatoryServices();
    }

    /**
     * @return the configured group name; may be {@code null} if none was loaded or set
     */
    public String getGroupName() {
        return this.groupName;
    }

    /**
     * @return the internal map of local services (the live map, not a copy)
     */
    public Map<DistributedServiceDescription, DistributedService> getLocalServices() {
        return this.localServices;
    }

    /**
     * @param distributedServiceDescription the description to look up
     * @return the local service registered under the description, or {@code null} if there is none
     */
    public DistributedService getLocalServiceByDesc(DistributedServiceDescription distributedServiceDescription) {
        return this.localServices.get(distributedServiceDescription);
    }

    /**
     * Replaces the map of local services with the given map (stored by reference).
     * Does not advertise the change.
     *
     * @param map the new local services map
     */
    public void setLocalServices(Map<DistributedServiceDescription, DistributedService> map) {
        this.localServices = map;
    }

    /**
     * @param str the group name used by the next {@code connect()}
     */
    public void setGroupName(String str) {
        this.groupName = str;
    }

    /**
     * Logs a task failure and sends a response carrying the exception (and no
     * result) back to the task's requester.
     *
     * @param task the task that failed
     * @param exc  the failure to report
     */
    public void addError(Task task, Exception exc) {
        Log.error(this, "Error processing task " + task.getTaskId(), exc);
        TaskProcessingResponse failure = new TaskProcessingResponse(task.getTaskId(), (Serializable) null, exc);
        sendResponse(task, failure);
    }

    /**
     * Sends the processing result of a task back to the task's requester.
     *
     * @param task                   the processed task
     * @param taskProcessingResponse the result to send
     */
    public void addOutput(Task task, TaskProcessingResponse taskProcessingResponse) {
        sendResponse(task, taskProcessingResponse);
    }

    /**
     * Sends a task's response to the node that submitted it, tagging the
     * response with this node's address.
     * <p>
     * If no requester is recorded for the task (for example because a response
     * was already sent for it), the response is dropped and an error is logged.
     * Without this guard the message would carry a {@code null} destination,
     * which the channel treats as "send to every group member".
     *
     * @param task                   the task being answered
     * @param taskProcessingResponse the response content to forward
     */
    private void sendResponse(Task task, TaskProcessingResponse taskProcessingResponse) {
        Address requester = this.pendingResponses.remove(task);
        if (requester == null) {
            Log.error(this, "No pending requester for task " + task.getTaskId() + "; response not sent");
            return;
        }
        Message message = new Message();
        message.setDest(requester);
        message.setObject(new TaskProcessingResponse(taskProcessingResponse.getTaskId(), taskProcessingResponse.getResult(), taskProcessingResponse.getException(), getLocalNodeAddress()));
        try {
            sendMessage(message);
        } catch (Exception e) {
            Log.error(this, "Error sending response for task " + task.getTaskId(), e);
        }
    }

    /**
     * Appends a task to the queue of the given service, creating the queue the
     * first time the service is used.
     *
     * @param distributedServiceDescription the service whose queue receives the task
     * @param task                          the task to enqueue
     * @return the queue the task was added to
     */
    private GenericSource<Task> addTaskToProcessingQueue(DistributedServiceDescription distributedServiceDescription, Task task) {
        GenericSource<Task> queue = this.servicesQueues.get(distributedServiceDescription);
        if (queue != null) {
            queue.addInput(task);
            return queue;
        }
        GenericSource<Task> created = new GenericSource<>();
        this.servicesQueues.put(distributedServiceDescription, created);
        created.addInput(task);
        return created;
    }

    /**
     * @return the resources class loader previously set, or {@code null} if none was set
     */
    public ClassLoader getResourcesLoader() {
        return this.resourcesLoader;
    }

    /**
     * @param classLoader the resources class loader to store
     */
    public void setResourcesLoader(ClassLoader classLoader) {
        this.resourcesLoader = classLoader;
    }

    /**
     * Returns this node's address in the group.
     * <p>
     * Tolerates a channel that has not been created yet, consistent with
     * {@link #isConnected()}, instead of failing with a NullPointerException.
     *
     * @return the channel's address, or {@code null} when there is no channel
     *         (the channel itself may also report {@code null})
     */
    public Address getLocalNodeAddress() {
        if (this.channel == null) {
            return null;
        }
        return this.channel.getAddress();
    }

    /**
     * Registers a listener on the internal event bus, where events received
     * from the group are posted.
     *
     * @param eventListener the listener to register
     */
    public void registerEventListener(EventListener eventListener) {
        this.internalEventBus.registerListener(eventListener);
    }

    /**
     * Removes a listener from the internal event bus.
     *
     * @param eventListener the listener to remove
     */
    public void removeEventListener(EventListener eventListener) {
        this.internalEventBus.removeListener(eventListener);
    }

    /**
     * Replaces the global service registry with the state read from the stream.
     * <p>
     * The received object is validated before it is installed: a {@code null} or
     * non-map state is rejected and the current registry is kept. Previously a
     * {@code null} state replaced the registry with {@code null}, making later
     * lookups fail. Read errors are logged and not propagated.
     *
     * @param inputStream the stream holding the serialized state
     * @throws Exception declared for interface compatibility; errors are handled internally
     */
    public void setState(InputStream inputStream) throws Exception {
        try {
            Object received = Util.objectFromStream(new DataInputStream(inputStream));
            if (!(received instanceof Map)) {
                Log.error(this, "Ignoring unexpected state: " + received);
                return;
            }
            // The sender serializes its own globalServices map, so the element types match.
            @SuppressWarnings("unchecked")
            Map<DistributedServiceDescription, List<Address>> state = (Map<DistributedServiceDescription, List<Address>>) received;
            this.globalServices = state;
            Log.info(this, "State set to: " + this.globalServices);
        } catch (Exception e) {
            Log.error(this, "Error setting state", e);
        }
    }

    /**
     * No-op. Presumably a JGroups membership callback required by {@code Receiver} - TODO confirm.
     */
    public void unblock() {
    }
}
