Class: VertxKafkaClient::KafkaConsumer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb
Overview
Vert.x Kafka consumer.
You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.
The #pause and #resume methods provide global control over reading the records from the consumer.
The #pause and #resume overloads that take topic partitions provide finer-grained control over reading records for specific Topic/Partition; these are Kafka-specific operations.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance.
Instance Method Summary (collapse)
-
- (self) assign(param_1 = nil)
Manually assign a list of partitions to this consumer.
-
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
-
- (void) close { ... }
Close the consumer.
-
- (void) commit { ... }
Commit current offsets for all the subscribed topics and partitions.
-
- (void) committed(topicPartition = nil) { ... }
Get the last committed offset for the given partition (whether the commit happened by this process or another).
- - (self) end_handler { ... }
- - (self) exception_handler { ... }
- - (self) handler { ... }
-
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer.
-
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
-
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked from the consumer.
-
- (self) pause(param_1 = nil)
Suspend fetching from the requested partitions.
-
- (void) paused { ... }
Get the set of partitions that were previously paused by a call to pause(Set).
-
- (void) position(partition = nil) { ... }
Get the offset of the next record that will be fetched (if a record with that offset exists).
-
- (self) resume(param_1 = nil)
Resume specified partitions which have been paused with pause.
-
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
-
- (self) seek_to_beginning(param_1 = nil)
Seek to the first offset for each of the given partitions.
-
- (self) seek_to_end(param_1 = nil)
Seek to the last offset for each of the given partitions.
-
- (self) subscribe(param_1 = nil)
Subscribe to the given list of topics to get dynamically assigned partitions.
-
- (self) subscription { ... }
Get the current subscription.
-
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
Class Method Details
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance
36 37 38 39 40 41 42 43 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 36
#
# Create a new KafkaConsumer instance.
#
# Accepted call shapes (a block is never accepted):
#   create(vertx, config)
#   create(vertx, config, keyType, valueType)
#
# @param vertx object whose class defines +j_del+; its +j_del+ is handed to the Java factory
# @param [Hash] config configuration entries, copied into a new Hash for the Java call
# @param [Class] keyType key class, converted with Utils.j_class_of / Utils.v_type_of
# @param [Class] valueType value class, converted the same way as keyType
# @return [::VertxKafkaClient::KafkaConsumer] result of Utils.safe_create around the Java consumer
# @raise [ArgumentError] if the arguments match neither call shape
def self.create(vertx = nil, config = nil, keyType = nil, valueType = nil)
  # Preconditions shared by both accepted call shapes.
  base_ok = vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given?

  if base_ok && keyType == nil && valueType == nil
    signature = [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class]
    j_create = Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, signature)
    j_consumer = j_create.call(vertx.j_del, Hash[config.map { |k, v| [k, v] }])
    return ::Vertx::Util::Utils.safe_create(j_consumer, ::VertxKafkaClient::KafkaConsumer, nil, nil)
  elsif base_ok && keyType.class == Class && valueType.class == Class
    signature = [
      Java::IoVertxCore::Vertx.java_class,
      Java::JavaUtil::Map.java_class,
      Java::JavaLang::Class.java_class,
      Java::JavaLang::Class.java_class
    ]
    j_create = Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, signature)
    j_consumer = j_create.call(
      vertx.j_del,
      Hash[config.map { |k, v| [k, v] }],
      ::Vertx::Util::Utils.j_class_of(keyType),
      ::Vertx::Util::Utils.j_class_of(valueType)
    )
    return ::Vertx::Util::Utils.safe_create(
      j_consumer,
      ::VertxKafkaClient::KafkaConsumer,
      ::Vertx::Util::Utils.v_type_of(keyType),
      ::Vertx::Util::Utils.v_type_of(valueType)
    )
  end

  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end
