package org.apache.gobblin.compaction.audit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.class */
public class KafkaAuditCountHttpClient implements AuditCountClient {
    public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
    public static final String CONNECTION_MAX_TOTAL = "kafka.audit.httpmax.total";
    public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
    public static final String MAX_PER_ROUTE = "kafka.audit.httpmax.per.route";
    public static final int DEFAULT_MAX_PER_ROUTE = 10;
    public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
    public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
    public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
    public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
    public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
    public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
    private PoolingHttpClientConnectionManager cm;
    private CloseableHttpClient httpClient;
    private final String baseUrl;
    private final String startQueryString;
    private final String endQueryString;
    private final int maxNumTries;
    private static final Logger log = LoggerFactory.getLogger(KafkaAuditCountHttpClient.class);
    private static final JsonParser PARSER = new JsonParser();

    public KafkaAuditCountHttpClient(State state) {
        int propAsInt = state.getPropAsInt(CONNECTION_MAX_TOTAL, 10);
        int propAsInt2 = state.getPropAsInt(MAX_PER_ROUTE, 10);
        this.cm = new PoolingHttpClientConnectionManager();
        this.cm.setMaxTotal(propAsInt);
        this.cm.setDefaultMaxPerRoute(propAsInt2);
        this.httpClient = HttpClients.custom().setConnectionManager(this.cm).build();
        this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
        this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
        this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
        this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
    }

    @Override // org.apache.gobblin.compaction.audit.AuditCountClient
    public Map<String, Long> fetch(String str, long j, long j2) throws IOException {
        String str2 = (this.baseUrl.endsWith("/") ? this.baseUrl : this.baseUrl + "/") + StringUtils.replaceChars(str, '/', '.') + "?" + this.startQueryString + "=" + j + "&" + this.endQueryString + "=" + j2;
        log.info("Full URL is " + str2);
        return parseResponse(str2, getHttpResponse(str2), str);
    }

    @VisibleForTesting
    public static Map<String, Long> parseResponse(String str, String str2, String str3) throws IOException {
        HashMap newHashMap = Maps.newHashMap();
        try {
            for (Map.Entry entry : PARSER.parse(str2).getAsJsonObject().getAsJsonObject("result").entrySet()) {
                newHashMap.put((String) entry.getKey(), Long.valueOf(Long.parseLong(((JsonElement) entry.getValue()).getAsString())));
            }
            return newHashMap;
        } catch (Exception e) {
            throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", str2, str), e);
        }
    }

    private String getHttpResponse(String str) throws IOException {
        Throwable th;
        String entityUtils;
        HttpGet httpGet = new HttpGet(str);
        int i = 0;
        while (true) {
            try {
                CloseableHttpResponse execute = this.httpClient.execute(httpGet);
                Throwable th2 = null;
                try {
                    try {
                        int statusCode = execute.getStatusLine().getStatusCode();
                        if (statusCode >= 200 && statusCode < 300) {
                            entityUtils = EntityUtils.toString(execute.getEntity());
                            if (execute != null) {
                                if (0 == 0) {
                                    execute.close();
                                    break;
                                }
                                try {
                                    execute.close();
                                    break;
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                break;
                            }
                        } else {
                            throw new IOException(String.format("status code: %d, reason: %s", Integer.valueOf(statusCode), execute.getStatusLine().getReasonPhrase()));
                            break;
                        }
                    } catch (Throwable th4) {
                        th2 = th4;
                        throw th4;
                        break;
                    }
                } finally {
                    if (execute == null) {
                        break;
                    }
                    if (th == null) {
                        break;
                    }
                    try {
                        break;
                    } catch (Throwable th5) {
                    }
                }
            } catch (IOException e) {
                String str2 = "Unable to get or parse HTTP response for " + str;
                if (i >= this.maxNumTries) {
                    throw new IOException(str2, e);
                }
                long j = (i + 1) * 2;
                log.error(str2 + ", will retry in " + j + " sec", e);
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(j));
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
                i++;
            }
        }
        return entityUtils;
    }
}
