Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume methods provide global control over reading the records from the consumer.

The #pause and #resume methods, when given a topic partition (or a set of them), provide finer-grained control over reading records for specific Topic/Partition; these are Kafka-specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaConsumer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka consumer configuration
  • keyType (Nil) (defaults to: nil)
    class type for the key deserialization
  • valueType (Nil) (defaults to: nil)
    class type for the value deserialization

Returns:

Raises:

  • (ArgumentError)


77
78
79
80
81
82
83
84
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 77

# Create a new KafkaConsumer instance.
#
# Two call shapes are accepted, both without a block:
# * create(vertx, config) - no key/value types given
# * create(vertx, config, keyType, valueType) - both types given as classes
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use (must expose +j_del+)
# @param [Hash{String => String}] config Kafka consumer configuration
# @param [Nil] keyType class type for the key deserialization
# @param [Nil] valueType class type for the value deserialization
# @return [::VertxKafkaClient::KafkaConsumer] wrapper around the created Java consumer
# @raise [ArgumentError] when the arguments match neither call shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  # Preconditions common to both overloads.
  base_ok = vertx.class.method_defined?(:j_del) && config.instance_of?(Hash) && !block_given?

  if base_ok && keyType == nil && valueType == nil
    j_factory = Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class])
    j_consumer = j_factory.call(vertx.j_del, Hash[config.to_a])
    return ::Vertx::Util::Utils.safe_create(j_consumer, ::VertxKafkaClient::KafkaConsumer, nil, nil)
  end

  if base_ok && keyType.instance_of?(Class) && valueType.instance_of?(Class)
    j_factory = Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class])
    j_consumer = j_factory.call(vertx.j_del, Hash[config.to_a], ::Vertx::Util::Utils.j_class_of(keyType), ::Vertx::Util::Utils.j_class_of(valueType))
    return ::Vertx::Util::Utils.safe_create(j_consumer, ::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType))
  end

  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a list of partitions to this consumer.

Due to internal buffering of messages, when reassigning, the old set of partitions may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new set of partitions.

Overloads:

  • - (self) assign(topicPartition)

    Parameters:

    • topicPartition (Hash{String => Object})
      partition which want assigned
  • - (self) assign(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      partitions which want assigned
  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 240

# Manually assign one partition (Hash) or several partitions (Set of Hash)
# to this consumer. When a block is given it is passed to the delegate as the
# completion handler and is yielded the failure cause (or nil).
#
# @param [Hash{String => Object}, Set<Hash{String => Object}>] param_1 partition(s) to assign
# @yield [cause] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when param_1 is neither exactly a Hash nor exactly a Set
def assign(param_1=nil)
  single = param_1.instance_of?(Hash)
  unless single || param_1.instance_of?(Set)
    raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
  end

  # Pick the Java overload: (TopicPartition | Set) optionally followed by a Handler.
  j_signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_assign = @j_del.java_method(:assign, j_signature)

  to_j_partition = lambda do |json|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(json))
  end
  j_target = if single
    to_j_partition.call(param_1)
  else
    Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| to_j_partition.call(element) })
  end

  j_args = [j_target]
  j_args << Proc.new { |ar| yield(ar.failed ? ar.cause : nil) } if block_given?
  j_assign.call(*j_args)
  self
end

- (self) assignment { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


259
260
261
262
263
264
265
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 259

# Get the set of partitions currently assigned to this consumer.
#
# The block, when given, is wrapped in a Proc and passed to the delegate's
# +assignment+ method; without a block +nil+ is passed instead. On completion
# the block receives the failure cause (or nil) and, on success, the result
# converted to a set of parsed JSON hashes (or nil).
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [cause, partitions] handler called on operation completed
# @return [self]
def assignment
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      partitions = ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil
      yield(failure, partitions)
    end
  end
  @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call(on_done)
  self
end

- (self) batch_handler { ... }

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

Yields:

  • handler called when batches of messages are fetched

Returns:

  • (self)

Raises:

  • (ArgumentError)


451
452
453
454
455
456
457
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 451

# Set the handler to be used when batches of messages are fetched.
#
# A Proc is always registered with the delegate's +batchHandler+; it wraps
# each event in a ::VertxKafkaClient::KafkaConsumerRecords and yields it to
# the block, or does nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [records] handler called when batches of messages are fetched
# @return [self]
def batch_handler
  on_batch = Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) if block_given?
  end
  @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call(on_batch)
  self
