/*
 * Decompiled with CFR 0.152.
 */
package cn.sliew.carp.framework.pubsub.queue.kekio;

import cn.sliew.carp.framework.common.serder.SerDer;
import cn.sliew.carp.framework.common.serder.jdk.JdkSerDerFactory;
import cn.sliew.carp.framework.pubsub.model.AbstractPubsubChannel;
import cn.sliew.carp.framework.pubsub.model.PubsubChannel;
import cn.sliew.carp.framework.pubsub.model.PubsubSubscriber;
import cn.sliew.carp.framework.pubsub.queue.kekio.QueuePubsubSubscriber;
import cn.sliew.carp.framework.queue.kekio.AbstractQueue;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueProcessor;
import cn.sliew.carp.framework.queue.kekio.message.CommonMessage;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import java.time.Duration;
import java.time.temporal.TemporalAmount;

public class QueuePubsubChannel
extends AbstractPubsubChannel
implements PubsubChannel {
    private String name;
    private Queue queue;
    private QueueProcessor processor;

    public QueuePubsubChannel(String name, Queue queue) {
        this.name = name;
        this.queue = queue;
    }

    public String getName() {
        return this.name;
    }

    public void register(PubsubSubscriber subscriber) {
        if (subscriber instanceof QueuePubsubSubscriber) {
            QueuePubsubSubscriber queuePubsubSubscriber = (QueuePubsubSubscriber)subscriber;
            this.processor.addMessageHandler(queuePubsubSubscriber.getMessageHandler());
        }
    }

    public void remove(PubsubSubscriber subscriber) {
        if (subscriber instanceof QueuePubsubSubscriber) {
            QueuePubsubSubscriber queuePubsubSubscriber = (QueuePubsubSubscriber)subscriber;
            this.processor.removeMessageHandler(queuePubsubSubscriber.getMessageHandler());
        }
    }

    public void push(Object message, Duration delay) {
        SerDer serDer = JdkSerDerFactory.INSTANCE.getInstance();
        CommonMessage commonMessage = CommonMessage.builder().body(serDer.serialize(message)).build();
        this.queue.push((Message)commonMessage, (TemporalAmount)delay);
    }

    protected void doStart() throws Exception {
        this.queue.start();
        Queue queue = this.queue;
        if (queue instanceof AbstractQueue) {
            AbstractQueue abstractQueue = (AbstractQueue)queue;
            this.processor = abstractQueue.getProcessor();
        }
    }

    protected void doStop() throws Exception {
        this.queue.stop();
    }
}