Instance Method Details
- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }
Manually assign a list of partitions to this consumer.
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 175
#
# Manually assign one topic partition (Hash) or several (Set of Hashes) to this consumer.
#
# @param [Hash, Set] param_1 a single topic partition, or a Set of them; each Hash is
#   converted with Utils.to_json_object and wrapped in a Java TopicPartition
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is neither exactly a Hash nor exactly a Set
def assign(param_1 = nil)
  arg_class = param_1.class
  unless arg_class == Hash || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == Hash ? tp.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_assign = @j_del.java_method(:assign, signature)

  j_target =
    if arg_class == Hash
      tp.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| tp.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_assign.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_assign.call(j_target)
  end
  self
end
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
194 195 196 197 198 199 200 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 194
#
# Get the set of partitions currently assigned to this consumer.
#
# @yield [error, partitions] error is ar.cause when the async result failed (nil otherwise);
#   partitions is nil unless the result succeeded, in which case it is Utils.to_set(ar.result)
#   with every non-nil element replaced by JSON.parse(element.toJson.encode)
# @return [self]
# @raise [ArgumentError] if no block is given
def assignment
  raise ArgumentError, "Invalid arguments when calling assignment()" unless block_given?

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(error, partitions)
  end
  @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call(on_result)
  self
end
- (void) close { ... }
This method returns an undefined value.
Close the consumer
359 360 361 362 363 364 365 366 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 359
#
# Close the consumer.
#
# Without a block the no-argument Java +close+ is invoked; with a block the
# Handler-taking overload is invoked instead.
#
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return whatever the underlying Java call returns (documented as undefined)
def close
  # The two cases below are exhaustive, so no "invalid arguments" fallback is needed
  # (the former trailing raise could never be reached).
  return @j_del.java_method(:close, []).call() unless block_given?

  @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
end
- (void) commit { ... }
This method returns an undefined value.
Commit current offsets for all the subscribed topics and partitions.
327 328 329 330 331 332 333 334 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 327
#
# Commit current offsets for all the subscribed topics and partitions.
#
# Without a block the no-argument Java +commit+ is invoked; with a block the
# Handler-taking overload is invoked instead.
#
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return whatever the underlying Java call returns (documented as undefined)
def commit
  # The two cases below are exhaustive, so no "invalid arguments" fallback is needed
  # (the former trailing raise could never be reached).
  return @j_del.java_method(:commit, []).call() unless block_given?

  @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
end
- (void) committed(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last committed offset for the given partition (whether the commit happened by this process or another).
339 340 341 342 343 344 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 339
#
# Get the last committed offset for the given partition.
#
# @param [Hash] topicPartition topic partition; converted with Utils.to_json_object and
#   wrapped in a Java TopicPartition
# @yield [error, result] error is ar.cause when the async result failed (nil otherwise);
#   result is JSON.parse(ar.result.toJson.encode) when the call succeeded with a non-nil
#   result, otherwise nil
# @return whatever the underlying Java call returns (documented as undefined)
# @raise [ArgumentError] unless topicPartition is exactly a Hash and a block is given
def committed(topicPartition = nil)
  unless topicPartition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
  end

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    value = nil
    if ar.succeeded
      value = JSON.parse(ar.result.toJson.encode) if ar.result != nil
    end
    yield(error, value)
  end

  signature = [Java::IoVertxKafkaClientCommon::TopicPartition.java_class, Java::IoVertxCore::Handler.java_class]
  j_committed = @j_del.java_method(:committed, signature)
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  j_committed.call(j_partition, on_result)
end
- (self) end_handler { ... }
128 129 130 131 132 133 134 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 128
#
# Register the given block with the Java delegate's +endHandler+.
#
# @yield called with no arguments when the Java handler fires
# @return [self]
# @raise [ArgumentError] if no block is given
def end_handler
  raise ArgumentError, "Invalid arguments when calling end_handler()" unless block_given?

  on_end = Proc.new { yield }
  @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(on_end)
  self
end
- (self) exception_handler { ... }
46 47 48 49 50 51 52 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 46
#
# Register the given block with the Java delegate's +exceptionHandler+.
#
# @yield [error] the event converted with Utils.from_throwable
# @return [self]
# @raise [ArgumentError] if no block is given
def exception_handler
  raise ArgumentError, "Invalid arguments when calling exception_handler()" unless block_given?

  on_error = Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) }
  @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call(on_error)
  self
