package kafka.tier.compatibility;

import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import kafka.tier.compatibility.ProducerConsumerTracker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tools.ThroughputThrottler;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Stopwatch;

/* loaded from: input_file:kafka/tier/compatibility/TieringFunctionalityTest.class */
public class TieringFunctionalityTest {
    // Name of the topic used by the test; the current time in milliseconds is appended when the class is initialised.
    private static final String TEST_TOPIC_NAME = "confluent-tier-object-store-test-topic-" + System.currentTimeMillis();
    // Alphabet from which getSaltString draws the characters of generated record keys and values.
    private static final String SALTCHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
    // Every static field below is assigned by init(TierTestConfig), either directly or through
    // initJmxProperties / initTimeouts; none of them has a meaningful value before init is called.
    private static int backoffSec;
    private static String bootstrapServers;
    private static String bucketName;
    private static int consumerPollTimeoutMs;
    private static int consumerTimeoutSec;
    private static int consumerMaxPartitionFetchBytes;
    private static boolean debug;
    private static String jmxAuth;
    private static int jmxPort;
    private static boolean jmxSsl;
    private static int lengthOfKeyOrValue;
    private static int numRecords;
    private static int numPartitions;
    private static Properties objectStoreProps;
    private static int producerThroughput;
    private static int producerTimeoutSec;
    private static int replicationFactor;
    private static String testMetricsOutputDirPath;
    private static int tieringCompletedTimeoutSec;
    private static int tierDeletionTimeoutSec;
    private static int tierSegmentHotsetRollMinBytes;
    private static long tierLocalHotsetBytes;
    private static long tierLocalHotsetMs;
    // Constructed in init from the debug flag.
    private static Logger logger;

    // JUnit rule; its behaviour is defined by TimeLoggingStopwatch, which is declared outside this file.
    @Rule
    public final Stopwatch stopwatch = new TimeLoggingStopwatch();

    /**
     * Copies every test setting from the supplied configuration into this class's static fields
     * and creates the logger from the freshly read debug flag.
     *
     * <p>JMX settings and timeouts are loaded by {@code initJmxProperties} and
     * {@code initTimeouts} respectively.
     *
     * @param tierTestConfig configuration the settings are read from
     */
    public static void init(TierTestConfig tierTestConfig) {
        backoffSec = tierTestConfig.getInt(TierTestConfig.BackoffSecProp());
        bucketName = tierTestConfig.getString(TierTestConfig.TierS3BucketProp());

        // The logger depends on the debug flag, so the flag is read first.
        debug = tierTestConfig.getBoolean(TierTestConfig.DebugProp());
        logger = new Logger(debug);

        lengthOfKeyOrValue = tierTestConfig.getInt(TierTestConfig.LengthKeyValueProp());
        numRecords = tierTestConfig.getInt(TierTestConfig.NumRecordsProp());
        numPartitions = tierTestConfig.getInt(TierTestConfig.NumPartitionsProp());
        objectStoreProps = (Properties) tierTestConfig.getProps();
        producerThroughput = tierTestConfig.getInt(TierTestConfig.ProducerThroughputProp());
        replicationFactor = tierTestConfig.getInt(TierTestConfig.ReplicationFactorProp());

        tierSegmentHotsetRollMinBytes = tierTestConfig.getInt(TierTestConfig.TierSegmentHotsetRollMinBytesProp());
        tierLocalHotsetBytes = tierTestConfig.getLong(TierTestConfig.TierLocalHotsetBytesProp());
        tierLocalHotsetMs = tierTestConfig.getLong(TierTestConfig.TierLocalHotsetMsProp());

        testMetricsOutputDirPath = tierTestConfig.getString(TierTestConfig.TestMetricsOutputDirProp());
        bootstrapServers = tierTestConfig.getString(TierTestConfig.BootstrapServerProp());

        initJmxProperties(tierTestConfig);
        initTimeouts(tierTestConfig);

        consumerMaxPartitionFetchBytes = tierTestConfig.getInt(TierTestConfig.ConsumerMaxPartitionFetchBytesProp());
    }

    /**
     * Reads the JMX auth, port and SSL settings from the configuration into the
     * corresponding static fields.
     *
     * @param tierTestConfig configuration the settings are read from
     */
    private static void initJmxProperties(TierTestConfig tierTestConfig) {
        jmxAuth = tierTestConfig.getString(TierTestConfig.JmxAuthProp());
        jmxPort = tierTestConfig.getInt(TierTestConfig.JmxPortProp());
        jmxSsl = tierTestConfig.getBoolean(TierTestConfig.JmxSslProp());
    }

