/*
 * Decompiled with CFR 0.152.
 */
package io.castled.kafka.consumer;

import com.google.common.collect.ImmutableMap;
import io.castled.kafka.consumer.ConsumerState;
import io.castled.kafka.consumer.KafkaConsumerConfiguration;
import io.castled.kafka.consumer.KafkaRetriableException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton for a single-topic Kafka consumer that commits offsets manually.
 *
 * <p>The consumer is created with auto-commit disabled and byte-array
 * deserializers for both key and value. {@link #run()} subscribes to the
 * configured topic and polls until {@link #stop()} is called; every polled
 * batch is handed, one partition at a time, to {@link #processRecords(List)}.
 * Offsets are committed synchronously after each partition batch, based on the
 * value returned by the subclass.
 *
 * <p>The underlying {@code KafkaConsumer} is closed when {@link #run()}
 * returns, so an instance supports a single invocation of {@code run()}.
 */
public abstract class BaseKafkaConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(BaseKafkaConsumer.class);

    /** Sentinel offset meaning "nothing was processed, nothing to commit". */
    private static final long NO_OFFSET = -1L;
    /** Maximum time a single poll blocks; also bounds how long {@link #stop()} takes to be noticed. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(10L);
    private static final int REQUEST_TIMEOUT_MS = 60000;
    /** 8 MiB. */
    private static final int FETCH_MAX_BYTES = 8 * 1024 * 1024;
    private static final int MAX_POLL_RECORDS = 1000;

    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final AtomicReference<ConsumerState> consumerState = new AtomicReference<>(ConsumerState.RUNNING);
    private final KafkaConsumerConfiguration kafkaConsumerConfiguration;

    /**
     * Builds the underlying Kafka consumer from the supplied configuration.
     *
     * @param consumerConfiguration source of the consumer group, bootstrap servers,
     *        topic and the retry-on-unhandled-failure flag
     */
    public BaseKafkaConsumer(KafkaConsumerConfiguration consumerConfiguration) {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("group.id", consumerConfiguration.getConsumerGroup());
        consumerProps.put("bootstrap.servers", consumerConfiguration.getBootstrapServers());
        // Offsets are committed explicitly in processPartition once a batch has been handled.
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        consumerProps.put("request.timeout.ms", REQUEST_TIMEOUT_MS);
        consumerProps.put("fetch.max.bytes", FETCH_MAX_BYTES);
        consumerProps.put("max.poll.records", MAX_POLL_RECORDS);
        this.kafkaConsumer = new KafkaConsumer<>(consumerProps);
        this.kafkaConsumerConfiguration = consumerConfiguration;
    }

    /**
     * Subscribes to the configured topic and polls until the state leaves
     * {@code RUNNING}. The consumer is always closed on exit, whether the loop
     * ended because of {@link #stop()} or because polling threw.
     */
    @Override
    public void run() {
        try {
            this.kafkaConsumer.subscribe(Collections.singleton(this.kafkaConsumerConfiguration.getTopic()));
            while (this.consumerState.get() == ConsumerState.RUNNING) {
                ConsumerRecords<byte[], byte[]> records = this.kafkaConsumer.poll(POLL_TIMEOUT);
                for (TopicPartition topicPartition : records.partitions()) {
                    this.processPartition(topicPartition, records.records(topicPartition));
                }
            }
        } finally {
            // Release network resources and leave the consumer group promptly.
            this.kafkaConsumer.close();
        }
    }

    /**
     * Processes one partition's slice of a polled batch and then either commits
     * the progress or rewinds the partition so the records are fetched again.
     *
     * @param topicPartition partition the records belong to
     * @param partitionRecords records polled for that partition; expected to be
     *        non-empty because the first record's offset is used as the rewind position
     */
    private void processPartition(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> partitionRecords) {
        try {
            long lastProcessedOffset = this.processRecords(partitionRecords);
            if (lastProcessedOffset != NO_OFFSET) {
                // Kafka expects the offset of the NEXT record to consume, hence +1.
                this.kafkaConsumer.commitSync(ImmutableMap.of(topicPartition, new OffsetAndMetadata(lastProcessedOffset + 1L)));
            }
        } catch (KafkaRetriableException e) {
            // Resume right after the last record the subclass managed to handle,
            // or from the start of this batch if it handled none.
            if (e.getLastProcessedOffset() != NO_OFFSET) {
                this.kafkaConsumer.seek(topicPartition, e.getLastProcessedOffset() + 1L);
            } else {
                this.kafkaConsumer.seek(topicPartition, partitionRecords.get(0).offset());
            }
        } catch (Exception e) {
            log.error("Failed to process records for topic {} and partition {}", topicPartition.topic(), topicPartition.partition(), e);
            // Without the retry flag the batch is neither committed nor rewound:
            // the next poll simply continues past it.
            if (this.kafkaConsumerConfiguration.isRetryOnUnhandledFailures()) {
                this.kafkaConsumer.seek(topicPartition, partitionRecords.get(0).offset());
            }
        }
    }

    /**
     * Handles the records polled for a single partition.
     *
     * @param var1 records of one partition, in the order returned by the poll
     * @return offset of the last record that was processed, or {@code -1} when
     *         nothing should be committed
     * @throws KafkaRetriableException to have the partition rewound to just after
     *         the exception's last processed offset (or to the start of the batch
     *         when that offset is {@code -1})
     * @throws Exception any other failure is logged; the batch is re-read only
     *         when retry-on-unhandled-failures is enabled in the configuration
     */
    public abstract long processRecords(List<ConsumerRecord<byte[], byte[]>> var1) throws Exception;

    /**
     * Requests termination of the poll loop. The request is observed the next
     * time the loop re-checks its state, i.e. after the in-flight poll and the
     * processing of its records have finished.
     *
     * @throws Exception never thrown by this implementation; declared for subclasses
     */
    public void stop() throws Exception {
        this.consumerState.set(ConsumerState.TERMINATED);
    }
}

