/*
 * Decompiled with CFR 0.152.
 */
package de.id.quarkus.kafka.testing;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class ConfluentStackClient {
    private final String kafkaBootstrapServers;
    private final String schemaRegistryUrl;

    ConfluentStackClient(String kafkaBootstrapServers, String schemaRegistryUrl) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    public String getKafkaBootstrapServers() {
        return this.kafkaBootstrapServers;
    }

    public String getSchemaRegistryUrl() {
        return this.schemaRegistryUrl;
    }

    public void deleteAllTopics() {
        AdminClient adminClient = this.createAdminClient();
        try {
            adminClient.listTopics().names().thenApply(arg_0 -> ((AdminClient)adminClient).deleteTopics(arg_0)).get();
        }
        catch (Exception e) {
            throw new RuntimeException("Error while deleting topics", e);
        }
    }

    public void createTopics(String ... topicNames) {
        List newTopics = Arrays.stream(topicNames).map(topicName -> new NewTopic(topicName, 1, 1)).collect(Collectors.toList());
        try {
            this.createAdminClient().createTopics(newTopics).all().get();
        }
        catch (Exception e) {
            throw new RuntimeException("Error while creating topics", e);
        }
    }

    public void deleteAllConsumerGroups() {
        AdminClient adminClient = this.createAdminClient();
        try {
            adminClient.listConsumerGroups().all().thenApply(consumerGroupListings -> {
                List groupIds = consumerGroupListings.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList());
                return adminClient.deleteConsumerGroups(groupIds);
            }).get();
        }
        catch (Exception e) {
            throw new RuntimeException("Error while deleting consumer groups", e);
        }
    }

    public void registerSchemaRegistryTypes(Schema schema) {
        try {
            this.schemaRegistryClient().register(schema.getFullName(), schema);
        }
        catch (Exception e) {
            throw new RuntimeException("Error while registering schemas", e);
        }
    }

    public AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaBootstrapServers);
        return KafkaAdminClient.create((Properties)properties);
    }

    public CachedSchemaRegistryClient schemaRegistryClient() {
        return new CachedSchemaRegistryClient(this.getSchemaRegistryUrl(), 1000);
    }

    public <K, V> KafkaProducer<K, V> createProducerWithAvroValue(Class<? extends Serializer<K>> keySerializerClass) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kafkaBootstrapServers);
        props.put("acks", "all");
        props.put("retries", (Object)0);
        props.put("key.serializer", keySerializerClass.getName());
        props.put("value.serializer", SpecificAvroSerializer.class.getName());
        props.put("schema.registry.url", this.schemaRegistryUrl);
        return new KafkaProducer(props);
    }

    public <K, V> KafkaConsumer<K, V> createConsumerWithAvroValue(Class<? extends Deserializer<K>> keyDeserializer, String consumerGroupIdPrefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kafkaBootstrapServers);
        props.put("group.id", consumerGroupIdPrefix + UUID.randomUUID());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("allow.auto.create.topics", "false");
        props.put("schema.registry.url", this.schemaRegistryUrl);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", SpecificAvroDeserializer.class);
        return new KafkaConsumer(props);
    }

    public <K, V> void sendRecords(String topic, List<V> values, Class<? extends Serializer<K>> keySerializerClass, BiFunction<Integer, V, K> keyCreationFunction) {
        KafkaProducer<K, V> producer = this.createProducerWithAvroValue(keySerializerClass);
        int valuesSize = values.size();
        for (int i = 0; i < valuesSize; ++i) {
            V object = values.get(i);
            ProducerRecord record = new ProducerRecord(topic, keyCreationFunction.apply(i, object), object);
            producer.send(record);
        }
        producer.flush();
    }

    public <K, V> List<V> waitForRecords(String topicName, String groupIdPrefix, int maxWaitTimeInMs, int expectedItems, Class<? extends Deserializer<K>> keyDeserializer) {
        Instant startTime = Instant.now();
        KafkaConsumer<K, V> recoConsumer = this.createConsumerWithAvroValue(keyDeserializer, groupIdPrefix);
        recoConsumer.subscribe(Collections.singletonList(topicName));
        ArrayList receivedStoryRecommendations = new ArrayList();
        do {
            ConsumerRecords consumedRecords = recoConsumer.poll(Duration.ofMillis(1000L));
            consumedRecords.records(topicName).forEach(record -> receivedStoryRecommendations.add(record.value()));
        } while (Instant.now().toEpochMilli() - startTime.toEpochMilli() < (long)maxWaitTimeInMs && receivedStoryRecommendations.size() < expectedItems);
        return receivedStoryRecommendations;
    }
}

