package org.apache.kylin.metrics.lib.impl.kafka;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.job.SparkJobConstants;
import org.apache.kylin.metrics.lib.ActiveReservoirListener;
import org.apache.kylin.metrics.lib.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-metrics-reporter-kafka-4.0.0.jar:org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.class */
public abstract class KafkaActiveReserviorListener implements ActiveReservoirListener {
    public static final long TOPIC_AVAILABLE_TAG = 0;
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaActiveReserviorListener.class);
    protected Long maxBlockMs = 1800000L;
    protected int maxRecordForLogNum = KylinConfig.getInstanceFromEnv().printSampleEventRatio();
    protected int maxRecordSkipForLogNum = SparkJobConstants.DEFAULT_SAMPLED_DATA_LIMIT;
    protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>();
    private long nRecord = 0;
    private long nRecordSkip = 0;
    private int threshold = Integer.min((int) (this.maxRecordForLogNum * 0.002d), 25);
    private Callback produceCallback = (recordMetadata, exc) -> {
        if (exc != null) {
            logger.warn("Unexpected exception.", (Throwable) exc);
        } else {
            logger.debug("Topic:{} ; partition:{} ; offset:{} .", recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset()));
        }
    };

    protected abstract String decorateTopic(String str);

    protected abstract void tryFetchMetadataFor(String str);

    protected abstract void send(String str, Record record, Callback callback);

    protected void sendWrapper(String str, Record record, Callback callback) {
        try {
            send(str, record, callback);
        } catch (TimeoutException e) {
            setUnAvailable(str);
            throw e;
        }
    }

    @Override // org.apache.kylin.metrics.lib.ActiveReservoirListener
    public boolean onRecordUpdate(List<Record> list) {
        try {
            for (Record record : list) {
                String decorateTopic = decorateTopic(record.getSubject());
                if (this.nRecord <= this.threshold) {
                    logger.debug("Send record {} to topic : {}", record, decorateTopic);
                }
                if (checkAvailable(decorateTopic)) {
                    if (this.nRecord % this.maxRecordForLogNum == 0) {
                        sendWrapper(decorateTopic, record, this.produceCallback);
                    } else {
                        sendWrapper(decorateTopic, record, null);
                    }
                    this.nRecord++;
                } else {
                    if (this.nRecordSkip % this.maxRecordSkipForLogNum == 0) {
                        this.nRecordSkip = 0L;
                        logger.warn("Skip to send record to topic {}", decorateTopic);
                    }
                    this.nRecordSkip++;
                }
            }
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), (Throwable) e);
            return false;
        }
    }

    protected boolean checkAvailable(String str) {
        Long l = this.topicsIfAvailable.get(str);
        if (l != null && l.longValue() == 0) {
            return true;
        }
        if (l != null && System.currentTimeMillis() - l.longValue() <= this.maxBlockMs.longValue()) {
            return false;
        }
        try {
            tryFetchMetadataFor(str);
            this.topicsIfAvailable.put(str, 0L);
            return true;
        } catch (TimeoutException e) {
            logger.warn("Fail to fetch metadata for topic " + str, e);
            setUnAvailable(str);
            return false;
        }
    }

    protected void setUnAvailable(String str) {
        logger.debug("Cannot find topic {}", str);
        this.topicsIfAvailable.put(str, Long.valueOf(System.currentTimeMillis()));
    }
}
