/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.compatibility;

import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import kafka.tier.compatibility.JmxMetricsAnalyzer;
import kafka.tier.compatibility.Logger;
import kafka.tier.compatibility.ProducerConsumerTracker;
import kafka.tier.compatibility.TierTestConfig;
import kafka.tier.compatibility.TierTestUtils;
import kafka.tier.compatibility.TimeLoggingStopwatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tools.ThroughputThrottler;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Stopwatch;

public class TieringFunctionalityTest {
    private static final String TEST_TOPIC_NAME = "confluent-tier-object-store-test-topic-" + System.currentTimeMillis();
    private static final String SALTCHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
    private static int backoffSec;
    private static String bootstrapServers;
    private static String bucketName;
    private static int consumerPollTimeoutMs;
    private static int consumerTimeoutSec;
    private static int consumerMaxPartitionFetchBytes;
    private static boolean debug;
    private static String jmxAuth;
    private static int jmxPort;
    private static boolean jmxSsl;
    private static int lengthOfKeyOrValue;
    private static int numRecords;
    private static int numPartitions;
    private static Properties objectStoreProps;
    private static int producerThroughput;
    private static int producerTimeoutSec;
    private static int replicationFactor;
    private static String testMetricsOutputDirPath;
    private static int tieringCompletedTimeoutSec;
    private static int tierDeletionTimeoutSec;
    private static int tierSegmentHotsetRollMinBytes;
    private static long tierLocalHotsetBytes;
    private static long tierLocalHotsetMs;
    private static Logger logger;
    @Rule
    public final Stopwatch stopwatch = new TimeLoggingStopwatch();

    public static void init(TierTestConfig config) {
        backoffSec = config.getInt(TierTestConfig.BackoffSecProp());
        bucketName = config.getString(TierTestConfig.TierS3BucketProp());
        debug = config.getBoolean(TierTestConfig.DebugProp());
        logger = new Logger(debug);
        lengthOfKeyOrValue = config.getInt(TierTestConfig.LengthKeyValueProp());
        numRecords = config.getInt(TierTestConfig.NumRecordsProp());
        numPartitions = config.getInt(TierTestConfig.NumPartitionsProp());
        objectStoreProps = (Properties)config.getProps();
        producerThroughput = config.getInt(TierTestConfig.ProducerThroughputProp());
        replicationFactor = config.getInt(TierTestConfig.ReplicationFactorProp());
        tierSegmentHotsetRollMinBytes = config.getInt(TierTestConfig.TierSegmentHotsetRollMinBytesProp());
        tierLocalHotsetBytes = config.getLong(TierTestConfig.TierLocalHotsetBytesProp());
        tierLocalHotsetMs = config.getLong(TierTestConfig.TierLocalHotsetMsProp());
        testMetricsOutputDirPath = config.getString(TierTestConfig.TestMetricsOutputDirProp());
        bootstrapServers = config.getString(TierTestConfig.BootstrapServerProp());
        TieringFunctionalityTest.initJmxProperties(config);
        TieringFunctionalityTest.initTimeouts(config);
        consumerMaxPartitionFetchBytes = config.getInt(TierTestConfig.ConsumerMaxPartitionFetchBytesProp());
    }

    private static void initJmxProperties(TierTestConfig config) {
        jmxAuth = config.getString(TierTestConfig.JmxAuthProp());
        jmxPort = config.getInt(TierTestConfig.JmxPortProp());
        jmxSsl = config.getBoolean(TierTestConfig.JmxSslProp());
    }

    private static void initTimeouts(TierTestConfig config) {
        consumerPollTimeoutMs = config.getInt(TierTestConfig.ConsumerPollTimeoutMsProp());
        consumerTimeoutSec = config.getInt(TierTestConfig.ConsumerTimeoutSecProp());
        producerTimeoutSec = config.getInt(TierTestConfig.ProducerTimeoutSecProp());
        tieringCompletedTimeoutSec = config.getInt(TierTestConfig.TieringCompletedTimeoutSecProp());
        tierDeletionTimeoutSec = config.getInt(TierTestConfig.TierDeletionTimeoutSecProp());
    }

    private String getSaltString(int length) {
        StringBuilder salt = new StringBuilder();
        Random random = new Random();
        while (salt.length() < length) {
            int index = (int)(random.nextFloat() * (float)SALTCHARS.length());
            salt.append(SALTCHARS.charAt(index));
        }
        return salt.toString();
    }