end
- (self) handler { ... }
55 56 57 58 59 60 61 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 55
#
# Register the given block with the Java delegate's +handler+.
#
# @yield [record] each event wrapped by Utils.safe_create as a
#   ::VertxKafkaClient::KafkaConsumerRecord
# @return [self]
# @raise [ArgumentError] if no block is given
def handler
  raise ArgumentError, "Invalid arguments when calling handler()" unless block_given?

  on_record = Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event, ::VertxKafkaClient::KafkaConsumerRecord, nil, nil))
  end
  @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call(on_record)
  self
end
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer
246 247 248 249 250 251 252 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 246
#
# Set the handler called when topic partitions are assigned to the consumer.
#
# @yield [partitions] Utils.to_set(event) with every non-nil element replaced by
#   JSON.parse(element.toJson.encode)
# @return [self]
# @raise [ArgumentError] if no block is given
def partitions_assigned_handler
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" unless block_given?

  on_assigned = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    yield(partitions)
  end
  @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_assigned)
  self
end
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
349 350 351 352 353 354 355 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 349
#
# Get metadata about the partitions for a given topic.
#
# @param [String] topic topic name passed through to the Java +partitionsFor+ call
# @yield [error, partitions] error is ar.cause when the async result failed (nil otherwise);
#   partitions is nil unless the result succeeded, in which case it is ar.result.to_a with
#   every non-nil element replaced by JSON.parse(element.toJson.encode)
# @return [self]
# @raise [ArgumentError] unless topic is exactly a String and a block is given
def partitions_for(topic = nil)
  unless topic.class == String && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(error, partitions)
  end

  signature = [Java::java.lang.String.java_class, Java::IoVertxCore::Handler.java_class]
  @j_del.java_method(:partitionsFor, signature).call(topic, on_result)
  self
end
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked from the consumer
236 237 238 239 240 241 242 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 236
#
# Set the handler registered with the Java delegate's +partitionsRevokedHandler+.
#
# @yield [partitions] Utils.to_set(event) with every non-nil element replaced by
#   JSON.parse(element.toJson.encode)
# @return [self]
# @raise [ArgumentError] if no block is given
def partitions_revoked_handler
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" unless block_given?

  on_revoked = Proc.new do |event|
    partitions = ::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    yield(partitions)
  end
  @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_revoked)
  self
end
- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }
Suspend fetching from the requested partitions.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 75
#
# Suspend fetching, either globally or from the requested partitions.
#
# With no argument and no block the no-argument Java +pause+ is invoked. Otherwise
# param_1 selects the partitions to pause.
#
# @param [Hash, Set] param_1 a single topic partition, or a Set of them; each Hash is
#   converted with Utils.to_json_object and wrapped in a Java TopicPartition
# @yield [error] optional completion handler (only valid with a Hash or Set argument);
#   receives ar.cause when the async result failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is not nil/Hash/Set, or is nil while a block is given
def pause(param_1 = nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:pause, []).call()
    return self
  end

  arg_class = param_1.class
  unless arg_class == Hash || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == Hash ? tp.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_pause = @j_del.java_method(:pause, signature)

  j_target =
    if arg_class == Hash
      tp.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| tp.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_pause.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_pause.call(j_target)
  end
  self
end
- (void) paused { ... }
This method returns an undefined value.
Get the set of partitions that were previously paused by a call to pause(Set).
227 228 229 230 231 232 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 227
#
# Get the set of partitions that were previously paused by a call to pause(Set).
#
# @yield [error, partitions] error is ar.cause when the async result failed (nil otherwise);
#   partitions is nil unless the result succeeded, in which case it is Utils.to_set(ar.result)
#   with every non-nil element replaced by JSON.parse(element.toJson.encode)
# @return whatever the underlying Java call returns (documented as undefined)
# @raise [ArgumentError] if no block is given
def paused
  raise ArgumentError, "Invalid arguments when calling paused()" unless block_given?

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    partitions = nil
    if ar.succeeded
      partitions = ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
    end
    yield(error, partitions)
  end
  @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call(on_result)
