/*
 * Decompiled with CFR 0.152.
 */
package kafka.examples;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import kafka.examples.Consumer;
import kafka.examples.Producer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ExactlyOnceMessageProcessor
extends Thread {
    private static final boolean READ_COMMITTED = true;
    private final String inputTopic;
    private final String outputTopic;
    private final String transactionalId;
    private final String groupInstanceId;
    private final KafkaProducer<Integer, String> producer;
    private final KafkaConsumer<Integer, String> consumer;
    private final CountDownLatch latch;

    public ExactlyOnceMessageProcessor(String inputTopic, String outputTopic, int instanceIdx, CountDownLatch latch) {
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.transactionalId = "Processor-" + instanceIdx;
        int transactionTimeoutMs = 10000;
        this.producer = new Producer(outputTopic, true, this.transactionalId, true, -1, 10000, null).get();
        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
        this.consumer = new Consumer(inputTopic, "Eos-consumer", Optional.of(this.groupInstanceId), true, -1, null).get();
        this.latch = latch;
    }

    @Override
    public void run() {
        this.producer.initTransactions();
        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
        this.consumer.subscribe(Collections.singleton(this.inputTopic), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                ExactlyOnceMessageProcessor.this.printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                ExactlyOnceMessageProcessor.this.printWithTxnId("Received partition assignment after rebalancing: " + partitions);
                messageRemaining.set(ExactlyOnceMessageProcessor.this.messagesRemaining((KafkaConsumer<Integer, String>)ExactlyOnceMessageProcessor.this.consumer));
            }
        });
        int messageProcessed = 0;
        while (messageRemaining.get() > 0L) {
            try {
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(200L));
                if (records.count() > 0) {
                    this.producer.beginTransaction();
                    for (ConsumerRecord record : records) {
                        ProducerRecord<Integer, String> customizedRecord = this.transform((ConsumerRecord<Integer, String>)record);
                        this.producer.send(customizedRecord);
                    }
                    Map<TopicPartition, OffsetAndMetadata> offsets = this.consumerOffsets();
                    this.producer.sendOffsetsToTransaction(offsets, this.consumer.groupMetadata());
                    this.producer.commitTransaction();
                    messageProcessed += records.count();
                }
            }
            catch (ProducerFencedException e) {
                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", this.transactionalId));
            }
            catch (FencedInstanceIdException e) {
                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", this.groupInstanceId));
            }
            catch (KafkaException e) {
                this.producer.abortTransaction();
                ExactlyOnceMessageProcessor.resetToLastCommittedPositions(this.consumer);
            }
            messageRemaining.set(this.messagesRemaining(this.consumer));
            this.printWithTxnId("Message remaining: " + messageRemaining);
        }
        this.printWithTxnId("Finished processing " + messageProcessed + " records");
        this.latch.countDown();
    }

    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition topicPartition : this.consumer.assignment()) {
            offsets.put(topicPartition, new OffsetAndMetadata(this.consumer.position(topicPartition), null));
        }
        return offsets;
    }

    private void printWithTxnId(String message) {
        System.out.println(this.transactionalId + ": " + message);
    }

    private ProducerRecord<Integer, String> transform(ConsumerRecord<Integer, String> record) {
        this.printWithTxnId("Transformed record (" + record.key() + "," + (String)record.value() + ")");
        return new ProducerRecord(this.outputTopic, (Object)((Integer)record.key() / 2), (Object)("Transformed_" + (String)record.value()));
    }

    private long messagesRemaining(KafkaConsumer<Integer, String> consumer) {
        Map fullEndOffsets = consumer.endOffsets(new ArrayList(consumer.assignment()));
        if (fullEndOffsets.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return consumer.assignment().stream().mapToLong(partition -> {
            long currentPosition = consumer.position(partition);
            this.printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
            if (fullEndOffsets.containsKey(partition)) {
                return (Long)fullEndOffsets.get(partition) - currentPosition;
            }
            return 0L;
        }).sum();
    }

    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
        Map committed = consumer.committed(consumer.assignment());
        consumer.assignment().forEach(tp -> {
            OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata)committed.get(tp);
            if (offsetAndMetadata != null) {
                consumer.seek(tp, offsetAndMetadata.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        });
    }
}

