package org.apache.samza.system.hdfs;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/hdfs/HdfsSystemConsumer.class */
public class HdfsSystemConsumer extends BlockingEnvelopeMap {
    /** Logger shared by every instance of this consumer. */
    private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
    /** Metrics group used for every counter this consumer creates: the fully-qualified class name. */
    private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();
    /** Reader type resolved from the configured file reader type; handed to each MultiFileHdfsReader. */
    private final HdfsReaderFactory.ReaderType readerType;
    /** Staging directory read from HdfsConfig; the partition descriptor loader rejects a blank value. */
    private final String stagingDirectory;
    /** Capacity of each queue created by newBlockingQueue(). */
    private final int bufferCapacity;
    /**
     * Value of HdfsConfig.getConsumerNumMaxRetries, handed to each MultiFileHdfsReader.
     * NOTE(review): the field name looks like a typo of "numMaxRetries".
     */
    private final int numMaxRetires;
    /** Pool that runs the ReaderRunnable tasks; assigned in start(), so it is null until then. */
    private ExecutorService executorService;
    /** Cache keyed by stream name; its loader calls HdfsSystemAdmin.obtainPartitionDescriptorMap. */
    private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
    /** One reader per registered partition; populated by register(). */
    private Map<SystemStreamPartition, MultiFileHdfsReader> readers;
    /** Future of a submitted ReaderRunnable per partition; inspected (and replaced after a failure) by poll(). */
    private Map<SystemStreamPartition, Future> readerRunnableStatus;
    /** Set to true by stop(); volatile because the read loop in doPoll() checks it from pool threads. */
    private volatile boolean isShutdown;
    /** Counters incremented for every envelope read in doPoll(). */
    private final HdfsSystemConsumerMetrics consumerMetrics;
    /** Configuration wrapper built from the Config passed to the constructor. */
    private final HdfsConfig hdfsConfig;

    /* loaded from: input_file:org/apache/samza/system/hdfs/HdfsSystemConsumer$HdfsSystemConsumerMetrics.class */
    /**
     * Event counters for {@link HdfsSystemConsumer}: one counter per registered
     * partition plus one counter for the total across all partitions.
     *
     * <p>Per-partition counters live in a {@link ConcurrentHashMap} and are created
     * through a single atomic {@code computeIfAbsent}, so a counter is requested from
     * the registry at most once per partition by this class.
     */
    public static class HdfsSystemConsumerMetrics {
        private final MetricsRegistry metricsRegistry;
        private final Map<SystemStreamPartition, Counter> numEventsCounterMap = new ConcurrentHashMap<>();
        private final Counter numTotalEventsCounter;

        /**
         * Creates the metrics holder and its "num-total-events" counter.
         *
         * @param metricsRegistry registry from which all counters are obtained
         */
        public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            this.numTotalEventsCounter = metricsRegistry.newCounter(HdfsSystemConsumer.METRICS_GROUP_NAME, "num-total-events");
        }