end
- (void) position(partition = nil) { ... }
This method returns an undefined value.
Get the offset of the next record that will be fetched (if a record with that offset exists).
371 372 373 374 375 376 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 371
#
# Get the offset of the next record that will be fetched (if a record with that offset exists).
#
# @param [Hash] partition topic partition; converted with Utils.to_json_object and wrapped
#   in a Java TopicPartition
# @yield [error, offset] error is ar.cause when the async result failed (nil otherwise);
#   offset is ar.result when it succeeded (nil otherwise)
# @return whatever the underlying Java call returns (documented as undefined)
# @raise [ArgumentError] unless partition is exactly a Hash and a block is given
def position(partition = nil)
  unless partition.class == Hash && block_given?
    raise ArgumentError, "Invalid arguments when calling position(#{partition})"
  end

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    offset = ar.succeeded ? ar.result : nil
    yield(error, offset)
  end

  signature = [Java::IoVertxKafkaClientCommon::TopicPartition.java_class, Java::IoVertxCore::Handler.java_class]
  j_position = @j_del.java_method(:position, signature)
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition))
  j_position.call(j_partition, on_result)
end
- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }
Resume specified partitions which have been paused with pause.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 107
#
# Resume fetching, either globally or for specified partitions which have been paused.
#
# With no argument and no block the no-argument Java +resume+ is invoked. Otherwise
# param_1 selects the partitions to resume.
#
# @param [Hash, Set] param_1 a single topic partition, or a Set of them; each Hash is
#   converted with Utils.to_json_object and wrapped in a Java TopicPartition
# @yield [error] optional completion handler (only valid with a Hash or Set argument);
#   receives ar.cause when the async result failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is not nil/Hash/Set, or is nil while a block is given
def resume(param_1 = nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:resume, []).call()
    return self
  end

  arg_class = param_1.class
  unless arg_class == Hash || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == Hash ? tp.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_resume = @j_del.java_method(:resume, signature)

  j_target =
    if arg_class == Hash
      tp.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| tp.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_resume.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_resume.call(j_target)
  end
  self
end
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
258 259 260 261 262 263 264 265 266 267 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 258
#
# Overrides the fetch offsets that the consumer will use on the next poll.
#
# @param [Hash] topicPartition topic partition; converted with Utils.to_json_object and
#   wrapped in a Java TopicPartition
# @param [Integer] offset offset passed to the Java +seek+ overload as a long
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
# @raise [ArgumentError] unless topicPartition is exactly a Hash and offset is an Integer
def seek(topicPartition = nil, offset = nil)
  # Integer replaces the former Fixnum check: Fixnum was deprecated in Ruby 2.4 and
  # removed in 3.2, where referencing it raises NameError instead of validating.
  # is_a? also keeps working on older runtimes where small integers are Fixnum
  # instances (Fixnum is an Integer subclass there).
  unless topicPartition.class == Hash && offset.is_a?(Integer)
    raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: (TopicPartition, long), plus a Handler when a block was given.
  signature = [tp.java_class, Java::long.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_seek = @j_del.java_method(:seek, signature)
  j_partition = tp.new(::Vertx::Util::Utils.to_json_object(topicPartition))

  if block_given?
    j_seek.call(j_partition, offset, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_partition, offset)
  end
  self
end
- (self) seek_to_beginning(topicPartition) - (self) seek_to_beginning(topicPartitions) - (self) seek_to_beginning(topicPartition, completionHandler) { ... } - (self) seek_to_beginning(topicPartitions, completionHandler) { ... }
Seek to the first offset for each of the given partitions.
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 280
#
# Seek to the first offset for each of the given partitions.
#
# @param [Hash, Set] param_1 a single topic partition, or a Set of them; each Hash is
#   converted with Utils.to_json_object and wrapped in a Java TopicPartition
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is neither exactly a Hash nor exactly a Set
def seek_to_beginning(param_1 = nil)
  arg_class = param_1.class
  unless arg_class == Hash || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == Hash ? tp.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_seek = @j_del.java_method(:seekToBeginning, signature)

  j_target =
    if arg_class == Hash
      tp.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| tp.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end
- (self) seek_to_end(topicPartition) - (self) seek_to_end(topicPartitions) - (self) seek_to_end(topicPartition, completionHandler) { ... } - (self) seek_to_end(topicPartitions, completionHandler) { ... }
Seek to the last offset for each of the given partitions.
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 308
#
# Seek to the last offset for each of the given partitions.
#
# @param [Hash, Set] param_1 a single topic partition, or a Set of them; each Hash is
#   converted with Utils.to_json_object and wrapped in a Java TopicPartition
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is neither exactly a Hash nor exactly a Set
def seek_to_end(param_1 = nil)
  arg_class = param_1.class
  unless arg_class == Hash || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
  end

  tp = Java::IoVertxKafkaClientCommon::TopicPartition
  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == Hash ? tp.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_seek = @j_del.java_method(:seekToEnd, signature)

  j_target =
    if arg_class == Hash
      tp.new(::Vertx::Util::Utils.to_json_object(param_1))
    else
      Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| tp.new(::Vertx::Util::Utils.to_json_object(element)) })
    end

  if block_given?
    j_seek.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_seek.call(j_target)
  end
  self
