package org.apache.camel.component.vertx.kafka.operations;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
import org.apache.camel.component.vertx.kafka.operations.TopicSubscription;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.function.TriConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:org/apache/camel/component/vertx/kafka/operations/VertxKafkaConsumerOperations.class */
/**
 * Wires a Vert.x {@link KafkaConsumer} to caller-supplied record and error callbacks and
 * performs the initial topic subscription (or explicit partition assignment), including an
 * optional seek to a configured offset or position.
 *
 * <p>The callback-style Vert.x operations are adapted to Reactor {@link Mono}/{@link Flux}
 * so that they can be chained and their failures routed to the error callback.
 */
public class VertxKafkaConsumerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumerOperations.class);
    private final KafkaConsumer<Object, Object> kafkaConsumer;
    private final VertxKafkaConfiguration configuration;

    /**
     * Creates the operations wrapper.
     *
     * @param kafkaConsumer           the Vert.x Kafka consumer every operation is issued against
     * @param vertxKafkaConfiguration the configuration supplying topic, partition id and seek settings
     */
    public VertxKafkaConsumerOperations(KafkaConsumer<Object, Object> kafkaConsumer, VertxKafkaConfiguration vertxKafkaConfiguration) {
        this.kafkaConsumer = kafkaConsumer;
        this.configuration = vertxKafkaConfiguration;
    }

    /**
     * Registers the record and error callbacks on the consumer and then either subscribes to the
     * configured topics (no partition id configured) or assigns the configured partition.
     *
     * @param consumer  callback invoked for every record delivered by the Kafka consumer
     * @param consumer2 callback invoked for errors reported by the Kafka consumer and for failures
     *                  of the subscribe / assign / seek operations
     * @throws IllegalArgumentException if no topic is configured
     * @throws NullPointerException     if either callback is {@code null}
     */
    public void receiveEvents(Consumer<KafkaConsumerRecord<Object, Object>> consumer, Consumer<Throwable> consumer2) {
        if (ObjectHelper.isEmpty(configuration.getTopic())) {
            throw new IllegalArgumentException("Topic or list of topics need to be set in the topic config.");
        }

        final TopicSubscription topicSubscription = new TopicSubscription(
                configuration.getTopic(),
                configuration.getPartitionId(),
                configuration.getSeekToOffset(),
                configuration.getSeekToPosition());

        // Method references evaluate their receiver eagerly, so a null callback fails fast here
        // instead of later when the first record or error arrives.
        kafkaConsumer.handler(consumer::accept);
        kafkaConsumer.exceptionHandler(consumer2::accept);

        if (ObjectHelper.isEmpty(topicSubscription.getPartitionId())) {
            subscribe(topicSubscription, consumer2);
        } else {
            assign(topicSubscription, consumer2);
        }
    }

    /**
     * Subscribes to the topics of the given subscription. The seek-on-assignment hook is installed
     * before the subscribe call is issued.
     *
     * @param topicSubscription the topics and seek settings
     * @param onError           receives a failure of the subscribe or seek operations
     */
    private void subscribe(TopicSubscription topicSubscription, Consumer<Throwable> onError) {
        LOG.info("Subscribing to {} topics", topicSubscription.getConfiguredTopicName());
        seekOnPartitionAssignment(topicSubscription, onError);
        // Only failures are of interest; the value and completion callbacks are deliberately no-ops.
        subscribeToTopics(topicSubscription.getTopics()).subscribe(ignored -> {
        }, onError, () -> {
        });
    }

    /**
     * When a seek offset or position is configured, seeks every partition reported by the
     * partition-assignment callback.
     *
     * @param topicSubscription the seek settings
     * @param onError           receives a failure of any seek operation
     */
    private void seekOnPartitionAssignment(TopicSubscription topicSubscription, Consumer<Throwable> onError) {
        if (!isSeekToSet(topicSubscription)) {
            return;
        }
        getTopicPartitionsOnPartitionAssignment()
                .flatMap(partition -> seekToOffsetOrPositionInPartition(partition, topicSubscription))
                .subscribe(ignored -> {
                }, onError, () -> LOG.info("Seeking partitions is done."));
    }

    /**
     * @return {@code true} if the subscription carries a seek offset or a seek position
     */
    private boolean isSeekToSet(TopicSubscription topicSubscription) {
        return ObjectHelper.isNotEmpty(topicSubscription.getSeekToOffset())
                || ObjectHelper.isNotEmpty(topicSubscription.getSeekToPosition());
    }

    /**
     * Exposes the consumer's partitions-assigned callback as a {@link Flux}: every partition of an
     * assignment is emitted and the flux is then completed.
     *
     * <p>NOTE(review): the sink is completed at the end of the first callback, so partitions from
     * later re-assignments are presumably not propagated to subscribers; confirm this is intended.
     *
     * @return a flux of the assigned partitions
     */
    private Flux<TopicPartition> getTopicPartitionsOnPartitionAssignment() {
        return Flux.create(sink -> kafkaConsumer.partitionsAssignedHandler(assigned -> {
            LOG.info("Partition {} is assigned to consumer", assigned);
            assigned.forEach(partition -> {
                LOG.info("Partition {} is assigned to consumer for topic {}", partition.getPartition(), partition.getTopic());
                sink.next(partition);
            });
            sink.complete();
        }));
    }

    /**
     * Seeks a partition to the configured offset if one is set, otherwise to the configured
     * position. An explicit offset takes precedence over a position.
     *
     * @param topicPartition    the partition to seek
     * @param topicSubscription the seek settings
     * @return a mono completing when the seek has been acknowledged, or failing with its cause
     */
    private Mono<Void> seekToOffsetOrPositionInPartition(TopicPartition topicPartition, TopicSubscription topicSubscription) {
        if (ObjectHelper.isNotEmpty(topicSubscription.getSeekToOffset())) {
            LOG.info("Seeking topic {} with partition {} to offset {}.",
                    topicPartition.getTopic(), topicPartition.getPartition(), topicSubscription.getSeekToOffset());
            return wrapToMono(kafkaConsumer::seek, topicPartition, topicSubscription.getSeekToOffset());
        }
        return seekToPosition(topicPartition, topicSubscription.getSeekToPosition());
    }

    /**
     * Seeks a partition to its beginning or end. Any other position (including {@code null}) is
     * logged and ignored.
     *
     * @param topicPartition the partition to seek
     * @param offsetPosition the target position
     * @return a mono completing when the seek has been acknowledged, or an empty mono if ignored
     */
    private Mono<Void> seekToPosition(TopicPartition topicPartition, TopicSubscription.OffsetPosition offsetPosition) {
        if (offsetPosition == null) {
            // A switch over a null enum would throw; treat it like any other unsupported position.
            LOG.warn("No valid positions being set, hence the seeking operation will be ignored.");
            return Mono.empty();
        }
        switch (offsetPosition) {
            case BEGINNING:
                LOG.info("Seeking topic {} with partition {} to the beginning.", topicPartition.getTopic(), topicPartition.getPartition());
                return wrapToMono(kafkaConsumer::seekToBeginning, topicPartition);
            case END:
                LOG.info("Seeking topic {} with partition {} to the end.", topicPartition.getTopic(), topicPartition.getPartition());
                return wrapToMono(kafkaConsumer::seekToEnd, topicPartition);
            default:
                LOG.warn("No valid positions being set, hence the seeking operation will be ignored.");
                return Mono.empty();
        }
    }

    /**
     * @param topics the topic names to subscribe to
     * @return a mono completing when the subscribe call has been acknowledged
     */
    private Mono<Void> subscribeToTopics(Set<String> topics) {
        return wrapToMono(kafkaConsumer::subscribe, topics);
    }

    /**
     * Assigns the partitions of the given subscription to the consumer and, once that has
     * completed, applies the configured seek (if any) to each of them.
     *
     * @param topicSubscription the partitions and seek settings
     * @param onError           receives a failure of the assign or seek operations
     */
    private void assign(TopicSubscription topicSubscription, Consumer<Throwable> onError) {
        LOG.info("Assigning topics {} to partition {}", topicSubscription.getConfiguredTopicName(), topicSubscription.getPartitionId());
        // Only failures are of interest; the value and completion callbacks are deliberately no-ops.
        assignToPartitions(topicSubscription.getTopicPartitions())
                .then(seekPartitionsManually(topicSubscription))
                .subscribe(ignored -> {
                }, onError, () -> {
                });
    }

    /**
     * Builds the seek step for explicitly assigned partitions.
     *
     * @param topicSubscription the partitions and seek settings
     * @return a mono that seeks every partition of the subscription, or an empty mono when no
     *         seek offset or position is configured
     */
    private Mono<Void> seekPartitionsManually(TopicSubscription topicSubscription) {
        if (!isSeekToSet(topicSubscription)) {
            return Mono.empty();
        }
        return Flux.fromIterable(topicSubscription.getTopicPartitions())
                .flatMap(partition -> seekToOffsetOrPositionInPartition(partition, topicSubscription))
                .doOnComplete(() -> LOG.info("Seeking partitions is done."))
                .then();
    }

    /**
     * @param partitions the partitions to assign to the consumer
     * @return a mono completing when the assign call has been acknowledged
     */
    private Mono<Void> assignToPartitions(Set<TopicPartition> partitions) {
        return wrapToMono(kafkaConsumer::assign, partitions);
    }

    /**
     * Adapts a one-argument, handler-based asynchronous operation to a {@link Mono}. The operation
     * is only invoked when the mono is subscribed to.
     *
     * @param biConsumer the operation, taking its argument and a completion handler
     * @param v          the argument passed to the operation
     * @return a mono mirroring the outcome delivered to the completion handler
     */
    private <R, V> Mono<R> wrapToMono(BiConsumer<V, Handler<AsyncResult<R>>> biConsumer, V v) {
        return Mono.create(sink -> biConsumer.accept(v, result -> wrapAsyncResult(sink, result)));
    }

    /**
     * Adapts a two-argument, handler-based asynchronous operation to a {@link Mono}. The operation
     * is only invoked when the mono is subscribed to.
     *
     * @param triConsumer the operation, taking its two arguments and a completion handler
     * @param v1          the first argument passed to the operation
     * @param v2          the second argument passed to the operation
     * @return a mono mirroring the outcome delivered to the completion handler
     */
    private <R, V1, V2> Mono<R> wrapToMono(TriConsumer<V1, V2, Handler<AsyncResult<R>>> triConsumer, V1 v1, V2 v2) {
        return Mono.create(sink -> triConsumer.accept(v1, v2, result -> wrapAsyncResult(sink, result)));
    }

    /**
     * Forwards an {@link AsyncResult} to a {@link MonoSink}: a failed result signals its cause as
     * an error, any other result signals success with its value.
     */
    private <R> void wrapAsyncResult(MonoSink<R> monoSink, AsyncResult<R> asyncResult) {
        if (asyncResult.failed()) {
            monoSink.error(asyncResult.cause());
        } else {
            monoSink.success(asyncResult.result());
        }
    }
}