        /**
         * Ensures a "num-events-&lt;partition&gt;" counter exists for the given partition.
         * Calling this again for an already registered partition is a no-op.
         *
         * @param systemStreamPartition partition to create a counter for
         */
        public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
            counterFor(systemStreamPartition);
        }

        /**
         * Increments the event counter of the given partition, registering the
         * partition first if it has no counter yet.
         *
         * @param systemStreamPartition partition whose counter is incremented
         */
        public void incNumEvents(SystemStreamPartition systemStreamPartition) {
            counterFor(systemStreamPartition).inc();
        }

        /** Increments the counter of events read across all partitions. */
        public void incTotalNumEvents() {
            this.numTotalEventsCounter.inc();
        }

        /**
         * @return the registry this holder creates its counters in
         */
        public MetricsRegistry getMetricsRegistry() {
            return this.metricsRegistry;
        }

        /** Returns the partition's counter, creating and storing it atomically on first use. */
        private Counter counterFor(SystemStreamPartition systemStreamPartition) {
            return this.numEventsCounterMap.computeIfAbsent(
                systemStreamPartition,
                ssp -> this.metricsRegistry.newCounter(HdfsSystemConsumer.METRICS_GROUP_NAME, "num-events-" + ssp));
        }
    }

    /* loaded from: input_file:org/apache/samza/system/hdfs/HdfsSystemConsumer$ReaderRunnable.class */
    /**
     * Task that drains one {@link MultiFileHdfsReader} by handing it to the
     * enclosing consumer's {@code doPoll}.
     */
    private class ReaderRunnable implements Runnable {
        /** Reader handed to {@code doPoll} when this task runs. */
        public MultiFileHdfsReader reader;

        /**
         * @param hdfsReader reader this task will drain
         */
        public ReaderRunnable(MultiFileHdfsReader hdfsReader) {
            reader = hdfsReader;
        }

        /** Delegates to the enclosing consumer's {@code doPoll} with this task's reader. */
        @Override
        public void run() {
            doPoll(reader);
        }
    }

    /**
     * Builds a consumer from configuration. No reader is created and no thread is
     * started here; the constructor only reads settings and prepares the lazily
     * loaded partition descriptor cache.
     *
     * @param str identifier passed to every HdfsConfig lookup (presumably the system name; confirm with callers)
     * @param config job configuration wrapped in an HdfsConfig
     * @param hdfsSystemConsumerMetrics metrics holder; its registry is also handed to the superclass
     */
    public HdfsSystemConsumer(String str, Config config, HdfsSystemConsumerMetrics hdfsSystemConsumerMetrics) {
        super(hdfsSystemConsumerMetrics.getMetricsRegistry());

        // Settings are read in a fixed order from the HdfsConfig wrapper.
        hdfsConfig = new HdfsConfig(config);
        readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(str));
        stagingDirectory = hdfsConfig.getStagingDirectory(str);
        bufferCapacity = hdfsConfig.getConsumerBufferCapacity(str);
        numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(str);

        // Mutable runtime state.
        readers = new ConcurrentHashMap<>();
        readerRunnableStatus = new ConcurrentHashMap<>();
        isShutdown = false;
        consumerMetrics = hdfsSystemConsumerMetrics;

        // Loader invoked by the cache the first time a stream's descriptors are requested.
        CacheLoader<String, Map<Partition, List<String>>> descriptorLoader =
            new CacheLoader<String, Map<Partition, List<String>>>() {
                @Override
                public Map<Partition, List<String>> load(String streamName) throws Exception {
                    Validate.notEmpty(streamName);
                    String staging = HdfsSystemConsumer.this.stagingDirectory;
                    if (!StringUtils.isBlank(staging)) {
                        return HdfsSystemAdmin.obtainPartitionDescriptorMap(staging, streamName);
                    }
                    throw new SamzaException("Staging directory can't be empty. Is this not a yarn job (currently hdfs system consumer only works in the same yarn environment on which hdfs is running)? Is STAGING_DIRECTORY (" + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?");
                }
            };
        cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(descriptorLoader);
    }

    /**
     * Starts reading: creates the thread pool and submits one {@link ReaderRunnable}
     * per registered reader, recording each task's {@link Future} so that
     * {@code poll} can detect a failed reader and resubmit it.
     */
    public void start() {
        LOG.info(String.format("HdfsSystemConsumer started with %d readers", this.readers.size()));
        this.executorService = Executors.newCachedThreadPool();
        // Without these submissions no data is ever read and poll() finds no Future to inspect.
        this.readers.forEach((systemStreamPartition, reader) ->
            this.readerRunnableStatus.put(systemStreamPartition, this.executorService.submit(new ReaderRunnable(reader))));
    }

    /**
     * Requests shutdown: raises the flag checked by the read loop in {@code doPoll}
     * and asks the thread pool to stop accepting new tasks. Safe to call even if
     * {@code start()} was never invoked.
     *
     * <p>NOTE(review): {@code ExecutorService.shutdown()} does not interrupt running
     * tasks, so a reader thread that is blocked while offering an envelope would not
     * be woken by this call — confirm whether {@code shutdownNow()} is wanted here.
     */
    public void stop() {
        LOG.info("Received request to stop HdfsSystemConsumer.");
        this.isShutdown = true;
        // The pool is only created in start(); guard against stop() being called first.
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        LOG.info("HdfsSystemConsumer stopped.");
    }

    /**
     * Looks up the descriptor list of one partition through the per-stream cache.
     *
     * @param systemStreamPartition partition whose descriptor is wanted
     * @return the entry stored for the partition in its stream's descriptor map
     * @throws SamzaException if loading the stream's descriptor map fails with an ExecutionException
     */
    private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
        final Map<Partition, List<String>> descriptorsByPartition;
        try {
            descriptorsByPartition = cachedPartitionDescriptorMap.get(systemStreamPartition.getStream());
        } catch (ExecutionException loadFailure) {
            throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, loadFailure);
        }
        return descriptorsByPartition.get(systemStreamPartition.getPartition());
    }

    /**
     * Creates a bounded queue for buffered envelopes.
     * NOTE(review): presumably a hook invoked by BlockingEnvelopeMap once per partition — confirm.
     *
     * @return a new {@link LinkedBlockingQueue} limited to the configured buffer capacity
     */
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        final int capacity = bufferCapacity;
        return new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Registers a partition with the superclass, creates the reader that will serve
     * it, and sets up its event counter.
     *
     * @param systemStreamPartition partition to consume
     * @param str offset value (as the log message calls it), forwarded to the superclass and to the reader
     */
    @Override
    public void register(SystemStreamPartition systemStreamPartition, String str) {
        LOG.info("HdfsSystemConsumer register with partition: " + systemStreamPartition + " and offset " + str);
        super.register(systemStreamPartition, str);

        List<String> partitionDescriptor = getPartitionDescriptor(systemStreamPartition);
        MultiFileHdfsReader partitionReader =
            new MultiFileHdfsReader(readerType, systemStreamPartition, partitionDescriptor, str, numMaxRetires);
        readers.put(systemStreamPartition, partitionReader);

        consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
    }

    /**
     * Checks the reader task of every requested partition and, if a task finished
     * with a failure, reconnects its reader and resubmits it; then delegates to the
     * superclass to fetch buffered envelopes.
     *
     * <p>Partitions that have no submitted reader task are skipped by the health
     * check instead of causing a {@link NullPointerException}.
     *
     * @param set partitions to poll
     * @param j timeout value forwarded unchanged to {@code super.poll}
     * @return whatever {@code super.poll} returns for the requested partitions
     * @throws InterruptedException if the calling thread is interrupted
     */
    @Override
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
        for (SystemStreamPartition systemStreamPartition : set) {
            Future<?> status = this.readerRunnableStatus.get(systemStreamPartition);
            // No task submitted for this partition, or the task is still running: nothing to repair.
            if (status == null || !status.isDone()) {
                continue;
            }
            try {
                // Only called on a completed task, purely to surface a failure thrown by the reader.
                // An InterruptedException is not a reader failure and propagates to the caller.
                status.get();
            } catch (ExecutionException e) {
                MultiFileHdfsReader multiFileHdfsReader = this.readers.get(systemStreamPartition);
                LOG.warn(String.format("Detect failure in ReaderRunnable for ssp: %s. Try to reconnect now.", systemStreamPartition), e);
                multiFileHdfsReader.reconnect();
                this.readerRunnableStatus.put(systemStreamPartition, this.executorService.submit(new ReaderRunnable(multiFileHdfsReader)));
            }
        }
        return super.poll(set, j);
    }

    /**
     * Hands one envelope to the superclass buffer for the given partition.
     *
     * @param systemStreamPartition partition the envelope belongs to
     * @param incomingMessageEnvelope envelope to enqueue
     * @throws SamzaException if the thread is interrupted while enqueueing; the
     *         interrupt flag is restored and the original exception is kept as the cause
     */
    private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope incomingMessageEnvelope) {
        try {
            super.put(systemStreamPartition, incomingMessageEnvelope);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains a reader: offers each envelope it yields to the buffer and updates the
     * event counters, until the reader has nothing left or the shutdown flag is set.
     * After the loop an end-of-stream envelope is offered and the reader is closed.
     *
     * @param multiFileHdfsReader reader to drain
     */
    public void doPoll(MultiFileHdfsReader multiFileHdfsReader) {
        final SystemStreamPartition ssp = multiFileHdfsReader.getSystemStreamPartition();
        for (;;) {
            // The reader is consulted first; the shutdown flag is only checked when data remains.
            if (!multiFileHdfsReader.hasNext() || isShutdown) {
                break;
            }
            IncomingMessageEnvelope envelope = multiFileHdfsReader.readNext();
            offerMessage(ssp, envelope);
            consumerMetrics.incNumEvents(ssp);
            consumerMetrics.incTotalNumEvents();
        }
        IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp);
        offerMessage(ssp, endOfStream);
        multiFileHdfsReader.close();
    }
}
