/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.compatibility;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import java.io.File;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.log.LogConfig;
import kafka.tier.compatibility.JmxMetricsAnalyzer;
import kafka.tier.compatibility.JmxMetricsScraper;
import kafka.tier.compatibility.Logger;
import kafka.tier.compatibility.ObjectStoreUtils;
import kafka.tier.store.S3TierObjectStore;
import kafka.tier.store.S3TierObjectStoreConfig;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TierTestUtils {
    private static final Integer DELETE_TOPIC_TIMEOUT_MS = 10000;
    private static final Integer DESCRIBE_CLUSTER_TIMEOUT_MS = 30000;

    private TierTestUtils() {
    }

    public static AdminClient createAdminClient(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        AdminClient adminClient = KafkaAdminClient.create((Properties)properties);
        return adminClient;
    }

    public static void createTopic(AdminClient adminClient, String topic, int tierSegmentHotsetRollMinBytes, long tierLocalHotsetBytes, long tierLocalHotsetMs, int numPartitions, int replicationFactor) throws ExecutionException, InterruptedException {
        HashMap<String, String> topicConfigs = new HashMap<String, String>();
        topicConfigs.put(LogConfig.TierEnableProp(), "true");
        topicConfigs.put("confluent.tier.segment.hotset.roll.min.bytes", String.valueOf(tierSegmentHotsetRollMinBytes));
        topicConfigs.put("confluent.tier.local.hotset.bytes", String.valueOf(tierLocalHotsetBytes));
        topicConfigs.put("confluent.tier.local.hotset.ms", String.valueOf(tierLocalHotsetMs));
        NewTopic topicObject = new NewTopic(topic, numPartitions, (short)replicationFactor);
        topicObject.configs(topicConfigs);
        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topicObject));
        result.all().get();
    }

    public static boolean deleteTopic(AdminClient adminClient, String topic) throws ExecutionException, InterruptedException {
        Set existingtopics = (Set)adminClient.listTopics().names().get();
        if (!existingtopics.contains(topic)) {
            return false;
        }
        List<String> topics = Collections.singletonList(topic);
        DeleteTopicsResult result = adminClient.deleteTopics(topics, new DeleteTopicsOptions().timeoutMs(DELETE_TOPIC_TIMEOUT_MS));
        result.all().get();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static boolean objectDeletionsCompleted(Properties objectStoreProps, String bucketName, int backoffSec, int timeoutSec) {
        AmazonS3 s3Client = S3TierObjectStore.client((S3TierObjectStoreConfig)ObjectStoreUtils.createS3ObjectStoreConfig(objectStoreProps));
        ObjectListing objectListing = null;
        ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName).withPrefix("0/");
        double startTimeSec = (double)System.currentTimeMillis() / 1000.0;
        try {
            do {
                try {
                    objectListing = s3Client.listObjects(listObjectsRequest);
                }
                catch (Exception e) {
                    new Logger().warn("Could not list S3 objects, ignoring exception and retrying: " + e);
                }
                Thread.sleep(backoffSec * 1000);
            } while ((objectListing == null || !objectListing.getObjectSummaries().isEmpty()) && (double)System.currentTimeMillis() / 1000.0 - startTimeSec <= (double)timeoutSec);
        }
        catch (Throwable throwable) {
            s3Client.shutdown();
            return objectListing != null && objectListing.getObjectSummaries().isEmpty();
        }
        s3Client.shutdown();
        return objectListing != null && objectListing.getObjectSummaries().isEmpty();
    }

    private static Map<Integer, String> getJmxUrls(AdminClient adminClient, int jmxPort) throws ExecutionException, InterruptedException {
        Collection nodes = (Collection)adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(DESCRIBE_CLUSTER_TIMEOUT_MS)).nodes().get();
        return nodes.stream().collect(Collectors.toMap(node -> node.id(), node -> String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", node.host(), jmxPort)));
    }

    public static Map<Integer, String> collectJmxMetrics(AdminClient adminClient, String topic, String testMetricsOutputDirPath, String metricsOutputFilenamePrefix, String jmxAuth, boolean jmxSsl, int jmxPort, int numPartitions, int durationSec, boolean debug) throws ExecutionException, InterruptedException {
        boolean created;
        ArrayList<String> objectNames = new ArrayList<String>(Arrays.asList("kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag", "kafka.tier.tasks.archive:type=TierArchiver,name=TotalLagWithoutErrorPartitions", "kafka.server:type=TierFetcher", "kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError", "kafka.tier.tasks:type=TierTasks,name=HeartbeatMs", "kafka.server:type=TierTopicConsumer", "kafka.server:type=TierTopicConsumer", "kafka.tier:type=TierDeletedPartitionsCoordinator,name=HeartbeatMs", "kafka.tier:type=TierDeletedPartitionsCoordinator,name=TierNumInProgressPartitionDeletions", "kafka.tier:type=TierDeletedPartitionsCoordinator,name=TierNumQueuedPartitionDeletions"));
        ArrayList<String> attributes = new ArrayList<String>(Arrays.asList("Value", "Value", "BytesFetchedTotal", "Value", "Value", "HeartbeatMs", "ErrorPartitions", " ", "Value", "Value"));
        for (int partition = 0; partition < numPartitions; ++partition) {
            objectNames.addAll(Arrays.asList(String.format("kafka.log:type=Log,name=Size,topic=%s,partition=%d", topic, partition), String.format("kafka.log:type=Log,name=TotalSize,topic=%s,partition=%d", topic, partition), String.format("kafka.log:type=Log,name=TierSize,topic=%s,partition=%d", topic, partition), String.format("kafka.log:type=Log,name=NumLogSegments,topic=%s,partition=%d", topic, partition)));
            attributes.addAll(Arrays.asList("Value", "Value", "Value", "Value"));
        }
        Map<Integer, String> jmxUrlsMap = TierTestUtils.getJmxUrls(adminClient, jmxPort);
        ArrayList threads = new ArrayList();
        HashMap<Integer, String> metricsFilenames = new HashMap<Integer, String>();
        File testMetricsOutputBaseDir = new File(testMetricsOutputDirPath);
        if (!testMetricsOutputBaseDir.exists() && !(created = testMetricsOutputBaseDir.mkdirs())) {
            throw new IllegalStateException("Could not create dir: " + testMetricsOutputBaseDir);
        }
        try {
            jmxUrlsMap.entrySet().stream().forEach(entry -> {
                int nodeId = (Integer)entry.getKey();
                String jmxUrl = (String)entry.getValue();
                File metricsFile = new File(Paths.get(testMetricsOutputBaseDir.getAbsolutePath(), new String[0]).resolve(String.format("%s-node%d-jmxMetrics.csv", metricsOutputFilenamePrefix, nodeId)).toUri());
                metricsFilenames.put(nodeId, metricsFile.getAbsolutePath());
                JmxMetricsScraper thread = new JmxMetricsScraper(String.format("%s_%d", metricsOutputFilenamePrefix, nodeId), objectNames.toArray(new String[0]), attributes.toArray(new String[0]), 1000, jmxUrl, jmxAuth, jmxSsl, false, metricsFile.getAbsolutePath(), debug);
                threads.add(thread);
                thread.start();
            });
            TimeUnit.SECONDS.sleep(durationSec);
        }
        catch (Exception e) {
            new Logger(debug).error("Can not initialize JmxMetricsScraper due to: " + e);
            throw e;
        }
        finally {
            threads.forEach(ShutdownableThread::shutdown);
        }
        return metricsFilenames;
    }

    public static boolean checkTieringCompleted(AdminClient adminClient, String topicName, int numPartitions, String jmxAuth, boolean jmxSsl, int jmxPort, boolean debug, String testMetricsOutputDirPath, String metricsOutputFilenamePrefix, int durationSec) throws Exception {
        long startTimeSec = System.currentTimeMillis();
        boolean tieringCompleted = false;
        int collectionDurationSec = 50;
        int backoffSec = 10;
        Logger logger = new Logger(debug);
        do {
            Map tieringCompletedMetricFilenames = logger.call(() -> TierTestUtils.collectJmxMetrics(adminClient, topicName, testMetricsOutputDirPath, metricsOutputFilenamePrefix, jmxAuth, jmxSsl, jmxPort, numPartitions, collectionDurationSec, debug), "collectJmxMetrics");
            JmxMetricsAnalyzer metricsAnalyzer = new JmxMetricsAnalyzer(tieringCompletedMetricFilenames, adminClient, topicName, numPartitions, debug);
            tieringCompleted = logger.call(() -> metricsAnalyzer.tieringCompleted(), "tieringCompleted");
            TimeUnit.SECONDS.sleep(backoffSec);
        } while (!tieringCompleted && (double)(System.currentTimeMillis() - startTimeSec) / 1000.0 <= (double)durationSec);
        return tieringCompleted;
    }
}

