package com.mnt.sio.core.sin;

import com.mnt.base.util.BaseConfiguration;
import com.mnt.base.util.CommonUtil;
import com.mnt.sio.core.dtd.StreamData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/mnt/sio/core/sin/KafkaSIn.class */
public class KafkaSIn implements SIn {
    protected ThreadLocal<KafkaConsumer<String, String>> consumerTL = new ThreadLocal<>();
    protected String topicName;
    protected String name;
    protected String desc;
    protected Map<String, Object> consumerProp;
    protected int threadCount;
    protected long timeWait;
    protected boolean sync;

    public KafkaSIn() {
    }

    public KafkaSIn(String str, Map<String, Object> map, int i, long j, boolean z) {
        init(str, map, i, j, z);
    }

    protected void init(String str, Map<String, Object> map, int i, long j, boolean z) {
        this.topicName = BaseConfiguration.getProperty(str, str);
        this.name = CommonUtil.concatWith(":", new Object[]{str, "kafka"});
        this.desc = CommonUtil.concatWith(":", new Object[]{"source", str, "kafka"});
        this.consumerProp = new HashMap(map);
        if (!this.consumerProp.containsKey("key.deserializer")) {
            this.consumerProp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
        if (!this.consumerProp.containsKey("value.deserializer")) {
            this.consumerProp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
        if (z) {
            this.consumerProp.put("enable.auto.commit", "false");
        } else {
            this.consumerProp.put("enable.auto.commit", "true");
        }
        this.threadCount = i;
        this.timeWait = j;
        this.sync = z;
    }

    @Override // com.mnt.sio.core.sin.SIn
    public List<StreamData> poll(Function<String, StreamData> function) throws InterruptedException {
        ConsumerRecords poll = this.consumerTL.get().poll(this.timeWait);
        ArrayList arrayList = new ArrayList();
        if (poll != null) {
            poll.forEach(consumerRecord -> {
                String str = (String) consumerRecord.value();
                if (CommonUtil.isEmpty(str)) {
                    return;
                }
                String str2 = (String) consumerRecord.key();
                long timestamp = consumerRecord.timestamp();
                StreamData streamData = (StreamData) function.apply(str);
                streamData.meta(str2, timestamp);
                arrayList.add(streamData);
            });
        }
        return arrayList;
    }

    @Override // com.mnt.sio.core.sin.SIn
    public int threadCount() {
        return this.threadCount;
    }

    @Override // com.mnt.sio.core.sin.SIn
    public void initialize() {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.consumerProp);
        kafkaConsumer.subscribe(Collections.singletonList(this.topicName));
        this.consumerTL.set(kafkaConsumer);
    }

    @Override // com.mnt.sio.core.sin.SIn
    public void commit() {
        if (this.sync) {
            this.consumerTL.get().commitAsync();
        }
    }

    @Override // com.mnt.sio.core.sin.SIn
    public void destory() {
        this.consumerTL.get().close();
    }

    @Override // com.mnt.sio.core.sin.SIn
    public String name() {
        return this.name;
    }

    @Override // com.mnt.sio.core.sin.SIn
    public String desc() {
        return this.desc;
    }

    @Override // com.mnt.sio.core.sin.SIn
    public boolean sync() {
        return this.sync;
    }
}