    /**
     * Reads the consumer, producer, tiering-completion and tier-deletion timeouts from the
     * configuration into the corresponding static fields.
     *
     * @param tierTestConfig configuration the settings are read from
     */
    private static void initTimeouts(TierTestConfig tierTestConfig) {
        // Consumer side.
        consumerPollTimeoutMs = tierTestConfig.getInt(TierTestConfig.ConsumerPollTimeoutMsProp());
        consumerTimeoutSec = tierTestConfig.getInt(TierTestConfig.ConsumerTimeoutSecProp());

        // Producer side.
        producerTimeoutSec = tierTestConfig.getInt(TierTestConfig.ProducerTimeoutSecProp());

        // Tiering life cycle.
        tieringCompletedTimeoutSec = tierTestConfig.getInt(TierTestConfig.TieringCompletedTimeoutSecProp());
        tierDeletionTimeoutSec = tierTestConfig.getInt(TierTestConfig.TierDeletionTimeoutSecProp());
    }

    /**
     * Builds a random string whose characters are drawn from {@code SALTCHARS}.
     *
     * @param i number of characters to generate; zero or a negative value yields an empty string
     * @return a string of exactly {@code i} characters, or the empty string when {@code i <= 0}
     */
    private String getSaltString(int i) {
        if (i <= 0) {
            return "";
        }
        Random random = new Random();
        // Presized: the final length is known up front.
        StringBuilder sb = new StringBuilder(i);
        for (int n = 0; n < i; n++) {
            // nextInt(bound) yields a uniform index directly, without float scaling and truncation.
            sb.append(SALTCHARS.charAt(random.nextInt(SALTCHARS.length())));
        }
        return sb.toString();
    }