end

- (void) beginning_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the first offset for the given partitions.

Parameters:

  • topicPartition (Hash{String => Object}) (defaults to: nil)
    the partition to get the earliest offset.

Yields:

  • handler called on operation completed. Returns the earliest available offset for the given partition

Raises:

  • (ArgumentError)


495
496
497
498
499
500
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 495

# Get the first offset for the given partition.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and the result (or nil); without a block
# +nil+ is passed as the handler.
#
# @param [Hash{String => Object}] topicPartition the partition to get the earliest offset.
# @yield [cause, offset] handler called on operation completed
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when topicPartition is not exactly a Hash
def beginning_offsets(topicPartition=nil)
  unless topicPartition.instance_of?(Hash)
    raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})"
  end

  j_lookup = @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : nil
  return j_lookup.call(j_partition, on_done)
end

- (void) close { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


461
462
463
464
465
466
467
468
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 461

# Close the consumer.
#
# Without a block the delegate's no-argument +close+ is called; with a block
# the handler overload is called and the block is yielded the failure cause
# (or nil) on completion.
#
# The two cases are exhaustive, so the previously unreachable +raise+ and the
# always-true block check inside the handler branch were removed.
#
# @yield [cause] handler called on operation completed
# @return [void] whatever the delegate call returns
def close
  return @j_del.java_method(:close, []).call() unless block_given?

  on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
  @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call(on_done)
end

- (void) commit { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed list of topics and partition.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


416
417
418
419
420
421
422
423
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 416

# Commit current offsets for all the subscribed list of topics and partition.
#
# Without a block the delegate's no-argument +commit+ is called; with a block
# the handler overload is called and the block is yielded the failure cause
# (or nil) on completion.
#
# The two cases are exhaustive, so the previously unreachable +raise+ and the
# always-true block check inside the handler branch were removed.
#
# @yield [cause] handler called on operation completed
# @return [void] whatever the delegate call returns
def commit
  return @j_del.java_method(:commit, []).call() unless block_given?

  on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
  @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call(on_done)
end

- (void) committed(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash{String => Object}) (defaults to: nil)
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


428
429
430
431
432
433
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 428

# Get the last committed offset for the given partition.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and, on success with a non-nil result,
# that result parsed from its JSON encoding (otherwise nil). Without a block
# +nil+ is passed as the handler.
#
# @param [Hash{String => Object}] topicPartition topic partition for getting last committed offset
# @yield [cause, offset_and_metadata] handler called on operation completed
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when topicPartition is not exactly a Hash
def committed(topicPartition=nil)
  unless topicPartition.instance_of?(Hash)
    raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
  end

  j_lookup = @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      parsed = nil
      parsed = JSON.parse(ar.result.toJson.encode) if ar.succeeded && ar.result != nil
      yield(failure, parsed)
    end
  end
  return j_lookup.call(j_partition, on_done)
end

- (self) end_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


177
178
179
180
181
182
183
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 177

# Register a handler with the delegate's +endHandler+.
#
# A Proc is always registered; it yields to the block with no arguments, or
# does nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield called by the registered handler
# @return [self]
def end_handler
  on_end = Proc.new { yield if block_given? }
  @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(on_end)
  self
end

- (void) end_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

Parameters:

  • topicPartition (Hash{String => Object}) (defaults to: nil)
    the partition to get the end offset.

Yields:

  • handler called on operation completed. The end offset for the given partition.

Raises:

  • (ArgumentError)


506
507
508
509
510
511
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 506

# Get the last offset for the given partition.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and the result (or nil); without a block
# +nil+ is passed as the handler.
#
# @param [Hash{String => Object}] topicPartition the partition to get the end offset.
# @yield [cause, offset] handler called on operation completed
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when topicPartition is not exactly a Hash
def end_offsets(topicPartition=nil)
  unless topicPartition.instance_of?(Hash)
    raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})"
  end

  j_lookup = @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : nil
  return j_lookup.call(j_partition, on_done)
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


87
88
89
90
91
92
93
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 87

