package io.kipe.streams.kafka.micronaut.factories;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convenience wrapper around the Kafka {@link AdminClient} for creating, deleting and
 * emptying topics.
 *
 * <p>Every operation blocks until the corresponding admin request has completed. When no
 * {@link AdminClient} has been injected, each operation logs a warning per topic and returns
 * without contacting a broker.
 */
@Singleton
public class TopicManager {
    static final Logger LOG = LoggerFactory.getLogger(TopicManager.class);

    /**
     * Deletion marker applied to every partition by {@link #clearTopics(Collection)}.
     * Per the Kafka delete-records protocol, offset {@code -1} stands for the partition's
     * high watermark, i.e. all records currently in the partition.
     */
    static final RecordsToDelete DELETE_ALL_RECORDS = RecordsToDelete.beforeOffset(-1);

    /** Admin client used for all operations; the methods below tolerate it being {@code null}. */
    @Inject
    private AdminClient adminClient;

    /**
     * Varargs convenience overload of {@link #ensureTopics(Set)}.
     *
     * @param newTopicArr the topics that must exist; duplicates are collapsed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if an admin request fails
     */
    public void ensureTopics(NewTopic... newTopicArr) throws InterruptedException, ExecutionException {
        ensureTopics(new HashSet<>(Arrays.asList(newTopicArr)));
    }

    /**
     * Creates every topic in {@code set} whose name is not already reported by the cluster.
     *
     * @param set the topics that must exist
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if listing or creating topics fails
     */
    public void ensureTopics(Set<NewTopic> set) throws InterruptedException, ExecutionException {
        if (this.adminClient == null) {
            for (NewTopic topic : set) {
                LOG.warn("adminClient not injected, skipping creation of topic '{}'", topic);
            }
            return;
        }

        Set<String> existingNames = this.adminClient.listTopics().names().get();

        // Only topics the cluster does not report yet are sent to createTopics.
        Set<NewTopic> missingTopics = new HashSet<>();
        for (NewTopic topic : set) {
            String name = topic.name();
            if (existingNames.contains(name)) {
                LOG.info("topic '{}' is available", name);
            } else {
                missingTopics.add(topic);
            }
        }

        this.adminClient.createTopics(missingTopics).all().get();
        for (NewTopic created : missingTopics) {
            LOG.info("topic '{}' created", created.name());
        }
    }

    /**
     * Deletes the named topics.
     *
     * @param collection names of the topics to delete
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the delete request fails
     */
    public void deleteTopics(Collection<String> collection) throws InterruptedException, ExecutionException {
        if (this.adminClient == null) {
            for (String name : collection) {
                LOG.warn("adminClient not injected, skipping deletion of topic '{}'", name);
            }
            return;
        }
        this.adminClient.deleteTopics(collection).all().get();
    }

    /**
     * Removes all records from every partition of the named topics while keeping the topics
     * themselves.
     *
     * @param collection names of the topics to empty
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if describing the topics or deleting their records fails
     */
    public void clearTopics(Collection<String> collection) throws InterruptedException, ExecutionException {
        if (this.adminClient == null) {
            for (String name : collection) {
                LOG.warn("adminClient not injected, skipping cleaning of topic '{}'", name);
            }
            return;
        }

        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
        // Names of the described topics, remembered so success can be logged after the delete.
        Set<String> describedTopics = new HashSet<>();

        // The partition list has to be looked up first: deleteRecords works per partition.
        this.adminClient.describeTopics(collection).all().get().values().forEach(description -> {
            String name = description.name();
            describedTopics.add(name);
            description.partitions().forEach(partitionInfo ->
                    recordsToDelete.put(new TopicPartition(name, partitionInfo.partition()), DELETE_ALL_RECORDS));
        });

        this.adminClient.deleteRecords(recordsToDelete).all().get();
        for (String name : describedTopics) {
            LOG.info("topic '{}' removed all messages", name);
        }
    }
}
