vertx / io.vertx.kafka.client.consumer / KafkaConsumer

KafkaConsumer

interface KafkaConsumer<K : Any, V : Any> : ReadStream<KafkaConsumerRecord<K, V>>

Vert.x Kafka consumer.

You receive Kafka records by providing a KafkaConsumer#handler(Handler). As messages arrive the handler will be called with the records.

The #pause() and #resume() provides global control over reading the records from the consumer.

The #pause(Set) and #resume(Set) provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.

Functions

asStream

abstract fun asStream(): KafkaReadStream<K, V>

assign

abstract fun assign(topicPartition: TopicPartition): KafkaConsumer<K, V>
abstract fun assign(topicPartition: TopicPartition, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Manually assign a partition to this consumer.

abstract fun assign(topicPartitions: MutableSet<TopicPartition>): KafkaConsumer<K, V>
abstract fun assign(topicPartitions: MutableSet<TopicPartition>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Manually assign a list of partition to this consumer.

assignment

abstract fun assignment(handler: Handler<AsyncResult<MutableSet<TopicPartition>>>): KafkaConsumer<K, V>

Get the set of partitions currently assigned to this consumer.

batchHandler

abstract fun batchHandler(handler: Handler<KafkaConsumerRecords<K, V>>): KafkaConsumer<K, V>

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

beginningOffsets

abstract fun beginningOffsets(topicPartitions: MutableSet<TopicPartition>, handler: Handler<AsyncResult<MutableMap<TopicPartition, Long>>>): Unit
abstract fun beginningOffsets(topicPartition: TopicPartition, handler: Handler<AsyncResult<Long>>): Unit

Get the first offset for the given partitions.

close

open fun close(): Unit
abstract fun close(completionHandler: Handler<AsyncResult<Void>>): Unit

Close the consumer

commit

abstract fun commit(): Unit
abstract fun commit(completionHandler: Handler<AsyncResult<Void>>): Unit

Commit current offsets for all the subscribed list of topics and partition.

abstract fun commit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>): Unit
abstract fun commit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>, completionHandler: Handler<AsyncResult<MutableMap<TopicPartition, OffsetAndMetadata>>>): Unit

Commit the specified offsets for the specified list of topics and partitions to Kafka.

committed

abstract fun committed(topicPartition: TopicPartition, handler: Handler<AsyncResult<OffsetAndMetadata>>): Unit

Get the last committed offset for the given partition (whether the commit happened by this process or another).

create

open static fun <K : Any, V : Any> create(vertx: Vertx, consumer: Consumer<K, V>): KafkaConsumer<K, V>

Create a new KafkaConsumer instance from a native Consumer.

open static fun <K : Any, V : Any> create(vertx: Vertx, config: MutableMap<String, String>): KafkaConsumer<K, V>
open static fun <K : Any, V : Any> create(vertx: Vertx, config: MutableMap<String, String>, keyType: Class<K>, valueType: Class<V>): KafkaConsumer<K, V>
open static fun <K : Any, V : Any> create(vertx: Vertx, config: Properties): KafkaConsumer<K, V>
open static fun <K : Any, V : Any> create(vertx: Vertx, config: Properties, keyType: Class<K>, valueType: Class<V>): KafkaConsumer<K, V>

Create a new KafkaConsumer instance

endHandler

abstract fun endHandler(endHandler: Handler<Void>): KafkaConsumer<K, V>

endOffsets

abstract fun endOffsets(topicPartitions: MutableSet<TopicPartition>, handler: Handler<AsyncResult<MutableMap<TopicPartition, Long>>>): Unit

Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

abstract fun endOffsets(topicPartition: TopicPartition, handler: Handler<AsyncResult<Long>>): Unit

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

exceptionHandler

abstract fun exceptionHandler(handler: Handler<Throwable>): KafkaConsumer<K, V>

handler

abstract fun handler(handler: Handler<KafkaConsumerRecord<K, V>>): KafkaConsumer<K, V>

listTopics

abstract fun listTopics(handler: Handler<AsyncResult<MutableMap<String, MutableList<PartitionInfo>>>>): KafkaConsumer<K, V>

Get metadata about partitions for all topics that the user is authorized to view.

offsetsForTimes

abstract fun offsetsForTimes(topicPartitionTimestamps: MutableMap<TopicPartition, Long>, handler: Handler<AsyncResult<MutableMap<TopicPartition, OffsetAndTimestamp>>>): Unit

