package kafka.tier.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.rest.GetTierRecoveryDataUploadJobResultRequest;
import io.confluent.rest.GetTierRecoveryDataUploadJobResultResponse;
import io.confluent.rest.InitiateTierRecoveryDataUploadRequest;
import io.confluent.rest.InitiateTierRecoveryDataUploadResponse;
import io.confluent.rest.ResponseContainer;
import io.confluent.rest.RewindTierTopicConsumerRequest;
import io.confluent.rest.RewindTierTopicConsumerResponse;
import io.confluent.rest.TierMetadataRecoveryHandler;
import io.confluent.rest.TierRecoveryDataUploadResult;
import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import kafka.tier.common.RestServerUtil;
import kafka.tier.tools.commands.GetTierRecoveryDataUploadJobResultCommandRequest;
import kafka.tier.tools.commands.GetTierRecoveryDataUploadJobResultCommandResponse;
import kafka.tier.tools.commands.InitiateTierRecoveryDataUploadCommandRequest;
import kafka.tier.tools.commands.InitiateTierRecoveryDataUploadCommandResponse;
import kafka.tier.tools.commands.RewindTierTopicConsumerCommandRequest;
import kafka.tier.tools.commands.RewindTierTopicConsumerCommandResponse;
import kafka.tier.tools.commands.TierTopicHeadDataLossDetectionCommandRequest;
import kafka.tier.tools.commands.TierTopicHeadDataLossDetectionCommandResponse;
import kafka.tier.topic.TierTopicConsumerRewindPolicy;
import kafka.tier.topic.recovery.TierTopicHeadDataLossReport;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/tools/TierMetadataRecoveryOrchestrator.class */
/**
 * Drives tier metadata recovery operations against the brokers of a cluster.
 *
 * <p>Broker ids and hosts are discovered through the supplied {@link AdminClient}. Each operation
 * is sent to a broker URL of the form {@code http://<host>:<restServerPort>} through
 * {@code RestServerUtil.sendRequest}, and the per-broker outcome is collected into a command
 * response object. A {@code null} response from {@code RestServerUtil.sendRequest} is recorded as
 * a failure for that broker rather than raised as an exception.
 */
public class TierMetadataRecoveryOrchestrator {
    private static final String HTTP_PREFIX = "http://";
    private static final Logger LOGGER = LoggerFactory.getLogger(TierMetadataRecoveryOrchestrator.class);
    /** JSON mapper; the same instance as {@code TierMetadataRecoveryHandler.OBJECT_MAPPER}. */
    static final ObjectMapper OBJECT_MAPPER = TierMetadataRecoveryHandler.OBJECT_MAPPER;

    private final AdminClient adminClient;
    private final CloseableHttpClient httpClient;
    private final Integer restServerPort;

    /**
     * Creates an orchestrator that uses the HTTP client built by
     * {@code RestServerUtil.buildHttpClient()}.
     *
     * @param adminClient client used to list the brokers of the cluster
     * @param num port appended to every broker host when building the broker URL
     */
    public TierMetadataRecoveryOrchestrator(AdminClient adminClient, Integer num) {
        this(adminClient, RestServerUtil.buildHttpClient(), num);
    }

    /**
     * Creates an orchestrator with an explicitly supplied HTTP client.
     *
     * @param adminClient client used to list the brokers of the cluster
     * @param closeableHttpClient HTTP client handed to {@code RestServerUtil.sendRequest}
     * @param num port appended to every broker host when building the broker URL
     */
    public TierMetadataRecoveryOrchestrator(AdminClient adminClient, CloseableHttpClient closeableHttpClient, Integer num) {
        this.adminClient = adminClient;
        this.httpClient = closeableHttpClient;
        this.restServerPort = num;
    }

    /** Builds {@code http://<host>:<restServerPort>} for the given broker host. */
    private String getBrokerUrl(String host) {
        return HTTP_PREFIX + host + ":" + this.restServerPort;
    }

