Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume provides global control over reading the records from the consumer.

The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaConsumer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka consumer configuration
  • keyType (Nil) (defaults to: nil)
    class type for the key deserialization
  • valueType (Nil) (defaults to: nil)
    class type for the value deserialization

Returns:

Raises:

  • (ArgumentError)


36
37
38
39
40
41
42
43
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 36

def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil)
  elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given?
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType))
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a list of partition to this consumer.

Overloads:

  • - (self) assign(topicPartition)

    Parameters:

    • topicPartition (Hash)
      partition which want assigned
  • - (self) assign(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned
  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 175

def assign(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
end

- (self) assignment { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


194
195
196
197
198
199
200
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 194

def assignment
  if block_given?
    @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assignment()"
end

- (void) close { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


359
360
361
362
363
364
365
366
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 359

def close
  if !block_given?
    return @j_del.java_method(:close, []).call()
  elsif block_given?
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close()"
end

- (void) commit { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed list of topics and partition.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


327
328
329
330
331
332
333
334
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 327

def commit
  if !block_given?
    return @j_del.java_method(:commit, []).call()
  elsif block_given?
    return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling commit()"
end

- (void) committed(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


339
340
341
342
343
344
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 339

def committed(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
end

- (self) end_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


128
129
130
131
132
133
134
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 128

def end_handler
  if block_given?
    @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield })
    return self
  end
  raise ArgumentError, "Invalid arguments when calling end_handler()"
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


46
47
48
49
50
51
52
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 46

def exception_handler
  if block_given?
    @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling exception_handler()"
end

- (self) handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


55
56
57
58
59
60
61
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 55

def handler
  if block_given?
    @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling handler()"
end

- (self) partitions_assigned_handler { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


246
247
248
249
250
251
252
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 246

def partitions_assigned_handler
  if block_given?
    @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()"
end

- (self) partitions_for(topic = nil) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic partition for which getting partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


349
350
351
352
353
354
355
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 349

def partitions_for(topic=nil)
  if topic.class == String && block_given?
    @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
end

- (self) partitions_revoked_handler { ... }

Set the handler called when topic partitions are revoked to the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


236
237
238
239
240
241
242
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 236

def partitions_revoked_handler
  if block_given?
    @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()"
end

- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Overloads:

  • - (self) pause(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching
  • - (self) pause(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching
  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 75

def pause(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:pause, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
end

- (void) paused { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


227
228
229
230
231
232
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 227

def paused
  if block_given?
    return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling paused()"
end

- (void) position(partition = nil) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash) (defaults to: nil)
    The partition to get the position for

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


371
372
373
374
375
376
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 371

def position(partition=nil)
  if partition.class == Hash && block_given?
    return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling position(#{partition})"
end

- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching
  • - (self) resume(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching
  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 107

def resume(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:resume, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
end

- (self) seek(topicPartition = nil, offset = nil) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for which seek
  • offset (Fixnum) (defaults to: nil)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


258
259
260
261
262
263
264
265
266
267
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 258

def seek(topicPartition=nil,offset=nil)
  if topicPartition.class == Hash && offset.class == Fixnum && !block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset)
    return self
  elsif topicPartition.class == Hash && offset.class == Fixnum && block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
end

- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Overloads:

  • - (self) seekToBeginning(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 280

def seek_to_beginning(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
end

- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Overloads:

  • - (self) seekToEnd(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToEnd(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 308

def seek_to_end(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
end

- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Overloads:

  • - (self) subscribe(topic)

    Parameters:

    • topic (String)
      topic to subscribe to
  • - (self) subscribe(topics)

    Parameters:

    • topics (Set<String>)
      topics to subscribe to
  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 147

def subscribe(param_1=nil)
  if param_1.class == String && !block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1)
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }))
    return self
  elsif param_1.class == String && block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
end

- (self) subscription { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


217
218
219
220
221
222
223
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 217

def subscription
  if block_given?
    @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscription()"
end

- (self) unsubscribe { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


204
205
206
207
208
209
210
211
212
213
# File '/Users/julien/java/vertx-stack/stack-docs/target/rb/vertx-kafka-client/kafka_consumer.rb', line 204

def unsubscribe
  if !block_given?
    @j_del.java_method(:unsubscribe, []).call()
    return self
  elsif block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling unsubscribe()"
end