    /**
     * Produces random key/value records to the test topic until either {@code numRecords}
     * records have been acknowledged or the time budget is exhausted.
     *
     * <p>Each record is registered with the tracker as outstanding before it is sent and is
     * either promoted to "produced" or removed again in the send callback, depending on
     * whether the broker acknowledged it. Sending is rate-limited by a
     * {@link ThroughputThrottler} configured with {@code producerThroughput}.
     *
     * @param i maximum time to spend producing, in seconds
     * @return the tracker holding the produced-message bookkeeping
     * @throws Exception if the producer cannot be created or closing it fails
     */
    private ProducerConsumerTracker produceData(int i) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);
        producerProps.put("request.timeout.ms", 60000);
        producerProps.put("acks", "all");

        // Widen before multiplying so that a large timeout cannot overflow int.
        final long timeoutMs = i * 1000L;

        // try-with-resources closes the producer exactly once on every path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            ThroughputThrottler throttler = new ThroughputThrottler(producerThroughput, System.currentTimeMillis());
            ProducerConsumerTracker tracker = new ProducerConsumerTracker();
            long startMs = System.currentTimeMillis();

            for (long attempt = 0;
                    tracker.getNumberOfProducedMessages() < numRecords
                            && System.currentTimeMillis() - startMs < timeoutMs;
                    attempt++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        TEST_TOPIC_NAME, getSaltString(lengthOfKeyOrValue), getSaltString(lengthOfKeyOrValue));
                String description = String.format("Key=%s,Value=%s", record.key(), record.value());
                long sendStartMs = System.currentTimeMillis();
                // Lambdas may only capture effectively final locals, so snapshot the loop counter.
                final long sequence = attempt;
                try {
                    tracker.recordOutstandingMessage(description);
                    producer.send(record, (metadata, error) -> {
                        if (error != null) {
                            logger.debug(String.format("Could not produce message %s due to error : %s", description, error));
                            tracker.removeOutstandingMessage(description);
                            return;
                        }
                        long produced = tracker.recordProducedMessage(description, metadata);
                        // Report progress only on every 10000th send attempt to keep the log readable.
                        if (sequence % 10000 == 0) {
                            DecimalFormat percentFormat = new DecimalFormat("##.##");
                            percentFormat.setRoundingMode(RoundingMode.DOWN);
                            double percent = Math.min(100.0d, ((double) produced / numRecords) * 100.0d);
                            logger.debug(String.format("Produced %d messages, remaining: %d, %s%% produced.",
                                    produced, Math.max(0L, numRecords - produced), percentFormat.format(percent)));
                        }
                    });
                } catch (Exception e) {
                    // A failed send must not leave the message registered as outstanding.
                    logger.warn(String.format("Could not send message %s due to error : %s", description, e));
                    tracker.removeOutstandingMessage(description);
                }
                if (throttler.shouldThrottle(attempt, sendStartMs)) {
                    throttler.throttle();
                }
            }

            logger.info(String.format("Produced %d messages", tracker.getNumberOfProducedMessages()));
            return tracker;
        }
    }

    /**
     * Consumes the test topic from the earliest offset with a fresh consumer group and feeds
     * every polled batch to the tracker until the tracker reports that nothing is left to
     * consume or the time budget is exhausted.
     *
     * <p>The tracker status is logged at debug level roughly every ten seconds while polling,
     * and once more (debug and info) after the loop ends.
     *
     * @param producerConsumerTracker tracker that knows which produced messages are still expected
     * @param i maximum time to spend consuming, in seconds
     * @return {@code true} if the tracker reports zero messages left to consume
     */
    private boolean consumeData(ProducerConsumerTracker producerConsumerTracker, int i) {
        // A unique group id guarantees that consumption starts from "earliest" on every run.
        String groupId = "tier-test-" + UUID.randomUUID().toString();

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        consumerProps.put("group.id", groupId);
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("session.timeout.ms", 60000);
        consumerProps.put("max.partition.fetch.bytes", consumerMaxPartitionFetchBytes);

        // try-with-resources closes the consumer exactly once on every path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton(TEST_TOPIC_NAME));

            long startSec = System.currentTimeMillis() / 1000;
            long lastStatusLogMs = System.currentTimeMillis();
            do {
                producerConsumerTracker.recordConsumedMessageBatch(
                        consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)));

                long nowMs = System.currentTimeMillis();
                if (nowMs - lastStatusLogMs >= 10000) {
                    logger.debug(producerConsumerTracker.status(true));
                    lastStatusLogMs = nowMs;
                }
                if (producerConsumerTracker.getNumberOfMessagesToBeConsumed() == 0) {
                    break;
                }
            } while ((System.currentTimeMillis() / 1000.0d) - startSec <= i);

            logger.debug(producerConsumerTracker.status(true));
            logger.info(producerConsumerTracker.status(false));
            return producerConsumerTracker.getNumberOfMessagesToBeConsumed() == 0;
        }
    }

    /**
     * Cross-checks the consumer's timestamp-to-offset observations against the producer's.
     *
     * <p>For every partition the consumer read from, the producer must have written to it, and
     * for every consumed timestamp the producer must have recorded the same timestamp with the
     * same offset. Any mismatch fails the test through a JUnit assertion.
     *
     * @param producerConsumerTracker tracker holding both the producer and the consumer bookkeeping
     */
    private void verifyOffsetsForTimesData(ProducerConsumerTracker producerConsumerTracker) {
        Map<TopicPartition, Map<Long, Long>> producedByPartition = producerConsumerTracker.getProducerTimestampOffsetMap();
        Map<TopicPartition, ProducerConsumerTracker.PerPartitionConsumptionInfo> consumedByPartition = producerConsumerTracker.getConsumptionInfo();

        for (Map.Entry<TopicPartition, ProducerConsumerTracker.PerPartitionConsumptionInfo> consumed : consumedByPartition.entrySet()) {
            TopicPartition partition = consumed.getKey();
            Assert.assertTrue(
                    String.format("Consumer read from partition: %s but producer did not write to it", partition),
                    producedByPartition.containsKey(partition));

            Map<Long, Long> producedOffsets = producedByPartition.get(partition);
            for (Map.Entry<Long, Long> sample : consumed.getValue().timestampToOffsetMap().entrySet()) {
                long timestamp = sample.getKey();
                long consumedOffset = sample.getValue();

                Assert.assertTrue(
                        String.format("Producer did not produce any record at timestamp: %s for partition: %s", timestamp, partition),
                        producedOffsets.containsKey(timestamp));

                Long producedOffset = producedOffsets.get(timestamp);
                Assert.assertEquals(
                        String.format("Consumer offset does not match with producer offset for Partition %s :: Producer(timestamp: %d, offset: %d) Consumer(timestamp: %d, offset: %d)", partition, timestamp, producedOffset, timestamp, consumedOffset),
                        producedOffset.longValue(),
                        consumedOffset);
            }
        }
    }

    /**
     * Delegates to {@code TierTestUtils.checkTieringCompleted} for the test topic, supplying
     * the partition count, JMX settings, debug flag and metrics output directory configured
     * for this test.
     *
     * @param adminClient admin client handed through to the utility
     * @param str passed through unchanged as the second-to-last argument of the utility call
     * @param i passed through unchanged as the last argument of the utility call
     * @return whatever the utility reports
     * @throws Exception if the utility call fails
     */
    private boolean checkTieringCompleted(AdminClient adminClient, String str, int i) throws Exception {
        boolean completed = TierTestUtils.checkTieringCompleted(
                adminClient,
                TEST_TOPIC_NAME,
                numPartitions,
                jmxAuth,
                jmxSsl,
                jmxPort,
                debug,
                testMetricsOutputDirPath,
                str,
                i);
        return completed;
    }

    /**
     * Prints a timestamped debug line to standard error when the debug flag is set;
     * does nothing otherwise.
     *
     * @param str message to print
     */
    private void debug(String str) {
        if (!debug) {
            return;
        }
        DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        String timestamp = LocalDateTime.now().format(timestampFormat);
        System.err.printf("[%s] DEBUG: %s\n", timestamp, str);
    }

    /**
     * End-to-end round trip against the test topic.
     *
     * <p>Steps: create the topic, produce records, optionally wait for tiering to complete,
     * consume everything back, check JMX metrics, verify timestamp/offset consistency and
     * check that no partition is in an error state. Whatever the outcome, the topic is then
     * deleted (and, if configured, remote object deletion is awaited) exactly once, and the
     * admin client is closed.
     *
     * <p>A JVM shutdown hook is registered as well, so the topic is deleted even if the JVM
     * is terminated while the test is running.
     *
     * @throws Exception if any step fails
     */
    @Test
    public void tierRoundtripTest() throws Exception {
        // Serialises topic deletion between this thread and the shutdown hook.
        final Object topicDeletionLock = new Object();
        try (AdminClient adminClient = TierTestUtils.createAdminClient(bootstrapServers)) {
            registerTopicDeletionOnShutdown(topicDeletionLock);
            try {
                runRoundtrip(adminClient);
            } finally {
                deleteTopicAndAwaitRemoteCleanup(adminClient, topicDeletionLock);
            }
        }
    }

    /** Registers a JVM shutdown hook that deletes the test topic with its own admin client. */
    private void registerTopicDeletionOnShutdown(Object topicDeletionLock) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            AdminClient shutdownClient = TierTestUtils.createAdminClient(bootstrapServers);
            synchronized (topicDeletionLock) {
                try {
                    logger.call(() -> TierTestUtils.deleteTopic(shutdownClient, TEST_TOPIC_NAME), "deleteTopicDuringShutdown");
                } catch (Exception e) {
                    System.err.printf("Failed to delete topic: %s during shutdown due to error: %s", TEST_TOPIC_NAME, e);
                } finally {
                    // Close exactly once, whether or not the deletion succeeded.
                    shutdownClient.close();
                }
            }
        }));
    }

    /** Runs the create / produce / tier / consume / verify steps of the round trip. */
    private void runRoundtrip(AdminClient adminClient) throws Exception {
        logger.call(() -> {
            debug(String.format("Creating topic: %s", TEST_TOPIC_NAME));
            TierTestUtils.createTopic(adminClient, TEST_TOPIC_NAME, tierSegmentHotsetRollMinBytes, tierLocalHotsetBytes, tierLocalHotsetMs, numPartitions, replicationFactor);
            return null;
        }, String.format("createTopic-%s", TEST_TOPIC_NAME));

        logger.debug(String.format("Producing messages to topic: %s", TEST_TOPIC_NAME));
        ProducerConsumerTracker tracker = (ProducerConsumerTracker) logger.call(() -> produceData(producerTimeoutSec), "produceMetrics");
        Assert.assertTrue(
                String.format("Producer produced %d messages, but expected it to produce at least %d messages within %d sec",
                        tracker.getNumberOfProducedMessages(), numRecords, producerTimeoutSec),
                tracker.getNumberOfProducedMessages() >= (long) numRecords);

        // A non-positive timeout disables the tiering-dependent checks.
        final boolean tieringChecksEnabled = tieringCompletedTimeoutSec > 0;

        if (tieringChecksEnabled) {
            logger.debug(String.format("Waiting a maximum of %d seconds for tiering to complete for topic: %s", tieringCompletedTimeoutSec, TEST_TOPIC_NAME));
            boolean tieringCompleted = (Boolean) logger.call(
                    () -> checkTieringCompleted(adminClient, String.format("%s_%s", "tierRoundtripTest", "tieringCompletedMetrics"), tieringCompletedTimeoutSec),
                    "checkTieringCompleted");
            Assert.assertTrue(
                    String.format("Tiered storage archiving did not complete within timeout of %d sec", tieringCompletedTimeoutSec),
                    tieringCompleted);
        }

        logger.debug(String.format("Consuming produced events from topic: %s", TEST_TOPIC_NAME));
        boolean consumedAll = (Boolean) logger.call(() -> consumeData(tracker, consumerTimeoutSec), "consumeData");
        Assert.assertTrue(
                String.format("Did not consume expected number of messages within %d sec", consumerTimeoutSec),
                consumedAll);

        JmxMetricsAnalyzer consumeMetrics = collectMetricsAnalyzer(adminClient, "consumeData");
        if (tieringChecksEnabled) {
            boolean withinThreshold = (Boolean) logger.call(
                    () -> consumeMetrics.checkBytesFetchedFromLocalLog(tierSegmentHotsetRollMinBytes * 5),
                    "checkBytesFetchedFromLocalLog");
            Assert.assertTrue("Bytes fetched from local log not within the threshold", withinThreshold);
        }

        logger.debug(String.format("Verifying offset-timestamp data from consumed events in topic: %s", TEST_TOPIC_NAME));
        logger.call(() -> {
            verifyOffsetsForTimesData(tracker);
            return null;
        }, "verifyOffsetsForTimesData");

        logger.debug("Checking for no error partitions in cluster");
        JmxMetricsAnalyzer clusterMetrics = collectMetricsAnalyzer(adminClient, "checkClusterState");
        boolean noErrorPartitions = (Boolean) logger.call(() -> clusterMetrics.checkZeroErrorPartitions(), "checkZeroErrorPartitions");
        Assert.assertTrue("Found partitions in error state, while expected none", noErrorPartitions);
    }

    /** Collects JMX metrics for the given stage of this test and wraps them in an analyzer. */
    private JmxMetricsAnalyzer collectMetricsAnalyzer(AdminClient adminClient, String stage) throws Exception {
        return new JmxMetricsAnalyzer((Map) logger.call(
                () -> TierTestUtils.collectJmxMetrics(adminClient, TEST_TOPIC_NAME, testMetricsOutputDirPath, String.format("%s_%s", "tierRoundtripTest", stage), jmxAuth, jmxSsl, jmxPort, numPartitions, 60, debug),
                "collectJmxMetrics"), adminClient, TEST_TOPIC_NAME, numPartitions, debug);
    }

    /**
     * Deletes the test topic and, when the deletion succeeded and a deletion timeout is
     * configured, asserts that the topic's objects disappear from remote storage in time.
     */
    private void deleteTopicAndAwaitRemoteCleanup(AdminClient adminClient, Object topicDeletionLock) throws Exception {
        boolean topicDeleted;
        // Hold the lock only for the deletion itself so that a concurrent shutdown hook
        // is not blocked for the whole remote-deletion wait.
        synchronized (topicDeletionLock) {
            topicDeleted = (Boolean) logger.call(() -> TierTestUtils.deleteTopic(adminClient, TEST_TOPIC_NAME), "deleteTopic");
        }
        if (topicDeleted && tierDeletionTimeoutSec > 0) {
            boolean remoteDataDeleted = (Boolean) logger.call(
                    () -> TierTestUtils.objectDeletionsCompleted(objectStoreProps, bucketName, backoffSec, tierDeletionTimeoutSec),
                    "objectDeletionsCompleted");
            Assert.assertTrue(
                    String.format("Deletions of test topic data in remote storage did not complete within the expected timeout of: %d sec", tierDeletionTimeoutSec),
                    remoteDataDeleted);
        }
    }
}
