package org.apache.edgent.connectors.pubsub;

import org.apache.edgent.connectors.pubsub.oplets.Publish;
import org.apache.edgent.connectors.pubsub.service.PublishSubscribeService;
import org.apache.edgent.execution.services.RuntimeServices;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.TopologyElement;

/* loaded from: input_file:org/apache/edgent/connectors/pubsub/PublishSubscribe.class */
public class PublishSubscribe {
    public static final String RESERVED_TOPIC_PREFIX = "edgent/";

    /* loaded from: input_file:org/apache/edgent/connectors/pubsub/PublishSubscribe$SubscriberSetup.class */
    private static final class SubscriberSetup<T> implements Consumer<Consumer<T>>, AutoCloseable {
        private static final long serialVersionUID = 1;
        private final Supplier<RuntimeServices> rts;
        private final String topic;
        private final Class<T> streamType;
        private Consumer<T> submitter;

        SubscriberSetup(String str, Class<T> cls, Supplier<RuntimeServices> supplier) {
            this.topic = str;
            this.streamType = cls;
            this.rts = supplier;
        }

        public void accept(Consumer<T> consumer) {
            PublishSubscribeService publishSubscribeService = (PublishSubscribeService) ((RuntimeServices) this.rts.get()).getService(PublishSubscribeService.class);
            if (publishSubscribeService != null) {
                this.submitter = consumer;
                publishSubscribeService.addSubscriber(this.topic, this.streamType, consumer);
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            PublishSubscribeService publishSubscribeService = (PublishSubscribeService) ((RuntimeServices) this.rts.get()).getService(PublishSubscribeService.class);
            if (publishSubscribeService != null) {
                publishSubscribeService.removeSubscriber(this.topic, this.submitter);
            }
        }
    }

    public static <T> TSink<T> publish(TStream<T> tStream, String str, Class<? super T> cls) {
        return tStream.sink(new Publish(str, cls));
    }

    public static <T> TStream<T> subscribe(TopologyElement topologyElement, String str, Class<T> cls) {
        return topologyElement.topology().events(new SubscriberSetup(str, cls, topologyElement.topology().getRuntimeServiceSupplier()));
    }
}