    /**
     * Queries the cluster through the admin client and maps every broker id to its URL.
     *
     * @return a new mutable map from broker id to broker URL
     * @throws ExecutionException if the describe-cluster future completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the describe-cluster result
     */
    private Map<Integer, String> getBrokerToUrlMap() throws ExecutionException, InterruptedException {
        LOGGER.info("Attempting to get list of brokers in the cluster...");
        DescribeClusterResult describeCluster = this.adminClient.describeCluster();
        Map<Integer, String> brokerUrls = new HashMap<>();
        for (Node node : describeCluster.nodes().get()) {
            brokerUrls.put(node.id(), getBrokerUrl(node.host()));
        }
        LOGGER.info("Found {} brokers in the cluster.", brokerUrls.size());
        return brokerUrls;
    }

    /**
     * Looks up the URL of {@code brokerId} in {@code brokerToUrlMap}.
     *
     * @throws IllegalArgumentException if the map has no entry for the broker
     */
    private static String requireBrokerUrl(Map<Integer, String> brokerToUrlMap, Integer brokerId) {
        if (!brokerToUrlMap.containsKey(brokerId)) {
            throw new IllegalArgumentException("Broker " + brokerId + " not found in the cluster");
        }
        return brokerToUrlMap.get(brokerId);
    }

    /**
     * Resolves the URL of a single broker after refreshing the broker list from the cluster.
     *
     * @throws IllegalArgumentException if the broker is not part of the cluster
     */
    private String getBrokerUrlIfBrokerExists(Integer brokerId) throws ExecutionException, InterruptedException {
        return requireBrokerUrl(getBrokerToUrlMap(), brokerId);
    }

    /**
     * Resolves the URLs of all requested brokers after refreshing the broker list from the cluster.
     *
     * @param set ids of the brokers to resolve
     * @return a new map from each requested broker id to its URL
     * @throws IllegalArgumentException if any requested broker is not part of the cluster
     * @throws ExecutionException if the describe-cluster future completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the describe-cluster result
     */
    public Map<Integer, String> getBrokerUrlsIfBrokerExists(Set<Integer> set) throws ExecutionException, InterruptedException {
        Map<Integer, String> brokerToUrlMap = getBrokerToUrlMap();
        Map<Integer, String> resolved = new HashMap<>();
        for (Integer brokerId : set) {
            resolved.put(brokerId, requireBrokerUrl(brokerToUrlMap, brokerId));
        }
        return resolved;
    }

    /** Serializes {@code value} to pretty-printed JSON using {@link #OBJECT_MAPPER}. */
    private <T> String prettyPrint(T value) throws JsonProcessingException {
        return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(value);
    }