Look up the offsets for the given partitions by timestamp. Note: the result might be empty in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

abstract fun offsetsForTimes(topicPartition: TopicPartition, timestamp: Long, handler: Handler<AsyncResult<OffsetAndTimestamp>>): Unit

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

partitionsAssignedHandler

abstract fun partitionsAssignedHandler(handler: Handler<MutableSet<TopicPartition>>): KafkaConsumer<K, V>

Set the handler called when topic partitions are assigned to the consumer

partitionsFor

abstract fun partitionsFor(topic: String, handler: Handler<AsyncResult<MutableList<PartitionInfo>>>): KafkaConsumer<K, V>

Get metadata about the partitions for a given topic.

partitionsRevokedHandler

abstract fun partitionsRevokedHandler(handler: Handler<MutableSet<TopicPartition>>): KafkaConsumer<K, V>

Set the handler called when topic partitions are revoked to the consumer

pause

abstract fun pause(): KafkaConsumer<K, V>abstract fun pause(topicPartition: TopicPartition): KafkaConsumer<K, V>
abstract fun pause(topicPartition: TopicPartition, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Suspend fetching from the requested partition.

abstract fun pause(topicPartitions: MutableSet<TopicPartition>): KafkaConsumer<K, V>
abstract fun pause(topicPartitions: MutableSet<TopicPartition>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Suspend fetching from the requested partitions.

paused

abstract fun paused(handler: Handler<AsyncResult<MutableSet<TopicPartition>>>): Unit

Get the set of partitions that were previously paused by a call to pause(Set).

position

abstract fun position(partition: TopicPartition, handler: Handler<AsyncResult<Long>>): Unit

Get the offset of the next record that will be fetched (if a record with that offset exists).

resume

abstract fun resume(): KafkaConsumer<K, V>abstract fun resume(topicPartition: TopicPartition): KafkaConsumer<K, V>
abstract fun resume(topicPartition: TopicPartition, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Resume specified partition which have been paused with pause.

abstract fun resume(topicPartitions: MutableSet<TopicPartition>): KafkaConsumer<K, V>
abstract fun resume(topicPartitions: MutableSet<TopicPartition>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Resume specified partitions which have been paused with pause.

seek

abstract fun seek(topicPartition: TopicPartition, offset: Long): KafkaConsumer<K, V>
abstract fun seek(topicPartition: TopicPartition, offset: Long, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Overrides the fetch offsets that the consumer will use on the next poll.

seekToBeginning

abstract fun seekToBeginning(topicPartition: TopicPartition): KafkaConsumer<K, V>
abstract fun seekToBeginning(topicPartition: TopicPartition, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Seek to the first offset for each of the given partition.

abstract fun seekToBeginning(topicPartitions: MutableSet<TopicPartition>): KafkaConsumer<K, V>
abstract fun seekToBeginning(topicPartitions: MutableSet<TopicPartition>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Seek to the first offset for each of the given partitions.

seekToEnd

abstract fun seekToEnd(topicPartition: TopicPartition): KafkaConsumer<K, V>
abstract fun seekToEnd(topicPartition: TopicPartition, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Seek to the last offset for each of the given partition.

abstract fun seekToEnd(topicPartitions: MutableSet<TopicPartition>): KafkaConsumer<K, V>
abstract fun seekToEnd(topicPartitions: MutableSet<TopicPartition>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Seek to the last offset for each of the given partitions.

subscribe

abstract fun subscribe(topic: String): KafkaConsumer<K, V>
abstract fun subscribe(topic: String, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Subscribe to the given topic to get dynamically assigned partitions.

abstract fun subscribe(topics: MutableSet<String>): KafkaConsumer<K, V>
abstract fun subscribe(topics: MutableSet<String>, completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Subscribe to the given list of topics to get dynamically assigned partitions.

subscription

abstract fun subscription(handler: Handler<AsyncResult<MutableSet<String>>>): KafkaConsumer<K, V>

Get the current subscription.

unsubscribe

abstract fun unsubscribe(): KafkaConsumer<K, V>
abstract fun unsubscribe(completionHandler: Handler<AsyncResult<Void>>): KafkaConsumer<K, V>

Unsubscribe from topics currently subscribed with subscribe.

unwrap

abstract fun unwrap(): Consumer<K, V>