/*
 * Decompiled with CFR 0.152.
 */
package kafka.examples;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.examples.Consumer;
import kafka.examples.ExactlyOnceMessageProcessor;
import kafka.examples.Producer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class KafkaExactlyOnceDemo {
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        if (args.length != 3) {
            throw new IllegalArgumentException("Should accept 3 parameters: [number of partitions], [number of instances], [number of records]");
        }
        int numPartitions = Integer.parseInt(args[0]);
        int numInstances = Integer.parseInt(args[1]);
        int numRecords = Integer.parseInt(args[2]);
        KafkaExactlyOnceDemo.recreateTopics(numPartitions);
        CountDownLatch prePopulateLatch = new CountDownLatch(1);
        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
        producerThread.start();
        if (!prePopulateLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
        }
        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
        for (int instanceIdx = 0; instanceIdx < numInstances; ++instanceIdx) {
            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
            messageProcessor.start();
        }
        if (!transactionalCopyLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
        }
        CountDownLatch consumeLatch = new CountDownLatch(1);
        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
        consumerThread.start();
        if (!consumeLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
        }
        consumerThread.shutdown();
        System.out.println("All finished!");
    }

    private static void recreateTopics(int numPartitions) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        Admin adminClient = Admin.create((Properties)props);
        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
        KafkaExactlyOnceDemo.deleteTopic(adminClient, topicsToDelete);
        while (true) {
            System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
            Set listedTopics = (Set)adminClient.listTopics().names().get();
            System.out.println("Current list of topics: " + listedTopics);
            boolean hasTopicInfo = false;
            for (String listedTopic : listedTopics) {
                if (!topicsToDelete.contains(listedTopic)) continue;
                hasTopicInfo = true;
                break;
            }
            if (!hasTopicInfo) break;
            Thread.sleep(1000L);
        }
        while (true) {
            boolean replicationFactor = true;
            List<NewTopic> newTopics = Arrays.asList(new NewTopic(INPUT_TOPIC, numPartitions, 1), new NewTopic(OUTPUT_TOPIC, numPartitions, 1));
            try {
                adminClient.createTopics(newTopics).all().get();
                System.out.println("Created new topics: " + newTopics);
            }
            catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
                System.out.println("Metadata of the old topics are not cleared yet...");
                KafkaExactlyOnceDemo.deleteTopic(adminClient, topicsToDelete);
                Thread.sleep(1000L);
                continue;
            }
            break;
        }
    }

    private static void deleteTopic(Admin adminClient, List<String> topicsToDelete) throws InterruptedException, ExecutionException {
        try {
            adminClient.deleteTopics(topicsToDelete).all().get();
        }
        catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
            System.out.println("Encountered exception during topic deletion: " + e.getCause());
        }
        System.out.println("Deleted old topics: " + topicsToDelete);
    }
}