    /**
     * Sends a rewind-tier-topic-consumer request to every broker in the cluster.
     *
     * @param optional first argument of {@code RewindTierTopicConsumerRequest}; {@code null} is
     *     sent when the optional is empty
     * @param z flag forwarded unchanged as the second argument of the request
     * @param tierTopicConsumerRewindPolicy policy whose id is sent in the request and which decides
     *     how skipped partitions are classified when the response is parsed
     * @return per-broker success and failure details
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if a request or response cannot be (de)serialized
     */
    public RewindTierTopicConsumerCommandResponse rewindTierTopicConsumerForCluster(Optional<Map<Integer, Map<Long, Optional<Integer>>>> optional, boolean z, TierTopicConsumerRewindPolicy tierTopicConsumerRewindPolicy) throws ExecutionException, InterruptedException, JsonProcessingException {
        LOGGER.info("Rewinding tier topic consumer for all brokers in the cluster.");
        Map<Integer, String> brokerToUrlMap = getBrokerToUrlMap();
        RewindTierTopicConsumerCommandResponse commandResponse = new RewindTierTopicConsumerCommandResponse();
        for (Map.Entry<Integer, String> broker : brokerToUrlMap.entrySet()) {
            RewindTierTopicConsumerRequest request = new RewindTierTopicConsumerRequest(optional.orElse(null), z, tierTopicConsumerRewindPolicy.id());
            String rawResponse = rewindTierTopicConsumer(new RewindTierTopicConsumerCommandRequest(broker.getValue(), request));
            parseRewindTierTopicConsumerResponse(commandResponse, rawResponse, broker.getKey(), tierTopicConsumerRewindPolicy);
        }
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /**
     * Sends a rewind-tier-topic-consumer request to a single broker.
     *
     * @param optional first argument of {@code RewindTierTopicConsumerRequest}; {@code null} is
     *     sent when the optional is empty
     * @param z flag forwarded unchanged as the second argument of the request
     * @param tierTopicConsumerRewindPolicy policy whose id is sent in the request and which decides
     *     how skipped partitions are classified when the response is parsed
     * @param i id of the target broker
     * @return success or failure details for the broker
     * @throws IllegalArgumentException if the broker is not part of the cluster
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if the request or response cannot be (de)serialized
     */
    public RewindTierTopicConsumerCommandResponse rewindTierTopicConsumerForBroker(Optional<Map<Integer, Map<Long, Optional<Integer>>>> optional, boolean z, TierTopicConsumerRewindPolicy tierTopicConsumerRewindPolicy, int i) throws JsonProcessingException, ExecutionException, InterruptedException {
        String brokerUrl = getBrokerUrlIfBrokerExists(i);
        RewindTierTopicConsumerCommandResponse commandResponse = new RewindTierTopicConsumerCommandResponse();
        RewindTierTopicConsumerRequest request = new RewindTierTopicConsumerRequest(optional.orElse(null), z, tierTopicConsumerRewindPolicy.id());
        String rawResponse = rewindTierTopicConsumer(new RewindTierTopicConsumerCommandRequest(brokerUrl, request));
        parseRewindTierTopicConsumerResponse(commandResponse, rawResponse, i, tierTopicConsumerRewindPolicy);
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /** Logs and sends one rewind request; returns whatever {@code RestServerUtil.sendRequest} returns. */
    private String rewindTierTopicConsumer(RewindTierTopicConsumerCommandRequest commandRequest) throws JsonProcessingException {
        LOGGER.info("Rewinding tier topic consumer for broker {} with request: {}", commandRequest.brokerUrl(), prettyPrint(commandRequest.request()));
        return RestServerUtil.sendRequest(commandRequest, this.httpClient);
    }

    /**
     * Records the outcome of one rewind request in {@code commandResponse}.
     *
     * <p>A {@code null} response is a failure. A non-null response is a failure only when it lists
     * skipped partitions and the policy is {@code SKIP_MISSING_PARTITIONS}; otherwise the broker is
     * recorded as successful.
     */
    private void parseRewindTierTopicConsumerResponse(RewindTierTopicConsumerCommandResponse commandResponse, String rawResponse, Integer brokerId, TierTopicConsumerRewindPolicy policy) throws JsonProcessingException {
        if (rawResponse == null) {
            commandResponse.getFailed().add(new RewindTierTopicConsumerCommandResponse.FailedBrokerDetail(brokerId, "Unable to rewind tier topic consumer for broker " + brokerId, null));
            return;
        }
        ResponseContainer<RewindTierTopicConsumerResponse> container = OBJECT_MAPPER.readValue(rawResponse, new TypeReference<ResponseContainer<RewindTierTopicConsumerResponse>>() {
        });
        RewindTierTopicConsumerResponse response = (RewindTierTopicConsumerResponse) container.data.attributes;
        boolean partitionsSkipped = !response.skippedPartitions().isEmpty();
        if (partitionsSkipped && policy == TierTopicConsumerRewindPolicy.SKIP_MISSING_PARTITIONS) {
            commandResponse.getFailed().add(new RewindTierTopicConsumerCommandResponse.FailedBrokerDetail(brokerId, "Failed to fully rewind tier topic consumer for broker " + brokerId, response.skippedPartitions()));
        } else {
            commandResponse.getSuccess().add(brokerId);
        }
    }

    /**
     * Asks every broker in the cluster to start a tier recovery data upload.
     *
     * @param set topic-id partitions forwarded as the first argument of
     *     {@code InitiateTierRecoveryDataUploadRequest}
     * @param str value forwarded unchanged as the second argument of the request
     * @param i value forwarded unchanged as the third argument of the request
     * @return per-broker details; successful entries carry the job id returned by the broker
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if a request or response cannot be (de)serialized
     */
    public InitiateTierRecoveryDataUploadCommandResponse initiateTierRecoveryDataUploadForCluster(Set<TopicIdPartition> set, String str, int i) throws ExecutionException, InterruptedException, JsonProcessingException {
        LOGGER.info("Initiating tier recovery data upload for all brokers in the cluster.");
        Map<Integer, String> brokerToUrlMap = getBrokerToUrlMap();
        InitiateTierRecoveryDataUploadCommandResponse commandResponse = new InitiateTierRecoveryDataUploadCommandResponse();
        for (Map.Entry<Integer, String> broker : brokerToUrlMap.entrySet()) {
            InitiateTierRecoveryDataUploadRequest request = new InitiateTierRecoveryDataUploadRequest(set, str, i);
            String rawResponse = initiateTierRecoveryDataUpload(new InitiateTierRecoveryDataUploadCommandRequest(broker.getValue(), request));
            parseInitiateTierRecoveryDataUploadResponse(commandResponse, rawResponse, broker.getKey());
        }
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /**
     * Asks a single broker to start a tier recovery data upload.
     *
     * @param set topic-id partitions forwarded as the first argument of
     *     {@code InitiateTierRecoveryDataUploadRequest}
     * @param str value forwarded unchanged as the second argument of the request
     * @param i value forwarded unchanged as the third argument of the request
     * @param i2 id of the target broker
     * @return details for the broker; a successful entry carries the job id returned by the broker
     * @throws IllegalArgumentException if the broker is not part of the cluster
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if the request or response cannot be (de)serialized
     */
    public InitiateTierRecoveryDataUploadCommandResponse initiateTierRecoveryDataUploadForBroker(Set<TopicIdPartition> set, String str, int i, int i2) throws JsonProcessingException, ExecutionException, InterruptedException {
        String brokerUrl = getBrokerUrlIfBrokerExists(i2);
        InitiateTierRecoveryDataUploadCommandResponse commandResponse = new InitiateTierRecoveryDataUploadCommandResponse();
        InitiateTierRecoveryDataUploadRequest request = new InitiateTierRecoveryDataUploadRequest(set, str, i);
        String rawResponse = initiateTierRecoveryDataUpload(new InitiateTierRecoveryDataUploadCommandRequest(brokerUrl, request));
        parseInitiateTierRecoveryDataUploadResponse(commandResponse, rawResponse, i2);
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /** Logs and sends one initiate-upload request; returns whatever {@code RestServerUtil.sendRequest} returns. */
    private String initiateTierRecoveryDataUpload(InitiateTierRecoveryDataUploadCommandRequest commandRequest) throws JsonProcessingException {
        LOGGER.info("Initiating upload of tier recovery data for broker {} with request: {}", commandRequest.brokerUrl(), prettyPrint(commandRequest.request()));
        return RestServerUtil.sendRequest(commandRequest, this.httpClient);
    }

    /**
     * Records the outcome of one initiate-upload request: a {@code null} response is a failure,
     * anything else is a success carrying the job id parsed from the response.
     */
    private void parseInitiateTierRecoveryDataUploadResponse(InitiateTierRecoveryDataUploadCommandResponse commandResponse, String rawResponse, Integer brokerId) throws JsonProcessingException {
        if (rawResponse == null) {
            commandResponse.getFailed().add(new InitiateTierRecoveryDataUploadCommandResponse.FailedBrokerDetail(brokerId, "Unable to initiate tier recovery data upload for broker " + brokerId));
            return;
        }
        ResponseContainer<InitiateTierRecoveryDataUploadResponse> container = OBJECT_MAPPER.readValue(rawResponse, new TypeReference<ResponseContainer<InitiateTierRecoveryDataUploadResponse>>() {
        });
        InitiateTierRecoveryDataUploadResponse response = (InitiateTierRecoveryDataUploadResponse) container.data.attributes;
        commandResponse.getSuccess().add(new InitiateTierRecoveryDataUploadCommandResponse.SuccessBrokerDetail(brokerId, response.jobId()));
    }

    /**
     * Fetches the tier recovery data upload job result from each listed broker.
     *
     * <p>Unlike the single-broker variant, this method does not query the cluster; broker URLs are
     * taken from {@code map2}. A broker without a URL in {@code map2} is recorded as failed without
     * sending a request.
     *
     * @param map job id to query, keyed by broker id
     * @param map2 broker URL, keyed by broker id
     * @return per-broker details grouped into completed, in-progress and failed
     * @throws JsonProcessingException if a request or response cannot be (de)serialized
     */
    public GetTierRecoveryDataUploadJobResultCommandResponse getTierRecoveryDataUploadJobResultForBrokers(Map<Integer, UUID> map, Map<Integer, String> map2) throws JsonProcessingException {
        LOGGER.info("Get tier recovery data upload result for provided brokers.");
        GetTierRecoveryDataUploadJobResultCommandResponse commandResponse = new GetTierRecoveryDataUploadJobResultCommandResponse();
        for (Map.Entry<Integer, UUID> job : map.entrySet()) {
            Integer brokerId = job.getKey();
            UUID jobId = job.getValue();
            String brokerUrl = map2.get(brokerId);
            String rawResponse = null;
            if (brokerUrl == null) {
                // Without a URL there is nothing to send to; fall through so that the broker is
                // reported through the regular "no response" failure path.
                LOGGER.warn("No URL available for broker {}; skipping tier recovery data upload job result request.", brokerId);
            } else {
                rawResponse = getTierRecoveryDataUploadJobResult(new GetTierRecoveryDataUploadJobResultCommandRequest(brokerUrl, new GetTierRecoveryDataUploadJobResultRequest(jobId)));
            }
            parseGetTierRecoveryDataUploadJobResponse(commandResponse, rawResponse, brokerId, jobId);
        }
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /**
     * Fetches the tier recovery data upload job result from a single broker.
     *
     * @param i id of the target broker
     * @param uuid id of the upload job to query
     * @return details for the broker, filed under completed, in-progress or failed
     * @throws IllegalArgumentException if the broker is not part of the cluster
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if the request or response cannot be (de)serialized
     */
    public GetTierRecoveryDataUploadJobResultCommandResponse getTierRecoveryDataUploadJobResultForBroker(int i, UUID uuid) throws JsonProcessingException, ExecutionException, InterruptedException {
        LOGGER.info("Get tier recovery data upload result for broker {}.", i);
        String brokerUrl = getBrokerUrlIfBrokerExists(i);
        GetTierRecoveryDataUploadJobResultCommandResponse commandResponse = new GetTierRecoveryDataUploadJobResultCommandResponse();
        String rawResponse = getTierRecoveryDataUploadJobResult(new GetTierRecoveryDataUploadJobResultCommandRequest(brokerUrl, new GetTierRecoveryDataUploadJobResultRequest(uuid)));
        parseGetTierRecoveryDataUploadJobResponse(commandResponse, rawResponse, i, uuid);
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /** Logs and sends one job-result request; returns whatever {@code RestServerUtil.sendRequest} returns. */
    private String getTierRecoveryDataUploadJobResult(GetTierRecoveryDataUploadJobResultCommandRequest commandRequest) throws JsonProcessingException {
        LOGGER.info("Get upload tier recovery data result for broker {} with request: {}", commandRequest.brokerUrl(), prettyPrint(commandRequest.request()));
        return RestServerUtil.sendRequest(commandRequest, this.httpClient);
    }

    /**
     * Files the outcome of one job-result request under completed, in-progress or failed.
     *
     * <p>A {@code null} response, a response without a result, or a result whose status is neither
     * completed nor in progress are all recorded as failures.
     */
    private void parseGetTierRecoveryDataUploadJobResponse(GetTierRecoveryDataUploadJobResultCommandResponse commandResponse, String rawResponse, Integer brokerId, UUID jobId) throws JsonProcessingException {
        if (rawResponse == null) {
            commandResponse.getFailed().add(new GetTierRecoveryDataUploadJobResultCommandResponse.BrokerDetail(brokerId, jobId, null, Optional.of("Unable to get tier recovery data upload job result for broker " + brokerId)));
            return;
        }
        ResponseContainer<GetTierRecoveryDataUploadJobResultResponse> container = OBJECT_MAPPER.readValue(rawResponse, new TypeReference<ResponseContainer<GetTierRecoveryDataUploadJobResultResponse>>() {
        });
        GetTierRecoveryDataUploadJobResultResponse response = (GetTierRecoveryDataUploadJobResultResponse) container.data.attributes;
        TierRecoveryDataUploadResult result = response.result();
        if (isCompletedUpload(result)) {
            commandResponse.getCompleted().add(new GetTierRecoveryDataUploadJobResultCommandResponse.BrokerDetail(brokerId, jobId, result, Optional.empty()));
        } else if (isInProgressUpload(result)) {
            commandResponse.getInProgress().add(new GetTierRecoveryDataUploadJobResultCommandResponse.BrokerDetail(brokerId, jobId, result, Optional.empty()));
        } else {
            // A missing result also lands here (both predicates reject null), so the status must be
            // read null-safely; dereferencing the result unconditionally would throw instead of
            // recording the failure.
            TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus status = result == null ? null : result.status();
            String reason = String.format("Failed to parse the tier recovery data upload job result for broker %d due to unexpected job status: %s. Job Result: %s", brokerId, status, response);
            commandResponse.getFailed().add(new GetTierRecoveryDataUploadJobResultCommandResponse.BrokerDetail(brokerId, jobId, result, Optional.of(reason)));
        }
    }

    /** Returns true when {@code result} is non-null and its status is {@code COMPLETED}. */
    private boolean isCompletedUpload(io.confluent.rest.TierRecoveryDataUploadResult result) {
        return result != null && result.status() == TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus.COMPLETED;
    }

    /**
     * Returns true when {@code result} is non-null and its status is {@code RUNNING} or
     * {@code DATA_UPLOAD_COMPLETED}.
     */
    private boolean isInProgressUpload(io.confluent.rest.TierRecoveryDataUploadResult result) {
        if (result == null) {
            return false;
        }
        TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus status = result.status();
        return status == TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus.RUNNING
                || status == TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus.DATA_UPLOAD_COMPLETED;
    }

    /**
     * Starts on-demand tier topic data loss detection on every broker in the cluster.
     *
     * @param str value forwarded unchanged as the first argument of
     *     {@code TierTopicHeadDataLossDetectionRequest}
     * @param set tier topic partitions to validate, forwarded as the second argument of the request
     * @return per-broker success and failure details
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if a request or response cannot be (de)serialized
     */
    public TierTopicHeadDataLossDetectionCommandResponse detectDataLossInTierTopicForCluster(String str, Set<TopicPartition> set) throws ExecutionException, InterruptedException, JsonProcessingException {
        LOGGER.info("Starting on-demand data loss validation for all brokers in the cluster on tier topic partitions: {}.", set);
        Map<Integer, String> brokerToUrlMap = getBrokerToUrlMap();
        TierTopicHeadDataLossDetectionCommandResponse commandResponse = new TierTopicHeadDataLossDetectionCommandResponse();
        for (Map.Entry<Integer, String> broker : brokerToUrlMap.entrySet()) {
            TierTopicHeadDataLossDetectionRequest request = new TierTopicHeadDataLossDetectionRequest(str, set);
            String rawResponse = detectDataLossInTierTopic(new TierTopicHeadDataLossDetectionCommandRequest(broker.getValue(), request));
            parseDataLossValidationResponse(commandResponse, rawResponse, broker.getKey());
        }
        // NOTE(review): every sibling method reports its summary through LOGGER.debug; this one
        // writes to stdout. Kept as-is because callers may rely on the printed output - confirm
        // whether it should be switched to the logger.
        System.out.println(prettyPrint(commandResponse));
        return commandResponse;
    }

    /**
     * Starts on-demand tier topic data loss detection on a single broker.
     *
     * @param i id of the target broker
     * @param str value forwarded unchanged as the first argument of
     *     {@code TierTopicHeadDataLossDetectionRequest}
     * @param set tier topic partitions forwarded as the second argument of the request
     * @return success or failure details for the broker
     * @throws IllegalArgumentException if the broker is not part of the cluster
     * @throws ExecutionException if listing the brokers fails
     * @throws InterruptedException if interrupted while listing the brokers
     * @throws JsonProcessingException if the request or response cannot be (de)serialized
     */
    public TierTopicHeadDataLossDetectionCommandResponse detectDataLossInTierTopicForBroker(int i, String str, Set<TopicPartition> set) throws JsonProcessingException, ExecutionException, InterruptedException {
        String brokerUrl = getBrokerUrlIfBrokerExists(i);
        TierTopicHeadDataLossDetectionCommandResponse commandResponse = new TierTopicHeadDataLossDetectionCommandResponse();
        TierTopicHeadDataLossDetectionRequest request = new TierTopicHeadDataLossDetectionRequest(str, set);
        String rawResponse = detectDataLossInTierTopic(new TierTopicHeadDataLossDetectionCommandRequest(brokerUrl, request));
        parseDataLossValidationResponse(commandResponse, rawResponse, i);
        LOGGER.debug(prettyPrint(commandResponse));
        return commandResponse;
    }

    /** Logs and sends one data-loss-detection request; returns whatever {@code RestServerUtil.sendRequest} returns. */
    private String detectDataLossInTierTopic(TierTopicHeadDataLossDetectionCommandRequest commandRequest) throws JsonProcessingException {
        LOGGER.info("Detecting data loss for broker {} with request: {}", commandRequest.brokerUrl(), prettyPrint(commandRequest.request()));
        return RestServerUtil.sendRequest(commandRequest, this.httpClient);
    }

    /**
     * Records the outcome of one data-loss-detection request: a {@code null} response or a
     * completion status other than {@code SUCCESS} is a failure; otherwise the broker is recorded
     * as successful together with the report path from the response.
     */
    private void parseDataLossValidationResponse(TierTopicHeadDataLossDetectionCommandResponse commandResponse, String rawResponse, Integer brokerId) throws JsonProcessingException {
        if (rawResponse == null) {
            commandResponse.getFailed().add(new TierTopicHeadDataLossDetectionCommandResponse.FailedBrokerDetail(brokerId, TierTopicHeadDataLossReport.CompletionStatus.FAILURE, Collections.singletonList("Unable to start on-demand data loss detection for broker " + brokerId), null));
            return;
        }
        ResponseContainer<TierTopicHeadDataLossDetectionResponse> container = OBJECT_MAPPER.readValue(rawResponse, new TypeReference<ResponseContainer<TierTopicHeadDataLossDetectionResponse>>() {
        });
        TierTopicHeadDataLossDetectionResponse response = (TierTopicHeadDataLossDetectionResponse) container.data.attributes;
        if (response.completionStatus() == TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS) {
            commandResponse.getSuccess().add(new TierTopicHeadDataLossDetectionCommandResponse.SuccessBrokerDetail(brokerId, response.dataLossReportPath()));
        } else {
            commandResponse.getFailed().add(new TierTopicHeadDataLossDetectionCommandResponse.FailedBrokerDetail(brokerId, TierTopicHeadDataLossReport.CompletionStatus.FAILURE, response.errorMessages(), response.dataLossReportPath()));
        }
    }
}