end
- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }
Subscribe to the given list of topics to get dynamically assigned partitions.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 147
#
# Subscribe to the given topic (String) or topics (Set) to get dynamically assigned partitions.
#
# @param [String, Set] param_1 a single topic name, or a Set whose elements are copied
#   into a Java LinkedHashSet
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
# @raise [ArgumentError] if param_1 is neither exactly a String nor exactly a Set
def subscribe(param_1 = nil)
  arg_class = param_1.class
  unless arg_class == String || arg_class == Set
    raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
  end

  # Java overload signature: the argument type, plus a Handler when a block was given.
  signature = [arg_class == String ? Java::java.lang.String.java_class : Java::JavaUtil::Set.java_class]
  signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_subscribe = @j_del.java_method(:subscribe, signature)

  j_target = arg_class == String ? param_1 : Java::JavaUtil::LinkedHashSet.new(param_1.to_a)

  if block_given?
    j_subscribe.call(j_target, Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    j_subscribe.call(j_target)
  end
  self
end
- (self) subscription { ... }
Get the current subscription.
217 218 219 220 221 222 223 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 217
#
# Get the current subscription.
#
# @yield [error, topics] error is ar.cause when the async result failed (nil otherwise);
#   topics is Utils.to_set(ar.result) when it succeeded (nil otherwise)
# @return [self]
# @raise [ArgumentError] if no block is given
def subscription
  raise ArgumentError, "Invalid arguments when calling subscription()" unless block_given?

  on_result = Proc.new do |ar|
    error = ar.failed ? ar.cause : nil
    topics = nil
    # map! with an identity block is kept as-is so the converted set is produced
    # exactly the way it always was.
    topics = ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } if ar.succeeded
    yield(error, topics)
  end
  @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(on_result)
  self
end
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
204 205 206 207 208 209 210 211 212 213 |
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 204
#
# Unsubscribe from topics currently subscribed with subscribe.
#
# Without a block the no-argument Java +unsubscribe+ is invoked; with a block the
# Handler-taking overload is invoked instead.
#
# @yield [error] optional completion handler; receives ar.cause when the async result
#   failed, otherwise nil
# @return [self]
def unsubscribe
  # The two cases below are exhaustive, so no "invalid arguments" fallback is needed
  # (the former trailing raise could never be reached).
  if block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })
  else
    @j_del.java_method(:unsubscribe, []).call()
  end
  self
end