# Register a handler with the delegate's +exceptionHandler+.
#
# A Proc is always registered; it converts each event with
# ::Vertx::Util::Utils.from_throwable and yields it to the block, or does
# nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [error] called by the registered handler with the converted event
# @return [self]
def exception_handler
  on_error = Proc.new do |event|
    yield(::Vertx::Util::Utils.from_throwable(event)) if block_given?
  end
  @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call(on_error)
  self
end

- (self) fetch(amount = nil)

Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items, otherwise the specified amount will be added to the current stream demand.

Parameters:

  • amount (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 39

# Fetch the specified amount of elements by calling the delegate's
# +fetch(long)+.
#
# The argument is validated with +is_a?(Integer)+ rather than a comparison
# against +Fixnum+: that constant is deprecated since Ruby 2.4 and removed in
# Ruby 3.2, where referencing it raises NameError before any validation runs.
#
# @param [Integer] amount number of elements to fetch
# @return [self]
# @raise [ArgumentError] when amount is not an Integer or a block is given
def fetch(amount=nil)
  unless amount.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling fetch(#{amount})"
  end
  @j_del.java_method(:fetch, [Java::long.java_class]).call(amount)
  self
end

- (self) handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


96
97
98
99
100
101
102
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 96

# Register the record handler with the delegate's +handler+.
#
# A Proc is always registered; it wraps each event in a
# ::VertxKafkaClient::KafkaConsumerRecord and yields it to the block, or does
# nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [record] called by the registered handler with the wrapped event
# @return [self]
def handler
  on_record = Proc.new do |event|
    yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) if block_given?
  end
  @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call(on_record)
  self
end

- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }

This method returns an undefined value.

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

Parameters:

  • topicPartition (Hash{String => Object}) (defaults to: nil)
    TopicPartition to query.
  • timestamp (Fixnum) (defaults to: nil)
    Timestamp to be used in the query.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


485
486
487
488
489
490
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 485

# Look up the offset for the given partition by timestamp.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and, on success with a non-nil result,
# that result parsed from its JSON encoding (otherwise nil). Without a block
# +nil+ is passed as the handler.
#
# The timestamp is validated with +is_a?(Integer)+ rather than a comparison
# against +Fixnum+: that constant is deprecated since Ruby 2.4 and removed in
# Ruby 3.2, where referencing it raises NameError before any validation runs.
#
# @param [Hash{String => Object}] topicPartition TopicPartition to query.
# @param [Integer] timestamp Timestamp to be used in the query.
# @yield [cause, offset_and_timestamp] handler called on operation completed
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when topicPartition is not exactly a Hash or timestamp is not an Integer
def offsets_for_times(topicPartition=nil,timestamp=nil)
  unless topicPartition.class == Hash && timestamp.is_a?(Integer)
    raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})"
  end

  j_lookup = @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      parsed = nil
      parsed = JSON.parse(ar.result.toJson.encode) if ar.succeeded && ar.result != nil
      yield(failure, parsed)
    end
  end
  j_lookup.call(j_partition, timestamp, on_done)
end

- (self) partitions_assigned_handler { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


311
312
313
314
315
316
317
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 311

# Set the handler called when topic partitions are assigned to the consumer.
#
# A Proc is always registered with the delegate's +partitionsAssignedHandler+;
# it converts each event to a set of parsed JSON hashes (nil elements are kept
# as nil) and yields it to the block, or does nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [partitions] handler called on assigned topic partitions
# @return [self]
def partitions_assigned_handler
  on_assigned = Proc.new do |event|
    if block_given?
      yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil })
    end
  end
  @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_assigned)
  self
end

- (self) partitions_for(topic = nil) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic partition for which getting partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


438
439
440
441
442
443
444
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 438

# Get metadata about the partitions for a given topic.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and, on success, the result converted to
# an array of parsed JSON hashes (nil elements are kept as nil). Without a
# block +nil+ is passed as the handler.
#
# @param [String] topic topic for which to get partitions info
# @yield [cause, partitions] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when topic is not exactly a String
def partitions_for(topic=nil)
  unless topic.instance_of?(String)
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  j_lookup = @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class])
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      infos = ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil
      yield(failure, infos)
    end
  end
  j_lookup.call(topic, on_done)
  self
end

- (self) partitions_revoked_handler { ... }

