package gobblin.metrics.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/metrics/kafka/KafkaSchemaRegistry.class */
public abstract class KafkaSchemaRegistry<K, S> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaSchemaRegistry.class);
    public static final String KAFKA_SCHEMA_REGISTRY_CLASS = "kafka.schema.registry.class";
    public static final String KAFKA_SCHEMA_REGISTRY_URL = "kafka.schema.registry.url";
    public static final int GET_SCHEMA_BY_ID_MAX_TIRES = 3;
    public static final int GET_SCHEMA_BY_ID_MIN_INTERVAL_SECONDS = 1;
    public static final String KAFKA_SCHEMA_REGISTRY_MAX_CACHE_SIZE = "kafka.schema.registry.max.cache.size";
    public static final String DEFAULT_KAFKA_SCHEMA_REGISTRY_MAX_CACHE_SIZE = "1000";
    public static final String KAFKA_SCHEMA_REGISTRY_CACHE_EXPIRE_AFTER_WRITE_MIN = "kafka.schema.registry.cache.expire.after.write.min";
    public static final String DEFAULT_KAFKA_SCHEMA_REGISTRY_CACHE_EXPIRE_AFTER_WRITE_MIN = "10";
    protected final Properties props;
    protected final LoadingCache<K, S> cachedSchemasByKeys;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/metrics/kafka/KafkaSchemaRegistry$KafkaSchemaCacheLoader.class */
    public class KafkaSchemaCacheLoader extends CacheLoader<K, S> {
        private final ConcurrentMap<K, KafkaSchemaRegistry<K, S>.KafkaSchemaCacheLoader.FailedFetchHistory> failedFetchHistories;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/metrics/kafka/KafkaSchemaRegistry$KafkaSchemaCacheLoader$FailedFetchHistory.class */
        public class FailedFetchHistory {
            private final AtomicInteger numOfAttempts;
            private long previousAttemptTime;

            private FailedFetchHistory(long j) {
                this.numOfAttempts = new AtomicInteger();
                this.previousAttemptTime = j;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public int getNumOfAttempts() {
                return this.numOfAttempts.get();
            }

            /* JADX INFO: Access modifiers changed from: private */
            public long getPreviousAttemptTime() {
                return this.previousAttemptTime;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void setPreviousAttemptTime(long j) {
                this.previousAttemptTime = j;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void incrementNumOfAttempts() {
                this.numOfAttempts.incrementAndGet();
            }
        }

        private KafkaSchemaCacheLoader() {
            this.failedFetchHistories = Maps.newConcurrentMap();
        }

        @Override // com.google.common.cache.CacheLoader
        public S load(K k) throws Exception {
            if (!shouldFetchFromSchemaRegistry(k)) {
                throw new SchemaRegistryException(String.format("Schema with key %s cannot be retrieved", k));
            }
            try {
                return (S) KafkaSchemaRegistry.this.fetchSchemaByKey(k);
            } catch (SchemaRegistryException e) {
                addFetchToFailureHistory(k);
                throw e;
            }
        }

        private void addFetchToFailureHistory(K k) {
            this.failedFetchHistories.putIfAbsent(k, new FailedFetchHistory(System.nanoTime()));
            this.failedFetchHistories.get(k).incrementNumOfAttempts();
            this.failedFetchHistories.get(k).setPreviousAttemptTime(System.nanoTime());
        }

        private boolean shouldFetchFromSchemaRegistry(K k) {
            if (!this.failedFetchHistories.containsKey(k)) {
                return true;
            }
            KafkaSchemaRegistry<K, S>.KafkaSchemaCacheLoader.FailedFetchHistory failedFetchHistory = this.failedFetchHistories.get(k);
            return (failedFetchHistory.getNumOfAttempts() < 3) && (((System.nanoTime() - failedFetchHistory.getPreviousAttemptTime()) > TimeUnit.SECONDS.toNanos(1L) ? 1 : ((System.nanoTime() - failedFetchHistory.getPreviousAttemptTime()) == TimeUnit.SECONDS.toNanos(1L) ? 0 : -1)) >= 0);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSchemaRegistry(Properties properties) {
        this.props = properties;
        this.cachedSchemasByKeys = (LoadingCache<K, S>) CacheBuilder.newBuilder().maximumSize(Integer.parseInt(properties.getProperty(KAFKA_SCHEMA_REGISTRY_MAX_CACHE_SIZE, "1000"))).expireAfterWrite(Integer.parseInt(properties.getProperty(KAFKA_SCHEMA_REGISTRY_CACHE_EXPIRE_AFTER_WRITE_MIN, "10")), TimeUnit.MINUTES).build(new KafkaSchemaCacheLoader());
    }

    public static <K, S> KafkaSchemaRegistry<K, S> get(Properties properties) {
        Preconditions.checkArgument(properties.containsKey(KAFKA_SCHEMA_REGISTRY_CLASS), "Missing required property kafka.schema.registry.class");
        try {
            return (KafkaSchemaRegistry) ConstructorUtils.invokeConstructor(Class.forName(properties.getProperty(KAFKA_SCHEMA_REGISTRY_CLASS)), properties);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            log.error("Failed to instantiate " + KafkaSchemaRegistry.class, e);
            throw Throwables.propagate(e);
        }
    }

    public S getSchemaByKey(K k) throws SchemaRegistryException {
        try {
            return this.cachedSchemasByKeys.get(k);
        } catch (ExecutionException e) {
            throw new SchemaRegistryException(String.format("Schema with key %s cannot be retrieved", k), e);
        }
    }

    protected abstract S fetchSchemaByKey(K k) throws SchemaRegistryException;

    public abstract S getLatestSchemaByTopic(String str) throws SchemaRegistryException;

    public abstract K register(S s) throws SchemaRegistryException;

    public abstract K register(S s, String str) throws SchemaRegistryException;
}
