package org.apache.inlong.sdk.sort.impl.kafka;

import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.SeekerFactory;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sdk.sort.fetcher.kafka.AckOffsetOnRebalance;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@Deprecated
/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.class */
public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
    private final Logger logger;
    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
    private final AtomicLong ackOffsets;
    private volatile boolean stopConsume;
    private String bootstrapServers;
    private KafkaConsumer<byte[], byte[]> consumer;

    /* loaded from: input_file:org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl$Fetcher.class */
    public class Fetcher implements Runnable {
        /**
         * Creates the fetch task. The task declares no fields of its own; everything it works on
         * (consumer, offset map, logger, context) belongs to the enclosing fetcher instance.
         */
        public Fetcher() {
        }

        /**
         * Synchronously commits the acknowledged offsets collected in {@code commitOffsetMap}.
         *
         * <p>Does nothing when there is no consumer or nothing is waiting to be committed. The commit is
         * made from a snapshot of the map, and afterwards only the entries that still hold exactly the
         * committed value are removed. An offset that {@code ack} stores while the commit is in flight
         * therefore stays queued for the next round instead of being wiped by a blanket {@code clear()}.
         *
         * <p>A failed commit is logged and leaves the map untouched, so the same offsets are retried on
         * the next call.
         */
        private void commitKafkaOffset() {
            if (consumer == null || commitOffsetMap.isEmpty()) {
                return;
            }
            // Snapshot first: the map is shared (ack() writes to it, and it is also handed to the
            // rebalance listener), so it may change while commitSync is blocking.
            Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>(commitOffsetMap);
            if (pending.isEmpty()) {
                return;
            }
            try {
                consumer.commitSync(pending);
                for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : pending.entrySet()) {
                    // Two-argument remove only deletes the entry if it was not replaced meanwhile.
                    commitOffsetMap.remove(committed.getKey(), committed.getValue());
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        /**
         * Passes one batch of records to the callback from the client configuration and keeps the
         * callback statistics for this task/cluster/topic up to date.
         *
         * <p>The call counter is bumped before the callback runs; on success the elapsed wall-clock time
         * and the done counter are recorded. Any exception is swallowed here: it is counted as a callback
         * error and logged.
         *
         * @param list the records handed to {@code onFinishedBatch}
         */
        private void handleAndCallbackMsg(List<MessageRecord> list) {
            final long startedAt = System.currentTimeMillis();
            try {
                context.getStatManager()
                        .getStatistics(context.getConfig().getSortTaskId(),
                                inLongTopic.getInLongCluster().getClusterId(),
                                inLongTopic.getTopic())
                        .addCallbackTimes(1L);
                context.getConfig().getCallback().onFinishedBatch(list);
                context.getStatManager()
                        .getStatistics(context.getConfig().getSortTaskId(),
                                inLongTopic.getInLongCluster().getClusterId(),
                                inLongTopic.getTopic())
                        .addCallbackTimeCost(System.currentTimeMillis() - startedAt)
                        .addCallbackDoneTimes(1L);
            } catch (Exception callbackFailure) {
                context.getStatManager()
                        .getStatistics(context.getConfig().getSortTaskId(),
                                inLongTopic.getInLongCluster().getClusterId(),
                                inLongTopic.getTopic())
                        .addCallbackErrorTimes(1L);
                logger.error(callbackFailure.getMessage(), callbackFailure);
            }
        }

        /**
         * Joins the two numbers with a colon, i.e. the {@code int:long} form that
         * {@code ack(String)} parses (there the first number is used as the partition and the second
         * as the offset).
         *
         * @param i the number placed before the colon
         * @param j the number placed after the colon
         * @return {@code i + ":" + j}
         */
        private String getOffset(int i, long j) {
            return new StringBuilder().append(i).append(':').append(j).toString();
        }

        /**
         * Copies Kafka record headers into a plain map of header key to header text.
         *
         * <p>Values are decoded as UTF-8 rather than with the platform default charset, so the result
         * does not depend on the JVM's locale settings. A header whose value is {@code null} is kept
         * with a {@code null} map value instead of failing with a {@code NullPointerException}. When a
         * key occurs more than once, the last occurrence wins.
         *
         * @param headers the record headers; {@code null} is treated as "no headers"
         * @return a new mutable map, never {@code null}
         */
        private Map<String, String> getMsgHeaders(Headers headers) {
            Map<String, String> headerMap = new HashMap<>();
            if (headers == null) {
                return headerMap;
            }
            for (Header header : headers) {
                byte[] rawValue = header.value();
                headerMap.put(header.key(), rawValue == null ? null : new String(rawValue, StandardCharsets.UTF_8));
            }
            return headerMap;
        }

        /**
         * Fetch loop of this topic.
         *
         * <p>Each iteration:
         * <ul>
         *   <li>while consumption is stopped (client-wide {@code isStopConsume()} or this fetcher's own
         *       flag) it sleeps 50 ms and checks again;</li>
         *   <li>otherwise it waits {@code sleepTime} ms if that is positive, acquires a request permit,
         *       fetches from Kafka and commits the acknowledged offsets.</li>
         * </ul>
         *
         * <p>The permit is released in a {@code finally} block whenever it was acquired, including when
         * the fetch or the commit throws. Failures are counted as fetch errors and logged, and the loop
         * carries on.
         *
         * <p>The loop ends once the fetcher reports itself closed or the thread is interrupted, so
         * {@code close()} actually stops the thread instead of leaving it running against a closed
         * consumer.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!InLongKafkaFetcherImpl.this.isClosed() && !Thread.currentThread().isInterrupted()) {
                boolean hasPermit = false;
                try {
                    if (context.getConfig().isStopConsume() || stopConsume) {
                        TimeUnit.MILLISECONDS.sleep(50L);
                        continue;
                    }
                    if (sleepTime > 0) {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    }
                    context.acquireRequestPermit();
                    hasPermit = true;
                    fetchFromKafka();
                    commitKafkaOffset();
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                    logger.info("fetch thread interrupted, stop fetching topic:{}", inLongTopic.getTopic());
                } catch (Exception e) {
                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addFetchErrorTimes(1L);
                    logger.error(e.getMessage(), e);
                } finally {
                    if (hasPermit) {
                        context.releaseRequestPermit();
                    }
                }
            }
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl.access$4802(org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        /**
         * Placeholder for the real fetch logic: the decompiler skipped this method body (see the JADX
         * error above), so in this source the method always throws
         * {@link UnsupportedOperationException}. {@link #run()} calls it once per loop iteration after
         * a request permit has been acquired.
         *
         * @throws java.lang.Exception declared by the original signature; what the real body throws is
         *         not visible here
         */
        private void fetchFromKafka() throws java.lang.Exception {
            /*
                Method dump skipped, instructions count: 622
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl.Fetcher.fetchFromKafka():void");
        }
    }

    /**
     * Creates a fetcher for one topic. Only in-memory state is set up here (logger, the empty map of
     * offsets waiting to be committed, the ack counter and the cleared stop flag); the Kafka consumer
     * and the fetch thread are created by {@link #init(Object)}.
     *
     * @param inLongTopic the topic to consume, passed to the superclass
     * @param clientContext the client context, passed to the superclass
     */
    public InLongKafkaFetcherImpl(InLongTopic inLongTopic, ClientContext clientContext) {
        super(inLongTopic, clientContext);
        stopConsume = false;
        ackOffsets = new AtomicLong();
        commitOffsetMap = new ConcurrentHashMap<>();
        logger = LoggerFactory.getLogger(InLongKafkaFetcherImpl.class);
    }

    /**
     * Creates the Kafka consumer, subscribes it to this fetcher's topic and starts the fetch thread.
     *
     * <p>Any failure is logged and reported as {@code false}; nothing is thrown. That includes an
     * argument that is not a {@code String}, which previously escaped as a
     * {@code ClassCastException}. If a consumer was created during this call but a later step failed,
     * that consumer is closed again so its network resources are not leaked.
     *
     * @param obj the Kafka bootstrap servers, expected to be a {@code String}
     * @return {@code true} if the consumer is subscribed and the fetch thread was started
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean init(Object obj) {
        if (!(obj instanceof String)) {
            this.logger.error("bootstrap servers must be a String, but got:{}", obj);
            return false;
        }
        String str = (String) obj;
        // Remembered so that the failure path only closes a consumer created by this very call.
        final KafkaConsumer<byte[], byte[]> previousConsumer = this.consumer;
        try {
            createKafkaConsumer(str);
            if (this.consumer == null) {
                this.logger.info("consumer is null");
                return false;
            }
            this.logger.info("start to subscribe topic:{}", new Gson().toJson(this.inLongTopic));
            this.seeker = SeekerFactory.createKafkaSeeker(this.consumer, this.inLongTopic);
            this.consumer.subscribe(Collections.singletonList(this.inLongTopic.getTopic()), new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), this.seeker, this.commitOffsetMap));
            this.bootstrapServers = str;
            String format = String.format("sort_sdk_fetch_thread_%s_%s_%d", this.inLongTopic.getInLongCluster().getClusterId(), this.inLongTopic.getTopic(), Integer.valueOf(hashCode()));
            this.fetchThread = new Thread(new Fetcher(), format);
            this.logger.info("start to start thread:{}", format);
            this.fetchThread.start();
            return true;
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
            final KafkaConsumer<byte[], byte[]> halfInitialized = this.consumer;
            if (halfInitialized != null && halfInitialized != previousConsumer) {
                this.consumer = null;
                try {
                    halfInitialized.close();
                } catch (Exception closeError) {
                    this.logger.error(closeError.getMessage(), closeError);
                }
            }
            return false;
        }
    }

    /**
     * Records an acknowledged offset so that it is included in a later offset commit.
     *
     * <p>The argument must consist of exactly two colon-separated parts: the first is parsed as the
     * partition number ({@code int}), the second as the offset ({@code long}). The entry for that
     * partition in {@code commitOffsetMap} is replaced by the new offset.
     *
     * @param str the offset token in {@code int:long} form
     * @throws Exception if the token does not split into exactly two parts; a part that is not a
     *         number surfaces as {@link NumberFormatException}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void ack(String str) throws Exception {
        final String[] parts = str.split(":");
        if (parts.length == 2) {
            final TopicPartition partition = new TopicPartition(inLongTopic.getTopic(), Integer.parseInt(parts[0]));
            final OffsetAndMetadata ackedOffset = new OffsetAndMetadata(Long.parseLong(parts[1]));
            commitOffsetMap.put(partition, ackedOffset);
            return;
        }
        throw new Exception("offset is illegal, the correct format is int:long ,the error offset is:" + str);
    }

    /**
     * Raises the {@code stopConsume} flag. The fetch loop checks this flag at the start of every
     * iteration and only sleeps while it is set.
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void pause() {
        stopConsume = true;
    }

    /**
     * Clears the {@code stopConsume} flag so the fetch loop goes back to fetching.
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void resume() {
        stopConsume = false;
    }

    /**
     * Marks this fetcher as closed, interrupts the fetch thread and closes the Kafka consumer.
     *
     * <p>Problems during shutdown are reported through the class logger (they used to be printed
     * with {@code printStackTrace()}, bypassing the logging configuration) and do not change the
     * result.
     *
     * @return always {@code true}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean close() {
        this.closed = true;
        try {
            if (this.fetchThread != null) {
                this.fetchThread.interrupt();
            }
            if (this.consumer != null) {
                // NOTE(review): the consumer is closed from the caller's thread while the fetch thread
                // may still be using it; KafkaConsumer is documented as not thread-safe - confirm
                // whether a wakeup()/join hand-over is needed here.
                this.consumer.close();
            }
        } catch (Throwable th) {
            this.logger.error(th.getMessage(), th);
        }
        this.logger.info("closed {}", this.inLongTopic);
        return true;
    }

    /**
     * @return the {@code closed} flag, which {@link #close()} sets to {@code true}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isClosed() {
        return closed;
    }

    /**
     * Sets the {@code stopConsume} flag to the given value.
     *
     * @param z {@code true} to make the fetch loop idle, {@code false} to let it fetch
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public void stopConsume(boolean z) {
        stopConsume = z;
    }

    /**
     * @return the current value of the {@code stopConsume} flag
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public boolean isConsumeStop() {
        return stopConsume;
    }

    /**
     * @return the topic this fetcher was created for
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public InLongTopic getInLongTopic() {
        return inLongTopic;
    }

    /**
     * Not tracked by this implementation.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getConsumedDataSize() {
        return 0;
    }

    /**
     * Not tracked by this implementation.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.inlong.sdk.sort.api.InLongTopicFetcher
    public long getAckedOffset() {
        return 0;
    }

    /**
     * Builds the consumer configuration and creates the {@link KafkaConsumer} stored in
     * {@code consumer}.
     *
     * <p>The sort task id is used as consumer group, keys and values are read as raw bytes, auto
     * commit is switched off and partitions are assigned with the range assignor. The
     * {@code auto.offset.reset} policy follows the configured consume strategy: the two "lastest"
     * strategies map to {@code latest}, the two "earliest" strategies to {@code earliest}, anything
     * else to {@code none}.
     *
     * @param str the bootstrap servers; also remembered in {@code bootstrapServers}
     */
    private void createKafkaConsumer(String str) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", str);
        props.put("group.id", context.getConfig().getSortTaskId());
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        props.put("key.deserializer", byteArrayDeserializer);
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("receive.buffer.bytes", Integer.valueOf(context.getConfig().getKafkaSocketRecvBufferSize()));
        props.put("enable.auto.commit", Boolean.FALSE);

        final SortClientConfig.ConsumeStrategy strategy = context.getConfig().getOffsetResetStrategy();
        final boolean fromLatest = strategy == SortClientConfig.ConsumeStrategy.lastest
                || strategy == SortClientConfig.ConsumeStrategy.lastest_absolutely;
        final boolean fromEarliest = strategy == SortClientConfig.ConsumeStrategy.earliest
                || strategy == SortClientConfig.ConsumeStrategy.earliest_absolutely;
        final String resetPolicy;
        if (fromLatest) {
            resetPolicy = "latest";
        } else if (fromEarliest) {
            resetPolicy = "earliest";
        } else {
            resetPolicy = "none";
        }
        props.put("auto.offset.reset", resetPolicy);

        props.put("fetch.max.bytes", Integer.valueOf(context.getConfig().getKafkaFetchSizeBytes()));
        props.put("fetch.max.wait.ms", Integer.valueOf(context.getConfig().getKafkaFetchWaitMs()));
        // Same key and value as set further up; repeated exactly as in the original.
        props.put("enable.auto.commit", Boolean.FALSE);
        props.put("partition.assignment.strategy", RangeAssignor.class.getName());
        props.put("connections.max.idle.ms", 120000L);
        bootstrapServers = str;
        logger.info("start to create kafka consumer:{}", props);
        consumer = new KafkaConsumer<>(props);
        logger.info("end to create kafka consumer:{}", consumer);
    }

    /**
     * Compiler-generated accessor that assigns the inherited {@code sleepTime} field.
     *
     * <p>JADX could not decode this method (its {@code dup2_x1} handling failed) and emitted a stub
     * that threw {@link UnsupportedOperationException}. The body below is reconstructed from the
     * partial dump ({@code r0.sleepTime = r1}) and from the shape of the sibling assignment accessor
     * for {@code emptyFetchTimes}: store the value, then return the stored value.
     *
     * @param r6 the fetcher whose {@code sleepTime} is set
     * @param r7 the new value
     * @return the value that was assigned
     */
    static /* synthetic */ long access$4802(org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl r6, long r7) {
        r6.sleepTime = r7;
        return r7;
    }

    /** Synthetic accessor: reads the inherited {@code context} field of the given fetcher. */
    static /* synthetic */ ClientContext access$4900(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final ClientContext outerContext = inLongKafkaFetcherImpl.context;
        return outerContext;
    }

    /** Synthetic accessor: reads the inherited {@code inLongTopic} field of the given fetcher. */
    static /* synthetic */ InLongTopic access$5000(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final InLongTopic outerTopic = inLongKafkaFetcherImpl.inLongTopic;
        return outerTopic;
    }

    /** Synthetic accessor: reads the inherited {@code inLongTopic} field of the given fetcher. */
    static /* synthetic */ InLongTopic access$5100(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final InLongTopic outerTopic = inLongKafkaFetcherImpl.inLongTopic;
        return outerTopic;
    }

    /** Synthetic accessor: reads the inherited {@code context} field of the given fetcher. */
    static /* synthetic */ ClientContext access$5200(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final ClientContext outerContext = inLongKafkaFetcherImpl.context;
        return outerContext;
    }

    /**
     * Synthetic accessor: post-increments the inherited {@code emptyFetchTimes} field.
     *
     * @return the value the field had before it was incremented
     */
    static /* synthetic */ int access$5308(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        return inLongKafkaFetcherImpl.emptyFetchTimes++;
    }

    /** Synthetic accessor: reads the inherited {@code emptyFetchTimes} field of the given fetcher. */
    static /* synthetic */ int access$5400(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final int emptyFetches = inLongKafkaFetcherImpl.emptyFetchTimes;
        return emptyFetches;
    }

    /** Synthetic accessor: reads the inherited {@code context} field of the given fetcher. */
    static /* synthetic */ ClientContext access$5500(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final ClientContext outerContext = inLongKafkaFetcherImpl.context;
        return outerContext;
    }

    /**
     * Compiler-generated accessor that assigns the inherited {@code sleepTime} field.
     *
     * <p>JADX could not decode this method (its {@code dup2_x1} handling failed) and emitted a stub
     * that threw {@link UnsupportedOperationException}. The body below is reconstructed from the
     * partial dump ({@code r0.sleepTime = r1}) and from the shape of the sibling assignment accessor
     * for {@code emptyFetchTimes}: store the value, then return the stored value.
     *
     * @param r6 the fetcher whose {@code sleepTime} is set
     * @param r7 the new value
     * @return the value that was assigned
     */
    static /* synthetic */ long access$5602(org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl r6, long r7) {
        r6.sleepTime = r7;
        return r7;
    }

    /**
     * Compiler-generated accessor that assigns the inherited {@code sleepTime} field.
     *
     * <p>JADX could not decode this method (its {@code dup2_x1} handling failed) and emitted a stub
     * that threw {@link UnsupportedOperationException}. The body below is reconstructed from the
     * partial dump ({@code r0.sleepTime = r1}) and from the shape of the sibling assignment accessor
     * for {@code emptyFetchTimes}: store the value, then return the stored value.
     *
     * @param r6 the fetcher whose {@code sleepTime} is set
     * @param r7 the new value
     * @return the value that was assigned
     */
    static /* synthetic */ long access$5802(org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl r6, long r7) {
        r6.sleepTime = r7;
        return r7;
    }

    /** Synthetic accessor: reads the inherited {@code sleepTime} field of the given fetcher. */
    static /* synthetic */ long access$5900(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final long currentSleepTime = inLongKafkaFetcherImpl.sleepTime;
        return currentSleepTime;
    }

    /** Synthetic accessor: reads the inherited {@code context} field of the given fetcher. */
    static /* synthetic */ ClientContext access$6000(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final ClientContext outerContext = inLongKafkaFetcherImpl.context;
        return outerContext;
    }

    /** Synthetic accessor: reads the inherited {@code context} field of the given fetcher. */
    static /* synthetic */ ClientContext access$6100(InLongKafkaFetcherImpl inLongKafkaFetcherImpl) {
        final ClientContext outerContext = inLongKafkaFetcherImpl.context;
        return outerContext;
    }

    /**
     * Synthetic accessor: assigns the inherited {@code emptyFetchTimes} field.
     *
     * @param i the new value
     * @return the value that was assigned
     */
    static /* synthetic */ int access$6202(InLongKafkaFetcherImpl inLongKafkaFetcherImpl, int i) {
        return inLongKafkaFetcherImpl.emptyFetchTimes = i;
    }
}
