/*
 * Decompiled with CFR 0.152.
 */
package cn.benma666.sjzt.kafka;

import cn.benma666.domain.SysSjglSjzt;
import cn.benma666.domain.SysSjglZnjh;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.InterfaceLog;
import cn.benma666.iframe.Result;
import cn.benma666.myutils.ClassUtil;
import cn.benma666.myutils.DateUtil;
import cn.benma666.myutils.FileUtil;
import cn.benma666.sjzt.BasicSjzt;
import cn.benma666.sjzt.IFile;
import cn.benma666.sjzt.SjztBzcwjdxExecption;
import cn.benma666.sjzt.SjztExecRunnable;
import cn.benma666.sjzt.SjztPooledObjectFactory;
import cn.benma666.sjzt.bdwj.BdwjFile;
import cn.benma666.sjzt.kafka.KafkaClient;
import cn.benma666.sjzt.kafka.KafkaFile;
import com.alibaba.druid.util.Utils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class Kafka
extends BasicSjzt {
    private static Kafka kafka = null;
    private final GenericObjectPool objectPool;

    protected Kafka(String name, SysSjglSjzt sjzt) {
        super(name, sjzt);
        GenericObjectPoolConfig conf = new GenericObjectPoolConfig();
        conf.setTestOnCreate(true);
        conf.setTestOnBorrow(true);
        conf.setMinIdle(1);
        conf.setMaxIdle(2);
        conf.setMaxTotal(5);
        conf.setMaxWait(Duration.ofSeconds(300L));
        ClassUtil.plMethodInvoke(conf, this.getSjzt().getKzxxObj().getJSONObject("ljcpz"));
        this.objectPool = new GenericObjectPool((PooledObjectFactory)new SjztPooledObjectFactory(sjzt), conf);
        KafkaClient kafkaClient = null;
        try {
            kafkaClient = this.borrowClient();
        }
        catch (Exception e) {
            throw new MyException(name + "\u521d\u59cb\u5316\u5931\u8d25", e);
        }
        finally {
            this.returnClient(kafkaClient);
        }
        if (kafka == null) {
            kafka = this;
        }
        cache.put(name, (Object)this);
    }

    public void returnClient(KafkaClient client) {
        this.log.trace("\u91ca\u653e\u56de\u8fde\u63a5\u6c60\uff1a{}", (Object)this.name);
        if (client != null) {
            this.objectPool.returnObject((Object)client);
        }
    }

    public KafkaClient borrowClient() throws Exception {
        this.log.trace("\u4ece\u8fde\u63a5\u6c60\u83b7\u53d6\uff1a{}", (Object)this.name);
        return (KafkaClient)this.objectPool.borrowObject();
    }

    public static Kafka use(String name) {
        return Kafka.use(name, Kafka.getSjzt(name));
    }

    public static Kafka use(String name, SysSjglSjzt sjzt) {
        Kafka kafka = (Kafka)cache.get((Object)name);
        if (kafka == null) {
            kafka = new Kafka(name, sjzt);
        }
        return kafka;
    }

    public static Result cszt(SysSjglSjzt sjzt) {
        try {
            KafkaClient fc = new KafkaClient(sjzt);
            fc.getAdminClient().listTopics();
            fc.close();
            return Kafka.success("\u6d4b\u8bd5\u6210\u529f");
        }
        catch (Throwable t) {
            slog.debug("{}\u6d4b\u8bd5\u5931\u8d25", (Object)sjzt, (Object)t);
            return Kafka.failed("\u8f7d\u4f53\u6d4b\u8bd5\u4e0d\u901a\u8fc7\uff1a" + t.getMessage());
        }
    }

    public static boolean validateClient(SysSjglSjzt sjzt, Object client) {
        try {
            KafkaClient adminClient = (KafkaClient)client;
            ListTopicsOptions options = new ListTopicsOptions();
            options.timeoutMs(Integer.valueOf(5000));
            adminClient.getAdminClient().listTopics(options);
            return true;
        }
        catch (Throwable t) {
            slog.debug("ftp\u9a8c\u8bc1\u65e0\u6548\uff1a{}", (Object)sjzt.getMc(), (Object)t);
            return false;
        }
    }

    public static void destroyClient(SysSjglSjzt sjzt, Object client) throws Exception {
        KafkaClient kafkaClient = (KafkaClient)client;
        kafkaClient.close();
    }

    public static KafkaClient createClient(SysSjglSjzt sjzt) {
        return new KafkaClient(sjzt);
    }

    public Object exec(SjztExecRunnable<KafkaClient> exec) {
        KafkaClient kafkaClient = null;
        try {
            kafkaClient = this.borrowClient();
            Object object = exec.exec(kafkaClient);
            return object;
        }
        catch (Exception e) {
            throw new MyException("kafka\u6267\u884c\u5f02\u5e38", e);
        }
        finally {
            if (kafkaClient != null) {
                this.objectPool.returnObject((Object)kafkaClient);
            }
        }
    }

    @Override
    public List<IFile> listFiles(SysSjglZnjh znjhConfig) {
        return (List)this.exec(kafkaClient -> {
            try {
                KafkaConsumer consumer = kafkaClient.getKafkaConsumer();
                ArrayList<KafkaFile> list = new ArrayList<KafkaFile>();
                consumer.subscribe(Collections.singleton(znjhConfig.getSrml().replace("/", "")));
                for (int s = 0; list.size() == 0 && s <= 10; ++s) {
                    ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
                    for (ConsumerRecord record : records) {
                        KafkaFile file = new KafkaFile("/", (ConsumerRecord<String, String>)record, this);
                        file.setGzml(znjhConfig.getSrml().replace("/", ""));
                        list.add(file);
                        this.log.debug("offset = {}, value = {}", (Object)record.offset(), record.value());
                    }
                }
                consumer.commitAsync();
                consumer.unsubscribe();
                return list;
            }
            catch (Exception e) {
                throw new MyException("\u83b7\u53d6kafka\u6587\u4ef6\u5217\u8868\u5931\u8d25", e);
            }
        });
    }

    @Override
    public InputStream getInputStream(IFile file) {
        if (file instanceof KafkaFile) {
            return new ByteArrayInputStream(((String)((KafkaFile)file).getFile().value()).getBytes());
        }
        throw new MyException("\u4e0d\u652f\u6301\u975ekafkafile");
    }

    @Override
    public boolean delete(IFile file) {
        if (file instanceof KafkaFile) {
            return true;
        }
        throw new MyException("\u4e0d\u652f\u6301\u975ekafkafile");
    }

    public boolean pub(String topic, String msg) throws Exception {
        return this.save(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)), new BdwjFile(topic, null));
    }

    @Override
    public boolean save(InputStream is, IFile file) {
        return (Boolean)this.exec(kafkaClient -> {
            ProducerRecord record = null;
            try {
                AtomicInteger sfcg = new AtomicInteger(0);
                ProducerRecord finalRecord = record = new ProducerRecord(file.getParent().replace("/", ""), (Object)Utils.read((InputStream)is));
                kafkaClient.getKafkaProducer().send(record, (recordMetadata, e) -> {
                    if (e != null) {
                        sfcg.set(1);
                        this.log.error("\u6d88\u606f\u63a8\u9001\u5f02\u5e38\uff1a" + finalRecord.topic() + ">" + (String)finalRecord.value(), (Throwable)e);
                    } else {
                        sfcg.set(2);
                    }
                });
                while (sfcg.get() == 0) {
                    Thread.sleep(100L);
                }
                Boolean bl = sfcg.get() == 2;
                return bl;
            }
            catch (Throwable t) {
                if (record == null) {
                    throw new MyException("\u6d88\u606f\u63a8\u9001\u5f02\u5e38", t);
                }
                throw new MyException("\u6d88\u606f\u63a8\u9001\u5f02\u5e38\uff1a" + record.topic() + ">" + (String)record.value(), t);
            }
            finally {
                FileUtil.closeStream(is);
            }
        });
    }

    @Override
    public String getRootPath() {
        return this.sjzt.getDxgs();
    }

    @Override
    public long getSize(IFile file) {
        if (file instanceof KafkaFile) {
            return ((String)((KafkaFile)file).getFile().value()).getBytes().length;
        }
        throw new SjztBzcwjdxExecption("\u4e0d\u652f\u6301\u975ekafkafile");
    }

    @Override
    public void close() {
        if (!this.objectPool.isClosed()) {
            this.objectPool.close();
        }
        cache.remove((Object)this.name);
        if (this == kafka) {
            kafka = null;
        }
    }

    @Override
    public void sjztjt(SysSjglZnjh znjhConfig, InterfaceLog log) {
        this.exec(clent -> {
            long jtjg = DateUtil.scSjStrToLong(znjhConfig.getJtjg());
            String gml = znjhConfig.getSrml();
            KafkaConsumer consumer = clent.getKafkaConsumer();
            consumer.subscribe(Collections.singleton(gml.replace("/", "")));
            block0: while (true) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(jtjg));
                Iterator iterator = records.iterator();
                while (true) {
                    if (!iterator.hasNext()) continue block0;
                    ConsumerRecord record = (ConsumerRecord)iterator.next();
                    KafkaFile file = new KafkaFile("/", (ConsumerRecord<String, String>)record, this);
                    file.setGzml(gml);
                    znjhConfig.getTp().run(() -> this.znjh.znjh(znjhConfig, log, file));
                }
                break;
            }
        });
    }

    public Collection<String> getAllAcl() {
        return (Collection)this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            return adminClient.describeAcls(AclBindingFilter.ANY).values().get();
        });
    }

    public Set<String> getTopics() {
        return (Set)this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            return adminClient.listTopics().names().get();
        });
    }

    public Collection<String> getConsumerGroups() {
        return (Collection)this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            return adminClient.listConsumerGroups().valid().get();
        });
    }

    public void addTopicReadOrWriterAcl(String topic, String username) {
        this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            ResourcePattern rpn = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL);
            AccessControlEntry aceyWrite = new AccessControlEntry("User:" + username, "*", AclOperation.WRITE, AclPermissionType.ALLOW);
            AccessControlEntry aceyRead = new AccessControlEntry("User:" + username, "*", AclOperation.READ, AclPermissionType.ALLOW);
            AclBinding aclBindingWrite = new AclBinding(rpn, aceyWrite);
            AclBinding aclBindingRead = new AclBinding(rpn, aceyRead);
            ArrayList<AclBinding> list = new ArrayList<AclBinding>();
            list.add(aclBindingWrite);
            list.add(aclBindingRead);
            CreateAclsResult acls = adminClient.createAcls(list);
            acls.all().get();
            return null;
        });
    }

    public void addGroupReadAcl(String consumerGroup, String username) {
        this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            ResourcePattern rpn = new ResourcePattern(ResourceType.GROUP, consumerGroup, PatternType.LITERAL);
            AccessControlEntry acey = new AccessControlEntry("User:" + username, "*", AclOperation.READ, AclPermissionType.ALLOW);
            AclBinding aclBinding = new AclBinding(rpn, acey);
            CreateAclsResult acls = adminClient.createAcls(Collections.singletonList(aclBinding));
            acls.all().get();
            return null;
        });
    }

    public void creatTopic(String topicName) {
        this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            Set topics = (Set)adminClient.listTopics().names().get();
            if (topics.contains(topicName)) {
                return null;
            }
            NewTopic newTopic = new NewTopic(topicName, 1, 1);
            CreateTopicsResult topic = adminClient.createTopics(Collections.singletonList(newTopic));
            topic.all().get();
            return null;
        });
    }

    public void deleteTopic(String topicName) {
        this.exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));
            for (Map.Entry entry : deleteTopicsResult.values().entrySet()) {
                KafkaFuture future = (KafkaFuture)entry.getValue();
                future.get();
            }
            return null;
        });
    }
}

