package cloud.orbit.actors.runtime;

import cloud.orbit.actors.cluster.ClusterPeer;
import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.net.HandlerAdapter;
import cloud.orbit.actors.net.HandlerContext;
import cloud.orbit.concurrent.Task;
import cloud.orbit.tuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cloud/orbit/actors/runtime/ClusterHandler.class */
public class ClusterHandler extends HandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(ClusterHandler.class);
    private ClusterPeer clusterPeer;
    private String clusterName;
    private String nodeName;

    public ClusterHandler(ClusterPeer clusterPeer, String str, String str2) {
        this.clusterPeer = clusterPeer;
        this.clusterName = str;
        this.nodeName = str2;
    }

    public Task write(HandlerContext handlerContext, Object obj) throws Exception {
        if (!(obj instanceof Pair)) {
            return handlerContext.write(obj);
        }
        Pair pair = (Pair) obj;
        this.clusterPeer.sendMessage((NodeAddress) pair.getLeft(), (byte[]) pair.getRight());
        return Task.done();
    }

    public Task connect(HandlerContext handlerContext, Object obj) throws Exception {
        logger.info("Connecting handler ClusterHandler...");
        this.clusterPeer.registerMessageReceiver((nodeAddress, bArr) -> {
            handlerContext.fireRead(Pair.of(nodeAddress, bArr));
        });
        return this.clusterPeer.join(this.clusterName, this.nodeName).thenRun(() -> {
            try {
                handlerContext.fireActive();
            } catch (Throwable th) {
                logger.error("Error handling message", th);
            }
        });
    }

    public Task close(HandlerContext handlerContext) throws Exception {
        logger.info("Closing ClusterHandler... ");
        this.clusterPeer.leave();
        return super.close(handlerContext);
    }
}
