package kafka.examples;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;

/* loaded from: input_file:kafka/examples/ExactlyOnceMessageProcessor.class */
public class ExactlyOnceMessageProcessor extends Thread {
    private static final boolean READ_COMMITTED = true;
    private final String inputTopic;
    private final String outputTopic;
    private final String transactionalId;
    private final String groupInstanceId;
    private final KafkaProducer<Integer, String> producer;
    private final KafkaConsumer<Integer, String> consumer;
    private final CountDownLatch latch;

    public ExactlyOnceMessageProcessor(String str, String str2, int i, CountDownLatch countDownLatch) {
        this.inputTopic = str;
        this.outputTopic = str2;
        this.transactionalId = "Processor-" + i;
        this.producer = new Producer(str2, true, this.transactionalId, true, -1, 10000, null).get();
        this.groupInstanceId = "Txn-consumer-" + i;
        this.consumer = new Consumer(str, "Eos-consumer", Optional.of(this.groupInstanceId), true, -1, null).get();
        this.latch = countDownLatch;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.producer.initTransactions();
        final AtomicLong atomicLong = new AtomicLong(Long.MAX_VALUE);
        this.consumer.subscribe(Collections.singleton(this.inputTopic), new ConsumerRebalanceListener() { // from class: kafka.examples.ExactlyOnceMessageProcessor.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                ExactlyOnceMessageProcessor.this.printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                ExactlyOnceMessageProcessor.this.printWithTxnId("Received partition assignment after rebalancing: " + collection);
                atomicLong.set(ExactlyOnceMessageProcessor.this.messagesRemaining(ExactlyOnceMessageProcessor.this.consumer));
            }
        });
        int i = 0;
        while (atomicLong.get() > 0) {
            try {
                ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(200L));
                if (poll.count() > 0) {
                    this.producer.beginTransaction();
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        this.producer.send(transform((ConsumerRecord) it.next()));
                    }
                    this.producer.sendOffsetsToTransaction(consumerOffsets(), this.consumer.groupMetadata());
                    this.producer.commitTransaction();
                    i += poll.count();
                }
            } catch (FencedInstanceIdException e) {
                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", this.groupInstanceId));
            } catch (KafkaException e2) {
                this.producer.abortTransaction();
                resetToLastCommittedPositions(this.consumer);
            } catch (ProducerFencedException e3) {
                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", this.transactionalId));
            }
            atomicLong.set(messagesRemaining(this.consumer));
            printWithTxnId("Message remaining: " + atomicLong);
        }
        printWithTxnId("Finished processing " + i + " records");
        this.latch.countDown();
    }

    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : this.consumer.assignment()) {
            hashMap.put(topicPartition, new OffsetAndMetadata(this.consumer.position(topicPartition), (String) null));
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printWithTxnId(String str) {
        System.out.println(this.transactionalId + ": " + str);
    }

    private ProducerRecord<Integer, String> transform(ConsumerRecord<Integer, String> consumerRecord) {
        printWithTxnId("Transformed record (" + consumerRecord.key() + "," + ((String) consumerRecord.value()) + ")");
        return new ProducerRecord<>(this.outputTopic, Integer.valueOf(((Integer) consumerRecord.key()).intValue() / 2), "Transformed_" + ((String) consumerRecord.value()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long messagesRemaining(KafkaConsumer<Integer, String> kafkaConsumer) {
        Map endOffsets = kafkaConsumer.endOffsets(new ArrayList(kafkaConsumer.assignment()));
        if (endOffsets.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return kafkaConsumer.assignment().stream().mapToLong(topicPartition -> {
            long position = kafkaConsumer.position(topicPartition);
            printWithTxnId("Processing partition " + topicPartition + " with full offsets " + endOffsets);
            if (endOffsets.containsKey(topicPartition)) {
                return ((Long) endOffsets.get(topicPartition)).longValue() - position;
            }
            return 0L;
        }).sum();
    }

    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> kafkaConsumer) {
        Map committed = kafkaConsumer.committed(kafkaConsumer.assignment());
        kafkaConsumer.assignment().forEach(topicPartition -> {
            OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) committed.get(topicPartition);
            if (offsetAndMetadata != null) {
                kafkaConsumer.seek(topicPartition, offsetAndMetadata.offset());
            } else {
                kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
            }
        });
    }
}
