package org.apache.inlong.audit.service.consume;

import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.service.InsertData;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/KafkaConsume.class */
public class KafkaConsume extends BaseConsume {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsume.class);
    private KafkaConsumer<String, byte[]> consumer;
    private String serverUrl;
    private String topic;
    private static final int DEFAULT_NUM_PARTITIONS = 3;
    private static final int DEFAULT_REPLICATION_FACTOR = 2;

    /* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/KafkaConsume$Fetcher.class */
    public class Fetcher implements Runnable {
        private final KafkaConsumer<String, byte[]> consumer;
        private final String topic;
        private final boolean isAutoCommit;
        private final long fetchWaitMs;

        public Fetcher(KafkaConsumer<String, byte[]> kafkaConsumer, String str, boolean z, long j) {
            this.consumer = kafkaConsumer;
            this.topic = str;
            this.isAutoCommit = z;
            this.fetchWaitMs = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.fetchWaitMs));
                    if (poll != null && !poll.isEmpty()) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            if (StringUtils.equals(consumerRecord.topic(), this.topic)) {
                                KafkaConsume.this.handleMessage(new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8));
                            }
                        }
                        if (!this.isAutoCommit) {
                            this.consumer.commitAsync();
                        }
                    }
                } catch (Exception e) {
                    KafkaConsume.LOG.error("kafka consumer get message error {}", e.getMessage());
                }
            }
        }
    }

    public KafkaConsume(List<InsertData> list, StoreConfig storeConfig, MessageQueueConfig messageQueueConfig) {
        super(list, storeConfig, messageQueueConfig);
    }

    @Override // org.apache.inlong.audit.service.consume.BaseConsume
    public void start() {
        this.serverUrl = this.mqConfig.getKafkaServerUrl();
        this.topic = this.mqConfig.getKafkaTopic();
        boolean z = Boolean.getBoolean(this.mqConfig.getEnableAutoCommit());
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.serverUrl), "no kafka server url specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getKafkaTopic()), "no kafka topic topic specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getKafkaConsumerName()), "no kafka consume name specified");
        createTopic();
        initConsumer(this.mqConfig);
        new Thread(new Fetcher(this.consumer, this.topic, z, this.mqConfig.getFetchWaitMs()), "KafkaConsume_Fetcher_Thread").start();
    }

    /* JADX WARN: Failed to calculate best type for var: r12v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r13v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 12, insn: 0x012c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:47:0x012c */
    /* JADX WARN: Not initialized variable reg: 13, insn: 0x0130: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:49:0x0130 */
    /* JADX WARN: Type inference failed for: r12v1, types: [org.apache.kafka.clients.admin.AdminClient] */
    /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.Throwable] */
    private void createTopic() {
        int i = DEFAULT_NUM_PARTITIONS;
        if (StringUtils.isNotEmpty(this.mqConfig.getNumPartitions())) {
            i = Integer.parseInt(this.mqConfig.getNumPartitions());
        }
        int i2 = DEFAULT_REPLICATION_FACTOR;
        if (StringUtils.isNotEmpty(this.mqConfig.getReplicationFactor())) {
            i2 = Integer.parseInt(this.mqConfig.getReplicationFactor());
        }
        try {
            try {
                AdminClient create = AdminClient.create(getProperties(this.mqConfig));
                Throwable th = null;
                if (((Set) create.listTopics().names().get()).contains(this.topic)) {
                    LOG.info("The audit topic:{} already exists.", this.topic);
                    if (create != null) {
                        if (0 == 0) {
                            create.close();
                            return;
                        }
                        try {
                            create.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                Collection collection = (Collection) create.describeCluster().nodes().get();
                if (collection.isEmpty()) {
                    throw new IllegalArgumentException("kafka server not find");
                }
                create.createTopics(Collections.singletonList(new NewTopic(this.topic, Math.min(i, collection.size()), (short) Math.min(i2, collection.size())))).all().get();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(String.format("create audit topic:%s error with config:%s", this.topic, getProperties(this.mqConfig)), e);
        }
    }

    protected void initConsumer(MessageQueueConfig messageQueueConfig) {
        LOG.info("init kafka consumer, topic:{}, serverUrl:{}", this.topic, this.serverUrl);
        this.consumer = new KafkaConsumer<>(getProperties(messageQueueConfig));
        this.consumer.subscribe(Collections.singleton(this.topic));
    }

    private Properties getProperties(MessageQueueConfig messageQueueConfig) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.serverUrl);
        properties.put("group.id", messageQueueConfig.getKafkaGroupId());
        properties.put("enable.auto.commit", messageQueueConfig.getEnableAutoCommit());
        properties.put("auto.commit.interval.ms", messageQueueConfig.getAutoCommitIntervalMs());
        properties.put("auto.offset.reset", messageQueueConfig.getAutoOffsetReset());
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        return properties;
    }
}