    private ProducerConsumerTracker produceData(int timeoutSec) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);
        producerProps.put("request.timeout.ms", (Object)60000);
        producerProps.put("acks", "all");
        try (KafkaProducer producer = new KafkaProducer(producerProps);){
            long startMs = System.currentTimeMillis();
            ThroughputThrottler enqueueThrottler = new ThroughputThrottler((long)producerThroughput, startMs);
            ProducerConsumerTracker producerConsumerTracker = new ProducerConsumerTracker();
            long startProduceTimeMs = System.currentTimeMillis();
            for (long msgIdx = 0L; msgIdx < Long.MAX_VALUE && producerConsumerTracker.getNumberOfProducedMessages() < (long)numRecords && System.currentTimeMillis() - startProduceTimeMs < (long)(timeoutSec * 1000); ++msgIdx) {
                String key = this.getSaltString(lengthOfKeyOrValue);
                String value = this.getSaltString(lengthOfKeyOrValue);
                ProducerRecord record = new ProducerRecord(TEST_TOPIC_NAME, (Object)key, (Object)value);
                String msg = String.format("Key=%s,Value=%s", record.key(), record.value());
                long sendStartMs = System.currentTimeMillis();
                long thisMsgIdx = msgIdx;
                try {
                    producerConsumerTracker.recordOutstandingMessage(msg);
                    producer.send(record, (metadata, exception) -> {
                        if (exception == null) {
                            long numProducedMessages = producerConsumerTracker.recordProducedMessage(msg, metadata);
                            if (thisMsgIdx % 10000L == 0L) {
                                DecimalFormat df = new DecimalFormat("##.##");
                                df.setRoundingMode(RoundingMode.DOWN);
                                String percentProduced = df.format(Math.min(100.0, ((double)numProducedMessages + 0.0) / (double)numRecords * 100.0));
                                logger.debug(String.format("Produced %d messages, remaining: %d, %s%% produced.", numProducedMessages, Math.max(0L, (long)numRecords - numProducedMessages), percentProduced));
                            }
                        } else {
                            logger.debug(String.format("Could not produce message %s due to error : %s", msg, exception));
                            producerConsumerTracker.removeOutstandingMessage(msg);
                        }
                    });
                }
                catch (Exception e) {
                    logger.warn(String.format("Could not send message %s due to error : %s", msg, e));
                    producerConsumerTracker.removeOutstandingMessage(msg);
                }
                if (!enqueueThrottler.shouldThrottle(msgIdx, sendStartMs)) continue;
                enqueueThrottler.throttle();
            }
            logger.info(String.format("Produced %d messages", producerConsumerTracker.getNumberOfProducedMessages()));
            ProducerConsumerTracker producerConsumerTracker2 = producerConsumerTracker;
            return producerConsumerTracker2;
        }
    }

    private boolean consumeData(ProducerConsumerTracker producerConsumerTracker, int timeoutSec) {
        String groupId = "tier-test-" + UUID.randomUUID().toString();
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        consumerProps.put("group.id", groupId);
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("session.timeout.ms", (Object)60000);
        consumerProps.put("max.partition.fetch.bytes", (Object)consumerMaxPartitionFetchBytes);
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps);){
            consumer.subscribe(Collections.singleton(TEST_TOPIC_NAME));
            long startTimeSec = System.currentTimeMillis() / 1000L;
            long lastLogTimeMs = System.currentTimeMillis();
            long yetToBeConsumed = 0L;
            long consumedMessages = 0L;
            do {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs));
                producerConsumerTracker.recordConsumedMessageBatch((ConsumerRecords<String, String>)records);
                long currentTimeMs = System.currentTimeMillis();
                if (currentTimeMs - lastLogTimeMs < 10000L) continue;
                logger.debug(producerConsumerTracker.status(true));
                lastLogTimeMs = currentTimeMs;
            } while (producerConsumerTracker.getNumberOfMessagesToBeConsumed() != 0L && (double)System.currentTimeMillis() / 1000.0 - (double)startTimeSec <= (double)timeoutSec);
            logger.debug(producerConsumerTracker.status(true));
            logger.info(producerConsumerTracker.status(false));
            boolean bl = producerConsumerTracker.getNumberOfMessagesToBeConsumed() == 0L;
            return bl;
        }
    }

    private void verifyOffsetsForTimesData(ProducerConsumerTracker producerConsumerTracker) {
        Map<TopicPartition, Map<Long, Long>> producerData = producerConsumerTracker.getProducerTimestampOffsetMap();
        Map<TopicPartition, ProducerConsumerTracker.PerPartitionConsumptionInfo> consumerData = producerConsumerTracker.getConsumptionInfo();
        for (TopicPartition tp : consumerData.keySet()) {
            Assert.assertTrue((String)String.format("Consumer read from partition: %s but producer did not write to it", tp), (boolean)producerData.containsKey(tp));
            ProducerConsumerTracker.PerPartitionConsumptionInfo consumptionInfo = consumerData.get(tp);
            for (Map.Entry<Long, Long> consumerDataEntry : consumptionInfo.timestampToOffsetMap().entrySet()) {
                long consumerTimestamp = consumerDataEntry.getKey();
                long consumerOffset = consumerDataEntry.getValue();
                Assert.assertTrue((String)String.format("Producer did not produce any record at timestamp: %s for partition: %s", consumerTimestamp, tp), (boolean)producerData.get(tp).containsKey(consumerTimestamp));
                Assert.assertEquals((String)String.format("Consumer offset does not match with producer offset for Partition %s :: Producer(timestamp: %d, offset: %d) Consumer(timestamp: %d, offset: %d)", tp, consumerTimestamp, producerData.get(tp).get(consumerTimestamp), consumerTimestamp, consumerOffset), (long)producerData.get(tp).get(consumerTimestamp), (long)consumerOffset);
            }
        }
    }

    private boolean checkTieringCompleted(AdminClient adminClient, String metricsOutputFilenamePrefix, int durationSec) throws Exception {
        return TierTestUtils.checkTieringCompleted(adminClient, TEST_TOPIC_NAME, numPartitions, jmxAuth, jmxSsl, jmxPort, debug, testMetricsOutputDirPath, metricsOutputFilenamePrefix, durationSec);
    }

    private void debug(String msg) {
        if (debug) {
            String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            System.err.printf("[%s] DEBUG: %s\n", currentTime, msg);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void tierRoundtripTest() throws Exception {
        Object object;
        boolean wasTopicDeleted;
        boolean isTopicCreated = false;
        AdminClient adminClient = TierTestUtils.createAdminClient(bootstrapServers);
        Object deletionLock = new Object();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            AdminClient deletionClient = TierTestUtils.createAdminClient(bootstrapServers);
            Object object = deletionLock;
            synchronized (object) {
                try {
                    logger.call(() -> TierTestUtils.deleteTopic(deletionClient, TEST_TOPIC_NAME), "deleteTopicDuringShutdown");
                }
                catch (Exception e) {
                    System.err.printf("Failed to delete topic: %s during shutdown due to error: %s", TEST_TOPIC_NAME, e);
                }
                finally {
                    deletionClient.close();
                }
            }
        }));
        try {
            logger.call(() -> {
                this.debug(String.format("Creating topic: %s", TEST_TOPIC_NAME));
                TierTestUtils.createTopic(adminClient, TEST_TOPIC_NAME, tierSegmentHotsetRollMinBytes, tierLocalHotsetBytes, tierLocalHotsetMs, numPartitions, replicationFactor);
                return null;
            }, String.format("createTopic-%s", TEST_TOPIC_NAME));
            logger.debug(String.format("Producing messages to topic: %s", TEST_TOPIC_NAME));
            ProducerConsumerTracker producerConsumerTracker = logger.call(() -> this.produceData(producerTimeoutSec), "produceMetrics");
            Assert.assertTrue((String)String.format("Producer produced %d messages, but expected it to produce at least %d messages within %d sec", producerConsumerTracker.getNumberOfProducedMessages(), numRecords, producerTimeoutSec), (producerConsumerTracker.getNumberOfProducedMessages() >= (long)numRecords ? 1 : 0) != 0);
            if (tieringCompletedTimeoutSec > 0) {
                logger.debug(String.format("Waiting a maximum of %d seconds for tiering to complete for topic: %s", tieringCompletedTimeoutSec, TEST_TOPIC_NAME));
                Assert.assertTrue((String)String.format("Tiered storage archiving did not complete within timeout of %d sec", tieringCompletedTimeoutSec), (boolean)logger.call(() -> this.checkTieringCompleted(adminClient, String.format("%s_%s", "tierRoundtripTest", "tieringCompletedMetrics"), tieringCompletedTimeoutSec), "checkTieringCompleted"));
            }
            logger.debug(String.format("Consuming produced events from topic: %s", TEST_TOPIC_NAME));
            Assert.assertTrue((String)String.format("Did not consume expected number of messages within %d sec", consumerTimeoutSec), (boolean)logger.call(() -> this.consumeData(producerConsumerTracker, consumerTimeoutSec), "consumeData"));
            Map consumeMetricFilenames = logger.call(() -> TierTestUtils.collectJmxMetrics(adminClient, TEST_TOPIC_NAME, testMetricsOutputDirPath, String.format("%s_%s", "tierRoundtripTest", "consumeData"), jmxAuth, jmxSsl, jmxPort, numPartitions, 60, debug), "collectJmxMetrics");
            JmxMetricsAnalyzer consumeMetricsAnalyzer = new JmxMetricsAnalyzer(consumeMetricFilenames, adminClient, TEST_TOPIC_NAME, numPartitions, debug);
            if (tieringCompletedTimeoutSec > 0) {
                Assert.assertTrue((String)"Bytes fetched from local log not within the threshold", (boolean)logger.call(() -> consumeMetricsAnalyzer.checkBytesFetchedFromLocalLog(tierSegmentHotsetRollMinBytes * 5), "checkBytesFetchedFromLocalLog"));
            }
            logger.debug(String.format("Verifying offset-timestamp data from consumed events in topic: %s", TEST_TOPIC_NAME));
            logger.call(() -> {
                this.verifyOffsetsForTimesData(producerConsumerTracker);
                return null;
            }, "verifyOffsetsForTimesData");
            logger.debug("Checking for no error partitions in cluster");
            Map clusterStateMetricFilenames = logger.call(() -> TierTestUtils.collectJmxMetrics(adminClient, TEST_TOPIC_NAME, testMetricsOutputDirPath, String.format("%s_%s", "tierRoundtripTest", "checkClusterState"), jmxAuth, jmxSsl, jmxPort, numPartitions, 60, debug), "collectJmxMetrics");
            JmxMetricsAnalyzer clusterStateMetricsAnalyzer = new JmxMetricsAnalyzer(clusterStateMetricFilenames, adminClient, TEST_TOPIC_NAME, numPartitions, debug);
            Assert.assertTrue((String)"Found partitions in error state, while expected none", (boolean)logger.call(() -> clusterStateMetricsAnalyzer.checkZeroErrorPartitions(), "checkZeroErrorPartitions"));
            wasTopicDeleted = false;
            object = deletionLock;
        }
        catch (Throwable throwable) {
            boolean wasTopicDeleted2 = false;
            Object object2 = deletionLock;
            synchronized (object2) {
                wasTopicDeleted2 = logger.call(() -> TierTestUtils.deleteTopic(adminClient, TEST_TOPIC_NAME), "deleteTopic");
            }
            if (wasTopicDeleted2 && tierDeletionTimeoutSec > 0) {
                Assert.assertTrue((String)String.format("Deletions of test topic data in remote storage did not complete within the expected timeout of: %d sec", tierDeletionTimeoutSec), (boolean)logger.call(() -> TierTestUtils.objectDeletionsCompleted(objectStoreProps, bucketName, backoffSec, tierDeletionTimeoutSec), "objectDeletionsCompleted"));
            }
            adminClient.close();
            throw throwable;
        }
        synchronized (object) {
            wasTopicDeleted = logger.call(() -> TierTestUtils.deleteTopic(adminClient, TEST_TOPIC_NAME), "deleteTopic");
        }
        if (wasTopicDeleted && tierDeletionTimeoutSec > 0) {
            Assert.assertTrue((String)String.format("Deletions of test topic data in remote storage did not complete within the expected timeout of: %d sec", tierDeletionTimeoutSec), (boolean)logger.call(() -> TierTestUtils.objectDeletionsCompleted(objectStoreProps, bucketName, backoffSec, tierDeletionTimeoutSec), "objectDeletionsCompleted"));
        }
        adminClient.close();
    }
}

