/*
 * Decompiled with CFR 0.152.
 */
package net.lightapi.exporter;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.KafkaConsumerConfig;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import net.lightapi.exporter.EventMatcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
 * Command line entry point that exports events from a Kafka topic to a text file.
 *
 * <p>The topic and the base consumer properties are read from the {@code kafka-consumer}
 * configuration. Every partition of the topic is assigned manually and positioned at the
 * first offset whose timestamp is at or after {@code --start}. Records whose timestamp is
 * not after {@code --end}, whose {@code type} is accepted by the {@link EventMatcher} and
 * is not one of the ignored event types are written as one {@code key value} line each.
 */
public class Cli {
    /** Number of consecutive empty polls after which the export stops. */
    private static final int MAX_EMPTY_POLLS = 5;
    /** Maximum time a single poll waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);
    /** Event types that are never exported, built once instead of once per record. */
    private static final List<String> IGNORED_EVENTS = buildIgnoredEvents();
    private static final Class<?> keyDeserializer = ByteArrayDeserializer.class;
    private static final Class<?> valueDeserializer = ByteArrayDeserializer.class;

    @Parameter(names={"--filename", "-f"}, required=false, description="The filename to be exported.")
    String filename;
    @Parameter(names={"--types", "-t"}, required=false, description="The types of service to be exported. Concat with comma if multiple types.")
    String types;
    @Parameter(names={"--start", "-s"}, required=false, description="The start timestamp to be exported.")
    String start;
    @Parameter(names={"--end", "-e"}, required=false, description="The end timestamp to be exported.")
    String end;
    @Parameter(names={"--help", "-h"}, help=true)
    private boolean help;

    /**
     * Parses the command line into a new {@code Cli} and runs the export.
     *
     * @param argv raw command line arguments
     * @throws Exception if {@link #run(JCommander)} fails with an unexpected error
     */
    public static void main(String ... argv) throws Exception {
        try {
            Cli cli = new Cli();
            JCommander jCommander = JCommander.newBuilder().addObject(cli).build();
            jCommander.parse(argv);
            cli.run(jCommander);
        }
        catch (ParameterException e) {
            System.out.println("Command line parameter error: " + e.getLocalizedMessage());
            e.usage();
        }
    }

    /**
     * Runs the export described by the parsed options.
     *
     * <p>Prints the usage and returns when {@code --help} was given or when one of
     * {@code --filename}, {@code --start} or {@code --end} is missing. Failures while
     * creating the consumer or while exporting are reported on standard output and end
     * the run without the success message.
     *
     * @param jCommander the commander that parsed the options; used to print the usage
     * @throws Exception if loading the configuration or building the matcher fails
     */
    public void run(JCommander jCommander) throws Exception {
        if (this.help) {
            jCommander.usage();
            return;
        }
        System.out.println("filename = " + this.filename + " types = " + this.types + " start = " + this.start + " end = " + this.end);

        // Without these three options the export cannot work; fail with a clear message
        // instead of a NullPointerException further down.
        if (StringUtils.isBlank(this.filename) || StringUtils.isBlank(this.start) || StringUtils.isBlank(this.end)) {
            System.out.println("Missing option: --filename, --start and --end are all required for an export.");
            jCommander.usage();
            return;
        }

        final long startTimestampMs;
        final long endTimestampMs;
        try {
            startTimestampMs = OffsetDateTime.parse(this.start).toInstant().toEpochMilli();
            endTimestampMs = OffsetDateTime.parse(this.end).toInstant().toEpochMilli();
        }
        catch (java.time.format.DateTimeParseException e) {
            System.out.println("Invalid --start or --end timestamp, expected an ISO-8601 offset date-time such as 2024-01-01T00:00:00Z: " + e.getMessage());
            return;
        }

        EventMatcher matcher = new EventMatcher(StringUtils.split(this.types, ','));
        KafkaConsumerConfig config = (KafkaConsumerConfig)Config.getInstance().getJsonObjectConfig("kafka-consumer", KafkaConsumerConfig.class);
        Properties props = buildConsumerProperties(config);

        KafkaConsumer<byte[], byte[]> consumer;
        try {
            consumer = new KafkaConsumer<>(props);
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("Failed to create Kafka consumer. Please check your configuration.");
            return;
        }
        System.out.println("Kafka consumer created.");

        // Records are decoded as UTF-8, so the file is written as UTF-8 too rather than
        // in the platform default charset.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(this.filename, StandardCharsets.UTF_8))) {
            if (!exportTopic(consumer, config.getTopic(), matcher, startTimestampMs, endTimestampMs, writer)) {
                return;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("Export to " + this.filename + " failed: " + e);
            return;
        }
        finally {
            consumer.close();
            System.out.println("Kafka consumer closed.");
        }
        System.out.println("All Portal Events have been exported successfully to " + this.filename + ". Have fun!!!");
    }

