package gobblin.kafka.schemareg;

import com.google.common.base.Preconditions;
import gobblin.configuration.ConfigurationKeys;
import gobblin.kafka.serialize.MD5Digest;
import gobblin.util.AvroUtils;
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/kafka/schemareg/LiKafkaSchemaRegistry.class */
/**
 * {@link KafkaSchemaRegistry} implementation that talks to a schema registry over HTTP.
 *
 * <p>Schemas are Avro {@link Schema} objects identified by an {@link MD5Digest}. The registry
 * base URL is read from {@link KafkaSchemaRegistryConfigurationKeys#KAFKA_SCHEMA_REGISTRY_URL}.
 * HTTP requests are executed with {@link HttpClient} instances borrowed from a bounded
 * {@link GenericObjectPool}; every request releases its connection and returns the client to the
 * pool exactly once, whether the request succeeds or fails.
 *
 * <p>This class keeps no schema cache of its own (see {@link #hasInternalCache()}).
 */
public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Schema> {
    private static final Logger LOG = LoggerFactory.getLogger(LiKafkaSchemaRegistry.class);

    /** Path suffix appended to the registry URL to fetch a schema by its id. */
    private static final String GET_RESOURCE_BY_ID = "/id=";
    /** Path suffix appended to the registry URL to fetch the latest schema for a name. */
    private static final String GET_RESOURCE_BY_TYPE = "/latest_with_type=";
    /** Response header that carries the id of a newly registered schema. */
    private static final String SCHEMA_ID_HEADER_NAME = "Location";
    /** Prefix that precedes the schema id inside the {@code Location} header value. */
    private static final String SCHEMA_ID_HEADER_PREFIX = "/id=";

    /** Pool size used when the thread-count property is not set. */
    private static final String DEFAULT_HTTP_CLIENT_POOL_SIZE = "30";
    /** HTTP status expected from the schema lookup endpoints. */
    private static final int HTTP_OK = 200;
    /** HTTP status expected from a successful schema registration. */
    private static final int HTTP_CREATED = 201;

    private final GenericObjectPool<HttpClient> httpClientPool;
    private final String url;

    /**
     * Creates a registry client from configuration.
     *
     * @param properties must contain
     *     {@link KafkaSchemaRegistryConfigurationKeys#KAFKA_SCHEMA_REGISTRY_URL}; the HTTP client
     *     pool size is taken from
     *     {@link ConfigurationKeys#KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS} (default 30)
     * @throws IllegalArgumentException if the registry URL property is missing
     * @throws NumberFormatException if the pool size property is not an integer
     */
    public LiKafkaSchemaRegistry(Properties properties) {
        Preconditions.checkArgument(
                properties.containsKey(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL),
                "Property %s not provided.",
                KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
        this.url = properties.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);

        int poolSize = Integer.parseInt(properties.getProperty(
                ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, DEFAULT_HTTP_CLIENT_POOL_SIZE));
        LOG.info("Create HttpClient pool with size {}", poolSize);

        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(poolSize);
        poolConfig.setMaxIdle(poolSize);
        this.httpClientPool = new GenericObjectPool<>(new HttpClientFactory(), poolConfig);
    }

    /**
     * Fetches the schema registered under the given id.
     *
     * @param mD5Digest id of the schema
     * @return the parsed schema
     * @throws SchemaRegistryException if the schema cannot be retrieved or parsed
     */
    @Override
    public Schema getById(MD5Digest mD5Digest) throws IOException, SchemaRegistryException {
        return fetchSchemaByKey(mD5Digest);
    }

    /**
     * @return {@code false}; this implementation performs an HTTP request on every lookup
     */
    @Override
    public boolean hasInternalCache() {
        return false;
    }

    /**
     * Fetches the latest schema registered under the given name.
     *
     * @param str name to look up (appended to the {@code /latest_with_type=} resource)
     * @return the parsed schema
     * @throws SchemaRegistryException if no client can be borrowed, the registry answers with a
     *     status other than 200, or the response body cannot be parsed as a schema
     * @throws RuntimeException if the HTTP exchange itself fails with an I/O error
     */
    @Override
    public Schema getLatestSchema(String str) throws SchemaRegistryException {
        String schemaUrl = this.url + GET_RESOURCE_BY_TYPE + str;
        LOG.debug("Fetching from URL : {}", schemaUrl);

        GetMethod getMethod = new GetMethod(schemaUrl);
        int statusCode;
        String schemaString;
        HttpClient httpClient = borrowClient();
        try {
            statusCode = httpClient.executeMethod(getMethod);
            schemaString = getMethod.getResponseBodyAsString();
        } catch (IOException e) {
            // HttpException is an IOException, so this covers both failure kinds.
            throw new RuntimeException(e);
        } finally {
            // Release exactly once; the status check and parsing below happen after the client
            // is back in the pool so their failures cannot trigger a second return.
            getMethod.releaseConnection();
            this.httpClientPool.returnObject(httpClient);
        }

        if (statusCode != HTTP_OK) {
            throw new SchemaRegistryException(String.format(
                    "Latest schema for topic %s cannot be retrieved. Status code = %d", str, statusCode));
        }
        try {
            return new Schema.Parser().parse(schemaString);
        } catch (Throwable t) {
            throw new SchemaRegistryException(
                    String.format("Latest schema for topic %s cannot be retrieved", str), t);
        }
    }

    /**
     * Borrows an {@link HttpClient} from the pool.
     *
     * @throws SchemaRegistryException if the pool cannot supply a client; the pool's exception is
     *     attached as the cause
     */
    private HttpClient borrowClient() throws SchemaRegistryException {
        try {
            return this.httpClientPool.borrowObject();
        } catch (Exception e) {
            throw new SchemaRegistryException("Unable to borrow " + HttpClient.class.getSimpleName(), e);
        }
    }

    /**
     * Registers the schema under the given name.
     *
     * @param str name to give the schema before registering it
     * @param schema schema to register
     * @return id assigned by the registry
     * @throws SchemaRegistryException if registration fails
     */
    @Override
    public MD5Digest register(String str, Schema schema) throws SchemaRegistryException {
        return register(AvroUtils.switchName(schema, str));
    }

    /**
     * Registers the schema by POSTing it to the registry URL and reads the assigned id from the
     * {@code Location} response header.
     *
     * @param schema schema to register
     * @return id assigned by the registry
     * @throws SchemaRegistryException if no client can be borrowed, the registry answers with a
     *     status other than 201, the {@code Location} header is missing, duplicated or malformed,
     *     or any other failure occurs during the exchange (attached as the cause)
     */
    public synchronized MD5Digest register(Schema schema) throws SchemaRegistryException {
        String schemaString = schema.toString();
        LOG.info("Registering schema {}", schemaString);

        PostMethod postMethod = new PostMethod(this.url);
        postMethod.addParameter("schema", schemaString);
        HttpClient httpClient = borrowClient();
        try {
            LOG.debug("Loading: {}", postMethod.getURI());
            int statusCode = httpClient.executeMethod(postMethod);
            if (statusCode != HTTP_CREATED) {
                throw new SchemaRegistryException("Error occurred while trying to register schema: " + statusCode);
            }

            String response = postMethod.getResponseBodyAsString();
            if (response != null) {
                LOG.info("Received response {}", response);
            }

            Header[] headers = postMethod.getResponseHeaders(SCHEMA_ID_HEADER_NAME);
            if (headers.length != 1) {
                throw new SchemaRegistryException(
                        "Error reading schema id returned by registerSchema call: headers.length = " + headers.length);
            }
            String location = headers[0].getValue();
            if (!location.startsWith(SCHEMA_ID_HEADER_PREFIX)) {
                throw new SchemaRegistryException(
                        "Error parsing schema id returned by registerSchema call: header = " + location);
            }

            LOG.info("Registered schema successfully");
            return MD5Digest.fromString(location.substring(SCHEMA_ID_HEADER_PREFIX.length()));
        } catch (SchemaRegistryException e) {
            // Already carries a precise message; wrapping it again would only bury it.
            throw e;
        } catch (Throwable t) {
            throw new SchemaRegistryException(t);
        } finally {
            postMethod.releaseConnection();
            this.httpClientPool.returnObject(httpClient);
        }
    }

    /**
     * Fetches and parses the schema stored under the given id.
     *
     * @param mD5Digest id of the schema
     * @return the parsed schema
     * @throws SchemaRegistryException if no client can be borrowed, the HTTP exchange fails, the
     *     registry answers with a status other than 200, or the body cannot be parsed as a schema
     */
    protected Schema fetchSchemaByKey(MD5Digest mD5Digest) throws SchemaRegistryException {
        String schemaUrl = this.url + GET_RESOURCE_BY_ID + mD5Digest.asString();

        GetMethod getMethod = new GetMethod(schemaUrl);
        int statusCode;
        String schemaString;
        HttpClient httpClient = borrowClient();
        try {
            statusCode = httpClient.executeMethod(getMethod);
            schemaString = getMethod.getResponseBodyAsString();
        } catch (IOException e) {
            throw new SchemaRegistryException(e);
        } finally {
            // Release exactly once; see getLatestSchema for why validation happens afterwards.
            getMethod.releaseConnection();
            this.httpClientPool.returnObject(httpClient);
        }

        if (statusCode != HTTP_OK) {
            throw new SchemaRegistryException(String.format(
                    "Schema with key %s cannot be retrieved, statusCode = %d", mD5Digest, statusCode));
        }
        try {
            return new Schema.Parser().parse(schemaString);
        } catch (Throwable t) {
            throw new SchemaRegistryException(String.format("Schema with ID = %s cannot be parsed", mD5Digest), t);
        }
    }
}
