package kafka.examples;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

/* loaded from: input_file:kafka/examples/KafkaExactlyOnceDemo.class */
/**
 * Command-line demo that runs three stages against a broker at {@code localhost:9092}:
 * <ol>
 *   <li>recreates {@value #INPUT_TOPIC} and {@value #OUTPUT_TOPIC} and starts a {@code Producer}
 *       on the input topic (data pre-population);</li>
 *   <li>starts the requested number of {@code ExactlyOnceMessageProcessor} instances wired from
 *       the input topic to the output topic;</li>
 *   <li>starts a {@code Consumer} on the output topic and waits for it to signal completion.</li>
 * </ol>
 * Each stage is awaited through a {@link CountDownLatch} for at most five minutes.
 */
public class KafkaExactlyOnceDemo {
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    /**
     * Entry point.
     *
     * @param args exactly three integers: number of partitions, number of processor instances,
     *             number of records
     * @throws IllegalArgumentException if the argument count is not 3 (a non-numeric argument
     *                                  surfaces as {@link NumberFormatException}, a subclass)
     * @throws TimeoutException         if any stage does not count its latch down within 5 minutes
     * @throws InterruptedException     if the thread is interrupted while waiting
     * @throws ExecutionException       if a topic administration request fails
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        if (args.length != 3) {
            throw new IllegalArgumentException("Should accept 3 parameters: [number of partitions], [number of instances], [number of records]");
        }
        int numPartitions = Integer.parseInt(args[0]);
        int numInstances = Integer.parseInt(args[1]);
        int numRecords = Integer.parseInt(args[2]);

        // Stage 1: start from clean topics, then pre-populate the input topic.
        recreateTopics(numPartitions);
        CountDownLatch prePopulateLatch = new CountDownLatch(1);
        new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch).start();
        if (!prePopulateLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
        }

        // Stage 2: one processor per instance; the latch is sized so that every instance must
        // count down before we proceed.
        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
            new ExactlyOnceMessageProcessor(INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch).start();
        }
        if (!transactionalCopyLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
        }

        // Stage 3: consume the output topic to verify the copy.
        CountDownLatch consumeLatch = new CountDownLatch(1);
        Consumer verifyConsumer = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
        verifyConsumer.start();
        if (!consumeLatch.await(5L, TimeUnit.MINUTES)) {
            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
        }
        verifyConsumer.shutdown();
        System.out.println("All finished!");
    }

    /**
     * Deletes the input and output topics, waits until neither is listed any more, and then
     * creates both again with the given partition count and a replication factor of 1.
     *
     * <p>The admin client is opened in a try-with-resources block so it is closed on both the
     * success path and every exception path.
     *
     * @param i number of partitions for each recreated topic
     * @throws ExecutionException   if an admin request fails for a reason other than the ones
     *                              tolerated by the helpers
     * @throws InterruptedException if interrupted while waiting on a request or sleeping
     */
    private static void recreateTopics(int i) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        List<String> topicNames = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
        try (Admin adminClient = Admin.create(properties)) {
            deleteTopic(adminClient, topicNames);
            awaitTopicsDeleted(adminClient, topicNames);
            createTopics(adminClient, topicNames, i);
        }
    }

    /** Polls the topic listing once per second until none of {@code topicNames} is present. */
    private static void awaitTopicsDeleted(Admin adminClient, List<String> topicNames)
            throws ExecutionException, InterruptedException {
        while (true) {
            System.out.println("Making sure the topics are deleted successfully: " + topicNames);
            Set<String> existingTopics = adminClient.listTopics().names().get();
            System.out.println("Current list of topics: " + existingTopics);
            boolean anyStillListed = false;
            for (String existingTopic : existingTopics) {
                if (topicNames.contains(existingTopic)) {
                    anyStillListed = true;
                    break;
                }
            }
            if (!anyStillListed) {
                return;
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Creates the input and output topics, retrying for as long as the request fails with
     * {@link TopicExistsException}; any other failure is rethrown.
     */
    private static void createTopics(Admin adminClient, List<String> topicNames, int numPartitions)
            throws ExecutionException, InterruptedException {
        while (true) {
            List<NewTopic> newTopics = Arrays.asList(
                new NewTopic(INPUT_TOPIC, numPartitions, (short) 1),
                new NewTopic(OUTPUT_TOPIC, numPartitions, (short) 1));
            try {
                adminClient.createTopics(newTopics).all().get();
                System.out.println("Created new topics: " + newTopics);
                return;
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
                // The topics are still reported as existing: issue the delete again and
                // retry after a short pause.
                System.out.println("Metadata of the old topics are not cleared yet...");
                deleteTopic(adminClient, topicNames);
                Thread.sleep(1000L);
            }
        }
    }

    /**
     * Requests deletion of the given topics and waits for the request to complete.
     *
     * <p>A failure caused by {@link UnknownTopicOrPartitionException} is logged and tolerated;
     * any other failure is rethrown. The closing "Deleted old topics" line is printed in both the
     * success and the tolerated-failure case.
     *
     * @param admin admin client used to issue the request
     * @param list  names of the topics to delete
     * @throws InterruptedException if interrupted while waiting for the request
     * @throws ExecutionException   if the request fails for any other reason
     */
    private static void deleteTopic(Admin admin, List<String> list) throws InterruptedException, ExecutionException {
        try {
            admin.deleteTopics(list).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
            System.out.println("Encountered exception during topic deletion: " + e.getCause());
        }
        System.out.println("Deleted old topics: " + list);
    }
}