Set the handler called when topic partitions are revoked from the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


301
302
303
304
305
306
307
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 301

# Set the handler called when topic partitions are revoked from the consumer.
#
# A Proc is always registered with the delegate's +partitionsRevokedHandler+;
# it converts each event to a set of parsed JSON hashes (nil elements are kept
# as nil) and yields it to the block, or does nothing when no block was given.
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [partitions] handler called on revoked topic partitions
# @return [self]
def partitions_revoked_handler
  on_revoked = Proc.new do |event|
    if block_given?
      yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil })
    end
  end
  @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call(on_revoked)
  self
end

- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartitions until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will not see messages from the given topicPartitions.

Overloads:

  • - (self) pause(topicPartition)

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which suspend fetching
  • - (self) pause(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which suspend fetching
  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 124

# Suspend fetching. With no argument and no block the delegate's no-argument
# +pause+ is called; with a Hash (one partition) or a Set of Hash (several
# partitions) the matching partition overload is called. When a block is
# given together with a partition argument, it is passed as the completion
# handler and is yielded the failure cause (or nil).
#
# @param [Hash{String => Object}, Set<Hash{String => Object}>] param_1 topic partition(s) from which suspend fetching
# @yield [cause] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when the arguments match none of the call shapes
def pause(param_1=nil)
  if param_1 == nil && !block_given?
    @j_del.java_method(:pause, []).call()
    return self
  end

  single = param_1.instance_of?(Hash)
  unless single || param_1.instance_of?(Set)
    raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
  end

  # Pick the Java overload: (TopicPartition | Set) optionally followed by a Handler.
  j_signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_pause = @j_del.java_method(:pause, j_signature)

  to_j_partition = lambda do |json|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(json))
  end
  j_target = if single
    to_j_partition.call(param_1)
  else
    Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| to_j_partition.call(element) })
  end

  j_args = [j_target]
  j_args << Proc.new { |ar| yield(ar.failed ? ar.cause : nil) } if block_given?
  j_pause.call(*j_args)
  self
end

- (void) paused { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


292
293
294
295
296
297
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 292

# Get the set of partitions that were previously paused by a call to pause(Set).
#
# The block, when given, is wrapped in a Proc and passed to the delegate's
# +paused+ method; without a block +nil+ is passed instead. On completion the
# block receives the failure cause (or nil) and, on success, the result
# converted to a set of parsed JSON hashes (or nil).
#
# The former constant-true guard and its unreachable +raise+ were removed:
# this method takes no arguments, so there is nothing to validate.
#
# @yield [cause, partitions] handler called on operation completed
# @return [void] whatever the delegate call returns
def paused
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      partitions = ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil
      yield(failure, partitions)
    end
  end
  @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call(on_done)
end

- (::Vertx::Pipe) pipe

Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.

The stream will be resumed when the pipe will be wired to a WriteStream.

Returns:

Raises:

  • (ArgumentError)


50
51
52
53
54
55
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 50

# Call the delegate's no-argument +pipe+ and wrap the result in a
# ::Vertx::Pipe typed with ::VertxKafkaClient::KafkaConsumerRecord.j_api_type.
#
# @return [::Vertx::Pipe] wrapper around the Java pipe
# @raise [ArgumentError] when a block is given
def pipe
  raise ArgumentError, "Invalid arguments when calling pipe()" if block_given?

  j_pipe = @j_del.java_method(:pipe, []).call()
  return ::Vertx::Util::Utils.safe_create(j_pipe,::Vertx::Pipe,::VertxKafkaClient::KafkaConsumerRecord.j_api_type)
end

- (void) pipe_to(dst = nil) { ... }

This method returns an undefined value.

Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Parameters:

Yields:

Raises:

  • (ArgumentError)


65
66
67
68
69
70
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 65

# Pipe this ReadStream to the WriteStream by calling the delegate's +pipeTo+
# with the destination's +j_del+.
#
# The block, when given, is passed as the completion handler and is yielded
# the failure cause (or nil); without a block +nil+ is passed as the handler.
#
# @param [Object] dst destination exposing +j_del+
# @yield [cause] handler called with the result
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when dst's class does not define +j_del+
def pipe_to(dst=nil)
  unless dst.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling pipe_to(#{dst})"
  end

  j_pipe_to = @j_del.java_method(:pipeTo, [Java::IoVertxCoreStreams::WriteStream.java_class,Java::IoVertxCore::Handler.java_class])
  j_destination = dst.j_del
  on_done = block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil) } : nil
  return j_pipe_to.call(j_destination, on_done)
