package org.apache.eventmesh.storage.kafka.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.storage.kafka.config.ClientConfiguration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/storage/kafka/admin/KafkaAdmin.class */
public class KafkaAdmin extends AbstractAdmin {
    private static final Logger log = LoggerFactory.getLogger(KafkaAdmin.class);
    private static final Properties kafkaProps = new Properties();
    private static final Map<String, Integer> topicProps = new HashMap();

    public KafkaAdmin() {
        super(new AtomicBoolean(false));
        ClientConfiguration clientConfiguration = (ClientConfiguration) ConfigService.getInstance().buildConfigInstance(ClientConfiguration.class);
        kafkaProps.put("bootstrap.servers", clientConfiguration.getNamesrvAddr());
        topicProps.put("partitionNum", Integer.valueOf(clientConfiguration.getPartitions()));
        topicProps.put("replicationFactorNum", Integer.valueOf(clientConfiguration.getReplicationFactors()));
    }

    public List<TopicProperties> getTopic() {
        try {
            Admin create = Admin.create(kafkaProps);
            Throwable th = null;
            try {
                Map values = create.describeTopics((Set) create.listTopics().names().get(10L, TimeUnit.SECONDS)).values();
                ArrayList arrayList = new ArrayList();
                for (Map.Entry entry : values.entrySet()) {
                    String str = (String) entry.getKey();
                    arrayList.add(new TopicProperties(str, ((TopicDescription) ((KafkaFuture) entry.getValue()).get(10L, TimeUnit.SECONDS)).partitions().stream().mapToInt((v0) -> {
                        return v0.partition();
                    }).mapToLong(i -> {
                        try {
                            return getMsgCount(str, i, create);
                        } catch (InterruptedException | ExecutionException e) {
                            log.error("Failed to get msg offset when listing topics.", e);
                            throw new RuntimeException(e);
                        } catch (TimeoutException e2) {
                            log.error("Failed to get msg offset when listing topics. Kafka response timed out in {} seconds.", 10);
                            throw new RuntimeException(e2);
                        }
                    }).sum()));
                }
                arrayList.sort(Comparator.comparing(topicProperties -> {
                    return topicProperties.name;
                }));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return arrayList;
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        } catch (TimeoutException e) {
            log.error("Failed to list topics. Kafka response timed out in {} seconds.", 10);
            throw new RuntimeException(e);
        } catch (Exception e2) {
            log.error("Failed to list topics.", e2);
            throw new RuntimeException(e2);
        }
    }

    private long getMsgCount(String str, int i, Admin admin) throws ExecutionException, InterruptedException, TimeoutException {
        TopicPartition topicPartition = new TopicPartition(str, i);
        return getOffset(topicPartition, OffsetSpec.latest(), admin) - getOffset(topicPartition, OffsetSpec.earliest(), admin);
    }

    private long getOffset(TopicPartition topicPartition, OffsetSpec offsetSpec, Admin admin) throws ExecutionException, InterruptedException, TimeoutException {
        return ((ListOffsetsResult.ListOffsetsResultInfo) ((Map) admin.listOffsets(Collections.singletonMap(topicPartition, offsetSpec)).all().get(10L, TimeUnit.SECONDS)).get(topicPartition)).offset();
    }

    public void createTopic(String str) {
        try {
            Admin create = Admin.create(kafkaProps);
            Throwable th = null;
            try {
                try {
                    create.createTopics(Collections.singletonList(new NewTopic(str, topicProps.get("partitionNum").intValue(), topicProps.get("replicationFactorNum").shortValue()))).all().get(10L, TimeUnit.SECONDS);
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (TimeoutException e) {
            log.error("Failed to create topic. Kafka response timed out in {} seconds.", 10);
        } catch (Exception e2) {
            log.error("Failed to create topic.", e2);
        }
    }

    public void deleteTopic(String str) {
        try {
            Admin create = Admin.create(kafkaProps);
            Throwable th = null;
            try {
                try {
                    create.deleteTopics(Collections.singletonList(str)).all().get(10L, TimeUnit.SECONDS);
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (TimeoutException e) {
            log.error("Failed to delete topic. Kafka response timed out in {} seconds.", 10);
        } catch (Exception e2) {
            log.error("Failed to delete topic.", e2);
        }
    }
}