    /**
     * Builds the consumer properties: the configured ones plus the export specific overrides.
     *
     * @param config the loaded {@code kafka-consumer} configuration
     * @return properties for a byte-array consumer with auto commit disabled
     */
    private static Properties buildConsumerProperties(KafkaConsumerConfig config) {
        Properties props = new Properties();
        props.putAll(config.getProperties());
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        // A fresh group id per run. The original set this key twice; only the last value
        // (the bare timestamp) ever took effect, so that is the one kept here.
        props.put("group.id", String.valueOf(System.currentTimeMillis()));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "none");
        return props;
    }

    /**
     * Assigns all partitions of {@code topic}, seeks them to the start timestamp and writes
     * the matching records until {@link #MAX_EMPTY_POLLS} consecutive polls return nothing.
     *
     * <p>NOTE(review): a record past the end timestamp only ends the processing of its
     * partition for the current poll; polling itself continues until the topic goes quiet.
     *
     * @return {@code false} when the topic has no partitions, {@code true} once polling finished
     * @throws IOException if a record cannot be written
     */
    private boolean exportTopic(KafkaConsumer<byte[], byte[]> consumer, String topic, EventMatcher matcher,
                                long startTimestampMs, long endTimestampMs, BufferedWriter writer) throws IOException {
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        if (partitionInfoList == null || partitionInfoList.isEmpty()) {
            System.out.println("No partitions found for topic: " + topic);
            return false;
        }
        System.out.println("Partitions for topic " + topic + " " + String.valueOf(partitionInfoList));
        List<TopicPartition> topicPartitions = partitionInfoList.stream()
                .map(pi -> new TopicPartition(topic, pi.partition()))
                .collect(Collectors.toList());
        System.out.println("Topic partitions: " + String.valueOf(topicPartitions));

        seekToStart(consumer, topicPartitions, startTimestampMs);

        int emptyPolls = 0;
        while (emptyPolls < MAX_EMPTY_POLLS) {
            ConsumerRecords<byte[], byte[]> polled = consumer.poll(POLL_TIMEOUT);
            if (polled.isEmpty()) {
                emptyPolls++;
                continue;
            }
            emptyPolls = 0;
            for (TopicPartition partition : polled.partitions()) {
                writePartitionRecords(polled.records(partition), partition, matcher, endTimestampMs, writer);
            }
        }
        System.out.println("No more records found after " + MAX_EMPTY_POLLS + " polls, stopping.");
        return true;
    }

    /**
     * Assigns the partitions to the consumer and positions each one at the first offset
     * whose timestamp is at or after {@code startTimestampMs}; partitions without such an
     * offset are moved to their end.
     */
    private void seekToStart(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions, long startTimestampMs) {
        Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> startTimestampMs));
        Map<TopicPartition, OffsetAndTimestamp> startingOffsets = consumer.offsetsForTimes(timestampsToSearch);
        System.out.println("Starting offsets for timestamps: " + String.valueOf(startingOffsets));
        consumer.assign(topicPartitions);
        System.out.println("Assigned partitions: " + String.valueOf(topicPartitions));
        startingOffsets.forEach((tp, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) {
                System.out.println("Seeking partition " + tp.partition() + " offset " + offsetAndTimestamp.offset());
                consumer.seek(tp, offsetAndTimestamp.offset());
            } else {
                System.out.println("No offset found for partition " + tp.partition() + " >= startTs " + this.start + " seeking to end.");
                consumer.seekToEnd(Collections.singletonList(tp));
            }
        });
    }

    /**
     * Writes the exportable records of one partition from one poll, stopping at the first
     * record whose timestamp is after {@code endTimestampMs}.
     *
     * @throws IOException if a line cannot be written; the export must not report success then
     */
    private void writePartitionRecords(List<ConsumerRecord<byte[], byte[]>> records, TopicPartition partition,
                                       EventMatcher matcher, long endTimestampMs, BufferedWriter writer) throws IOException {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.timestamp() > endTimestampMs) {
                System.out.printf("Record timestamp %s exceeds endTs %s for partition %s, stopping processing for this partition in this poll.%n", record.timestamp(), endTimestampMs, partition.partition());
                return;
            }
            // Kafka allows null keys and null values; there is nothing to decode in that case.
            if (record.key() == null || record.value() == null) {
                System.out.println("Record at partition " + record.partition() + " offset " + record.offset() + " has a null key or value. Skip this record.");
                continue;
            }
            String keyStr = new String(record.key(), StandardCharsets.UTF_8);
            String valueStr = new String(record.value(), StandardCharsets.UTF_8);
            Map<String, Object> map = JsonMapper.string2Map(valueStr);
            String type = (String)map.get("type");
            if (!matcher.matchesEvent(type)) {
                System.out.println("Type " + type + " is not in the types array. Skip this record.");
                continue;
            }
            if (getIgnoredEvents().contains(type)) {
                System.out.println("Type " + type + " is in the ignored events list. Skip this record.");
                continue;
            }
            writer.write(keyStr + " " + valueStr);
            writer.newLine();
        }
    }

    /**
     * Returns the event types that are skipped even when the matcher accepts them.
     *
     * @return an unmodifiable list of ignored event type names
     */
    private List<String> getIgnoredEvents() {
        return IGNORED_EVENTS;
    }

    /** Builds the unmodifiable list backing {@link #IGNORED_EVENTS}. */
    private static List<String> buildIgnoredEvents() {
        List<String> list = new ArrayList<>();
        list.add("AuthCodeCreatedEvent");
        list.add("AuthCodeDeletedEvent");
        list.add("AuthRefreshTokenDeletedEvent");
        list.add("AuthRefreshTokenCreatedEvent");
        return Collections.unmodifiableList(list);
    }

    /**
     * Rebalance listener that only prints the revoked and assigned partitions.
     *
     * <p>Nothing in this class registers it: the exporter assigns partitions explicitly
     * with {@code assign} rather than subscribing.
     */
    private static class ExportConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private ExportConsumerRebalanceListener() {
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsRevoked with partitions:" + String.valueOf(partitions));
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsAssigned with partitions:" + String.valueOf(partitions));
        }
    }
}