end

- (void) poll(timeout = nil) { ... }

This method returns an undefined value.

Executes a poll for getting messages from Kafka

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Yields:

  • handler called after the poll with batch of records (can be empty).

Raises:

  • (ArgumentError)


528
529
530
531
532
533
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 528

# Executes a poll for getting messages from Kafka.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and, on success, the result wrapped in a
# ::VertxKafkaClient::KafkaConsumerRecords (or nil). Without a block +nil+ is
# passed as the handler.
#
# The timeout is validated with +is_a?(Integer)+ rather than a comparison
# against +Fixnum+: that constant is deprecated since Ruby 2.4 and removed in
# Ruby 3.2, where referencing it raises NameError before any validation runs.
#
# @param [Integer] timeout time, in milliseconds, spent waiting in poll
# @yield [cause, records] handler called after the poll with batch of records (can be empty).
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when timeout is not an Integer
def poll(timeout=nil)
  unless timeout.is_a?(Integer)
    raise ArgumentError, "Invalid arguments when calling poll(#{timeout})"
  end

  on_records = nil
  if block_given?
    on_records = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      records = ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil
      yield(failure, records)
    end
  end
  @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout, on_records)
end

- (::VertxKafkaClient::KafkaConsumer) poll_timeout(timeout = nil)

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000. Setting timeout to a lower value results in a more 'responsive' client, because it will block for a shorter period if no data is available in the assigned partition and therefore allows subsequent actions to be executed with a shorter delay. At the same time, the client will poll more frequently and thus will potentially create a higher load on the Kafka Broker.

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Returns:

Raises:

  • (ArgumentError)


518
519
520
521
522
523
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 518

# Sets the poll timeout (in ms) by calling the delegate's +pollTimeout(long)+
# and wraps the returned Java object in a ::VertxKafkaClient::KafkaConsumer.
#
# The timeout is validated with +is_a?(Integer)+ rather than a comparison
# against +Fixnum+: that constant is deprecated since Ruby 2.4 and removed in
# Ruby 3.2, where referencing it raises NameError before any validation runs.
#
# @param [Integer] timeout time, in milliseconds, spent waiting in poll
# @return [::VertxKafkaClient::KafkaConsumer] wrapper around the delegate's return value
# @raise [ArgumentError] when timeout is not an Integer or a block is given
def poll_timeout(timeout=nil)
  unless timeout.is_a?(Integer) && !block_given?
    raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})"
  end

  j_consumer = @j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout)
  ::Vertx::Util::Utils.safe_create(j_consumer,::VertxKafkaClient::KafkaConsumer, nil, nil)
end

- (void) position(partition = nil) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash{String => Object}) (defaults to: nil)
    The partition to get the position for

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


473
474
475
476
477
478
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 473

# Get the offset of the next record that will be fetched.
#
# The block, when given, is handed to the delegate as completion handler and
# receives the failure cause (or nil) and the result (or nil); without a block
# +nil+ is passed as the handler.
#
# @param [Hash{String => Object}] partition The partition to get the position for
# @yield [cause, offset] handler called on operation completed
# @return [void] whatever the delegate call returns
# @raise [ArgumentError] when partition is not exactly a Hash
def position(partition=nil)
  unless partition.instance_of?(Hash)
    raise ArgumentError, "Invalid arguments when calling position(#{partition})"
  end

  j_lookup = @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class])
  j_partition = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition))
  on_done = block_given? ? Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) } : nil
  return j_lookup.call(j_partition, on_done)
end

- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition)

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which resume fetching
  • - (self) resume(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which resume fetching
  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 156

# Resume fetching. With no argument and no block the delegate's no-argument
# +resume+ is called; with a Hash (one partition) or a Set of Hash (several
# partitions) the matching partition overload is called. When a block is
# given together with a partition argument, it is passed as the completion
# handler and is yielded the failure cause (or nil).
#
# @param [Hash{String => Object}, Set<Hash{String => Object}>] param_1 topic partition(s) from which resume fetching
# @yield [cause] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when the arguments match none of the call shapes
def resume(param_1=nil)
  if param_1 == nil && !block_given?
    @j_del.java_method(:resume, []).call()
    return self
  end

  single = param_1.instance_of?(Hash)
  unless single || param_1.instance_of?(Set)
    raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
  end

  # Pick the Java overload: (TopicPartition | Set) optionally followed by a Handler.
  j_signature = [single ? Java::IoVertxKafkaClientCommon::TopicPartition.java_class : Java::JavaUtil::Set.java_class]
  j_signature << Java::IoVertxCore::Handler.java_class if block_given?
  j_resume = @j_del.java_method(:resume, j_signature)

  to_j_partition = lambda do |json|
    Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(json))
  end
  j_target = if single
    to_j_partition.call(param_1)
  else
    Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| to_j_partition.call(element) })
  end

  j_args = [j_target]
  j_args << Proc.new { |ar| yield(ar.failed ? ar.cause : nil) } if block_given?
  j_resume.call(*j_args)
  self
end

- (self) seek(topicPartition = nil, offset = nil) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Parameters:

  • topicPartition (Hash{String => Object}) (defaults to: nil)
    topic partition for which seek
  • offset (Fixnum) (defaults to: nil)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


331
332
333
334
335
336
337
338
339
340
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 331

# Overrides the fetch offsets that the consumer will use on the next poll.
#
# Dispatches to the Java +seek+ overload with a completion handler when a
# block is given, and to the handler-less overload otherwise.
#
# @param [Hash{String => Object}] topicPartition topic partition for which seek
# @param [Integer] offset offset to seek inside the topic partition; must fit
#   in a signed 64-bit value because it is passed to a Java +long+ parameter
# @yield handler called on operation completed; it is given the failure cause
#   when the async result reports failure, and nil otherwise
# @return [self]
# @raise [ArgumentError] when topicPartition is not a Hash or offset is not a
#   64-bit-representable Integer
def seek(topicPartition=nil,offset=nil)
  # Fixnum was merged into Integer in Ruby 2.4 and the constant was removed in
  # Ruby 3.2, so test for Integer and bound it to the range of a Java long
  # (bit_length <= 63 is exactly the signed 64-bit range).
  offset_ok = offset.is_a?(Integer) && offset.bit_length <= 63
  valid = topicPartition.class == Hash && offset_ok
  raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})" unless valid

  partition_type = Java::IoVertxKafkaClientCommon::TopicPartition
  partition = partition_type.new(::Vertx::Util::Utils.to_json_object(topicPartition))
  if block_given?
    # Adapt the async result to the block: only the failure cause (or nil) is passed on.
    on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    @j_del.java_method(:seek, [partition_type.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(partition,offset,on_done)
  else
    @j_del.java_method(:seek, [partition_type.java_class,Java::long.java_class]).call(partition,offset)
  end
  self
end

- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the record #handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToBeginning(topicPartition)

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek
  • - (self) seekToBeginning(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 361

# Seek to the first offset for each of the given partitions.
#
# The argument's exact class selects the Java overload: a Hash is converted to
# a single TopicPartition, a Set to a LinkedHashSet of TopicPartition objects.
# A block, when given, is wrapped as the completion handler.
#
# @param [Hash{String => Object}, Set<Hash{String => Object}>] param_1 topic
#   partition (or set of topic partitions) for which seek
# @yield handler called on operation completed; it is given the failure cause
#   when the async result reports failure, and nil otherwise
# @return [self]
# @raise [ArgumentError] when param_1 is neither a Hash nor a Set
def seek_to_beginning(param_1=nil)
  arg_type = param_1.class
  if arg_type == Hash
    target_type = Java::IoVertxKafkaClientCommon::TopicPartition.java_class
    target = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
  elsif arg_type == Set
    target_type = Java::JavaUtil::Set.java_class
    partitions = param_1.map do |element|
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element))
    end
    target = Java::JavaUtil::LinkedHashSet.new(partitions)
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
  end

  if block_given?
    # Only the failure cause (or nil) is forwarded to the caller's block.
    on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    @j_del.java_method(:seekToBeginning, [target_type,Java::IoVertxCore::Handler.java_class]).call(target,on_done)
  else
    @j_del.java_method(:seekToBeginning, [target_type]).call(target)
  end
  self
end

- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the record #handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToEnd(topicPartition)

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek
  • - (self) seekToEnd(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek
  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash{String => Object})
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash{String => Object}>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 397

# Seek to the last offset for each of the given partitions.
#
# The argument's exact class selects the Java overload: a Hash is converted to
# a single TopicPartition, a Set to a LinkedHashSet of TopicPartition objects.
# A block, when given, is wrapped as the completion handler.
#
# @param [Hash{String => Object}, Set<Hash{String => Object}>] param_1 topic
#   partition (or set of topic partitions) for which seek
# @yield handler called on operation completed; it is given the failure cause
#   when the async result reports failure, and nil otherwise
# @return [self]
# @raise [ArgumentError] when param_1 is neither a Hash nor a Set
def seek_to_end(param_1=nil)
  arg_type = param_1.class
  if arg_type == Hash
    target_type = Java::IoVertxKafkaClientCommon::TopicPartition.java_class
    target = Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))
  elsif arg_type == Set
    target_type = Java::JavaUtil::Set.java_class
    partitions = param_1.map do |element|
      Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element))
    end
    target = Java::JavaUtil::LinkedHashSet.new(partitions)
  else
    raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
  end

  if block_given?
    # Only the failure cause (or nil) is forwarded to the caller's block.
    on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    @j_del.java_method(:seekToEnd, [target_type,Java::IoVertxCore::Handler.java_class]).call(target,on_done)
  else
    @j_del.java_method(:seekToEnd, [target_type]).call(target)
  end
  self
end

- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record #handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the #batch_handler will only see messages consistent with the new set of topics.

Overloads:

  • - (self) subscribe(topic)

    Parameters:

    • topic (String)
      topic to subscribe to
  • - (self) subscribe(topics)

    Parameters:

    • topics (Set<String>)
      topics to subscribe to
  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 204

# Subscribe to the given topic, or set of topics, to get dynamically assigned partitions.
#
# The argument's exact class selects the Java overload: a String is passed
# through as-is, a Set is copied into a LinkedHashSet. A block, when given,
# is wrapped as the completion handler.
#
# @param [String, Set<String>] param_1 topic (or topics) to subscribe to
# @yield handler called on operation completed; it is given the failure cause
#   when the async result reports failure, and nil otherwise
# @return [self]
# @raise [ArgumentError] when param_1 is neither a String nor a Set
def subscribe(param_1=nil)
  arg_type = param_1.class
  if arg_type == String
    target_type = Java::java.lang.String.java_class
    target = param_1
  elsif arg_type == Set
    target_type = Java::JavaUtil::Set.java_class
    target = Java::JavaUtil::LinkedHashSet.new(param_1.to_a)
  else
    raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
  end

  if block_given?
    # Only the failure cause (or nil) is forwarded to the caller's block.
    on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    @j_del.java_method(:subscribe, [target_type,Java::IoVertxCore::Handler.java_class]).call(target,on_done)
  else
    @j_del.java_method(:subscribe, [target_type]).call(target)
  end
  self
end

- (self) subscription { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


282
283
284
285
286
287
288
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 282

# Get the current subscription.
#
# The Java call is always made; when no block is given the handler argument
# passed to it is nil.
#
# @yield handler called on operation completed; it is given two values: the
#   failure cause (or nil when the async result did not fail) and the result
#   converted through Utils.to_set (or nil when the async result did not succeed)
# @return [self]
def subscription
  on_done = nil
  if block_given?
    on_done = Proc.new do |ar|
      error = ar.failed ? ar.cause : nil
      topics = ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil
      yield(error, topics)
    end
  end
  @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call(on_done)
  self
end

- (self) unsubscribe { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


269
270
271
272
273
274
275
276
277
278
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 269

# Unsubscribe from topics currently subscribed with subscribe.
#
# Uses the Java overload that takes a completion handler when a block is
# given, and the no-argument overload otherwise.
#
# @yield handler called on operation completed; it is given the failure cause
#   when the async result reports failure, and nil otherwise
# @return [self]
def unsubscribe
  if block_given?
    on_done = Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call(on_done)
  else
    @j_del.java_method(:unsubscribe, []).call()
  end
  self
end