package org.apache.samza.system.eventhub.consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.metrics.SlidingTimeWindowReservoir;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.eventhub.EventHubClientManager;
import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.Interceptor;
import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.class */
public class EventHubSystemConsumer extends BlockingEnvelopeMap {
    public static final String START_OF_STREAM = "-1";
    public static final String END_OF_STREAM = "-2";
    public static final String EVENT_READ_RATE = "eventReadRate";
    public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
    public static final String CONSUMPTION_LAG_MS = "consumptionLagMs";
    public static final String READ_ERRORS = "readErrors";
    public static final String AGGREGATE = "aggregate";
    private final Map<String, Counter> eventReadRates;
    private final Map<String, Counter> eventByteReadRates;
    private final Map<String, SamzaHistogram> consumptionLagMs;
    private final Map<String, Counter> readErrors;

    @VisibleForTesting
    final Map<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers;

    @VisibleForTesting
    final Map<SystemStreamPartition, EventHubClientManager> perPartitionEventHubManagers;
    private final Map<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers;
    private final Map<String, EventHubClientManager> perStreamEventHubManagers;
    private final Map<SystemStreamPartition, String> streamPartitionOffsets;
    private final Map<String, Interceptor> interceptors;
    private final Integer prefetchCount;
    private volatile boolean isStarted;
    private final EventHubConfig config;
    private final String systemName;
    private final EventHubClientManagerFactory eventHubClientManagerFactory;
    private final AtomicReference<Throwable> eventHubNonTransientError;
    private final ExecutorService reconnectTaskRunner;
    private long lastRetryTs;
    private final Clock clock;

    @VisibleForTesting
    final SlidingTimeWindowReservoir recentRetryAttempts;

    @VisibleForTesting
    volatile Future reconnectTaskStatus;
    private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
    private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10);
    private static final Duration DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT = Duration.ofMinutes(1);
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
    private static final Object AGGREGATE_METRICS_LOCK = new Object();
    private static Counter aggEventReadRate = null;
    private static Counter aggEventByteReadRate = null;
    private static SamzaHistogram aggConsumptionLagMs = null;
    private static Counter aggReadErrors = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer$PartitionReceiverHandlerImpl.class */
    public class PartitionReceiverHandlerImpl implements PartitionReceiveHandler {
        private final Counter eventReadRate;
        private final Counter eventByteReadRate;
        private final SamzaHistogram readLatency;
        private final Counter errorRate;
        private final Interceptor interceptor;
        private final Integer maxEventCount;
        private final SystemStreamPartition ssp;

        PartitionReceiverHandlerImpl(SystemStreamPartition systemStreamPartition, Counter counter, Counter counter2, SamzaHistogram samzaHistogram, Counter counter3, Interceptor interceptor, int i) {
            this.ssp = systemStreamPartition;
            this.eventReadRate = counter;
            this.eventByteReadRate = counter2;
            this.readLatency = samzaHistogram;
            this.errorRate = counter3;
            this.interceptor = interceptor;
            this.maxEventCount = Integer.valueOf(i);
        }

        public int getMaxEventCount() {
            return this.maxEventCount.intValue();
        }

        public void onReceive(Iterable<EventData> iterable) {
            if (iterable != null) {
                iterable.forEach(eventData -> {
                    byte[] bytes = eventData.getBytes();
                    if (this.interceptor != null) {
                        bytes = this.interceptor.intercept(bytes);
                    }
                    String offset = eventData.getSystemProperties().getOffset();
                    Object partitionKey = eventData.getSystemProperties().getPartitionKey();
                    if (partitionKey == null) {
                        partitionKey = eventData.getProperties().get(EventHubSystemProducer.KEY);
                    }
                    try {
                        updateMetrics(eventData);
                        EventHubSystemConsumer.this.put(this.ssp, new EventHubIncomingMessageEnvelope(this.ssp, offset, partitionKey, bytes, eventData));
                        EventHubSystemConsumer.this.streamPartitionOffsets.put(this.ssp, offset);
                    } catch (InterruptedException e) {
                        String format = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", this.ssp);
                        EventHubSystemConsumer.LOG.error(format, e);
                        throw new SamzaException(format, e);
                    }
                });
            }
        }

        private void updateMetrics(EventData eventData) {
            int length = eventData.getBytes() == null ? 0 : eventData.getBytes().length;
            this.eventReadRate.inc();
            EventHubSystemConsumer.aggEventReadRate.inc();
            this.eventByteReadRate.inc(length);
            EventHubSystemConsumer.aggEventByteReadRate.inc(length);
            long millis = Duration.between(eventData.getSystemProperties().getEnqueuedTime(), Instant.now()).toMillis();
            this.readLatency.update(millis);
            EventHubSystemConsumer.aggConsumptionLagMs.update(millis);
        }

        public void onError(Throwable th) {
            this.errorRate.inc();
            EventHubSystemConsumer.aggReadErrors.inc();
            if (!(th instanceof EventHubException) || !((EventHubException) th).getIsTransient()) {
                EventHubSystemConsumer.LOG.error(String.format("Received non transient exception from EH client for ssp: %s", this.ssp), th);
                EventHubSystemConsumer.this.eventHubNonTransientError.set(th);
                return;
            }
            EventHubSystemConsumer.LOG.warn(String.format("Received transient exception from EH client. Renew partition receiver for ssp: %s", this.ssp), th);
            try {
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                EventHubSystemConsumer.LOG.warn("Interrupted during sleep before renew", e);
            }
            EventHubSystemConsumer.this.renewPartitionReceiver(this.ssp);
        }
    }

    public EventHubSystemConsumer(EventHubConfig eventHubConfig, String str, EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> map, MetricsRegistry metricsRegistry) {
        this(eventHubConfig, str, eventHubClientManagerFactory, map, metricsRegistry, System::currentTimeMillis);
    }

    EventHubSystemConsumer(EventHubConfig eventHubConfig, String str, EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> map, MetricsRegistry metricsRegistry, Clock clock) {
        super(metricsRegistry, clock);
        this.streamPartitionHandlers = new ConcurrentHashMap();
        this.perPartitionEventHubManagers = new ConcurrentHashMap();
        this.streamPartitionReceivers = new ConcurrentHashMap();
        this.perStreamEventHubManagers = new ConcurrentHashMap();
        this.streamPartitionOffsets = new ConcurrentHashMap();
        this.isStarted = false;
        this.eventHubNonTransientError = new AtomicReference<>(null);
        this.reconnectTaskRunner = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("EventHubs-Reconnect-Task").setDaemon(true).build());
        this.lastRetryTs = 0L;
        this.reconnectTaskStatus = null;
        this.config = eventHubConfig;
        this.clock = clock;
        this.systemName = str;
        this.interceptors = map;
        this.eventHubClientManagerFactory = eventHubClientManagerFactory;
        List<String> streams = eventHubConfig.getStreams(str);
        this.prefetchCount = eventHubConfig.getPrefetchCount(str);
        this.recentRetryAttempts = new SlidingTimeWindowReservoir(eventHubConfig.getRetryWindowMs(str), clock);
        this.eventReadRates = (Map) streams.stream().collect(Collectors.toMap(Function.identity(), str2 -> {
            return metricsRegistry.newCounter(str2, EVENT_READ_RATE);
        }));
        this.eventByteReadRates = (Map) streams.stream().collect(Collectors.toMap(Function.identity(), str3 -> {
            return metricsRegistry.newCounter(str3, EVENT_BYTE_READ_RATE);
        }));
        this.consumptionLagMs = (Map) streams.stream().collect(Collectors.toMap(Function.identity(), str4 -> {
            return new SamzaHistogram(metricsRegistry, str4, CONSUMPTION_LAG_MS);
        }));
        this.readErrors = (Map) streams.stream().collect(Collectors.toMap(Function.identity(), str5 -> {
            return metricsRegistry.newCounter(str5, READ_ERRORS);
        }));
        synchronized (AGGREGATE_METRICS_LOCK) {
            if (aggEventReadRate == null) {
                aggEventReadRate = metricsRegistry.newCounter(AGGREGATE, EVENT_READ_RATE);
                aggEventByteReadRate = metricsRegistry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
                aggConsumptionLagMs = new SamzaHistogram(metricsRegistry, AGGREGATE, CONSUMPTION_LAG_MS);
                aggReadErrors = metricsRegistry.newCounter(AGGREGATE, READ_ERRORS);
            }
        }
    }

    public void register(SystemStreamPartition systemStreamPartition, String str) {
        super.register(systemStreamPartition, str);
        LOG.info(String.format("Eventhub consumer trying to register ssp %s, offset %s", systemStreamPartition, str));
        if (this.isStarted) {
            throw new SamzaException("Trying to add partition when the connection has already started.");
        }
        if (this.streamPartitionOffsets.containsKey(systemStreamPartition)) {
            if (END_OF_STREAM.equals(str)) {
                return;
            }
            String str2 = this.streamPartitionOffsets.get(systemStreamPartition);
            if (!END_OF_STREAM.equals(str2) && EventHubSystemAdmin.compareOffsets(str, str2).intValue() > -1) {
                return;
            }
        }
        this.streamPartitionOffsets.put(systemStreamPartition, str);
    }

    private EventHubClientManager createOrGetEventHubClientManagerForSSP(String str, SystemStreamPartition systemStreamPartition) {
        EventHubClientManager eventHubClientManager;
        if (!this.config.getPerPartitionConnection(this.systemName).booleanValue()) {
            if (!this.perStreamEventHubManagers.containsKey(str)) {
                LOG.info("Creating EventHub client manager for stream: " + str);
                EventHubClientManager eventHubClientManager2 = this.eventHubClientManagerFactory.getEventHubClientManager(this.systemName, str, this.config);
                eventHubClientManager2.init();
                this.perStreamEventHubManagers.put(str, eventHubClientManager2);
            }
            eventHubClientManager = this.perStreamEventHubManagers.get(str);
            this.perPartitionEventHubManagers.put(systemStreamPartition, eventHubClientManager);
        } else if (this.perPartitionEventHubManagers.containsKey(systemStreamPartition)) {
            LOG.warn(String.format("Trying to create new EventHubClientManager for ssp=%s. But one already exists", systemStreamPartition));
            eventHubClientManager = this.perPartitionEventHubManagers.get(systemStreamPartition);
        } else {
            LOG.info("Creating EventHub client manager for SSP: " + systemStreamPartition);
            eventHubClientManager = this.eventHubClientManagerFactory.getEventHubClientManager(this.systemName, str, this.config);
            eventHubClientManager.init();
            this.perPartitionEventHubManagers.put(systemStreamPartition, eventHubClientManager);
        }
        LOG.info("EventHub client created for ssp: " + systemStreamPartition);
        Validate.notNull(eventHubClientManager, String.format("Fail to create or get EventHubClientManager for ssp=%s", systemStreamPartition), new Object[0]);
        return eventHubClientManager;
    }

    private synchronized void initializeEventHubsManagers() {
        LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + this.streamPartitionOffsets.entrySet().size());
        this.eventHubNonTransientError.set(null);
        for (Map.Entry<SystemStreamPartition, String> entry : this.streamPartitionOffsets.entrySet()) {
            SystemStreamPartition key = entry.getKey();
            String streamId = this.config.getStreamId(key.getStream());
            Integer valueOf = Integer.valueOf(key.getPartition().getPartitionId());
            String value = entry.getValue();
            String streamConsumerGroup = this.config.getStreamConsumerGroup(this.systemName, streamId);
            String streamNamespace = this.config.getStreamNamespace(this.systemName, streamId);
            String streamEntityPath = this.config.getStreamEntityPath(this.systemName, streamId);
            EventHubClientManager createOrGetEventHubClientManagerForSSP = createOrGetEventHubClientManagerForSSP(streamId, key);
            try {
                PartitionReceiver partitionReceiver = END_OF_STREAM.equals(value) ? (PartitionReceiver) createOrGetEventHubClientManagerForSSP.getEventHubClient().createReceiver(streamConsumerGroup, valueOf.toString(), EventPosition.fromEnqueuedTime(Instant.now())).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) : (PartitionReceiver) createOrGetEventHubClientManagerForSSP.getEventHubClient().createReceiver(streamConsumerGroup, valueOf.toString(), EventPosition.fromOffset(value, false)).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                partitionReceiver.setPrefetchCount(this.prefetchCount.intValue());
                PartitionReceiverHandlerImpl partitionReceiverHandlerImpl = new PartitionReceiverHandlerImpl(key, this.eventReadRates.get(streamId), this.eventByteReadRates.get(streamId), this.consumptionLagMs.get(streamId), this.readErrors.get(streamId), this.interceptors.getOrDefault(streamId, null), this.config.getMaxEventCountPerPoll(this.systemName).intValue());
                partitionReceiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
                partitionReceiver.setReceiveHandler(partitionReceiverHandlerImpl);
                this.streamPartitionHandlers.put(key, partitionReceiverHandlerImpl);
                this.streamPartitionReceivers.put(key, partitionReceiver);
                LOG.info(String.format("Connection successfully started for namespace=%s, entity=%s ", streamNamespace, streamEntityPath));
            } catch (Exception e) {
                throw new SamzaException(String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", streamNamespace, streamEntityPath, valueOf), e);
            }
        }
    }

    public void start() {
        if (this.isStarted) {
            LOG.warn("Trying to start EventHubSystemConsumer while it's already started. Ignore the request.");
            return;
        }
        this.isStarted = true;
        initializeEventHubsManagers();
        LOG.info("EventHubSystemConsumer started");
    }

    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
        Throwable th = this.eventHubNonTransientError.get();
        if (th != null && this.clock.currentTimeMillis() - this.lastRetryTs > this.config.getMinRetryIntervalMs(this.systemName)) {
            int size = this.recentRetryAttempts.size();
            long maxRetryCount = this.config.getMaxRetryCount(this.systemName);
            if (size >= maxRetryCount) {
                LOG.error("Retries exhausted. Reached max allowed retries: ({}) within window {} ms", Integer.valueOf(size), Long.valueOf(this.config.getRetryWindowMs(this.systemName)));
                throw new SamzaException("Received a non transient error from event hub partition receiver", th);
            }
            LOG.warn("Received non transient error. Will retry.", th);
            LOG.info("Current retry count within window: {}. max retry count allowed: {}. window size: {} ms", new Object[]{Integer.valueOf(size), Long.valueOf(maxRetryCount), Long.valueOf(this.config.getRetryWindowMs(this.systemName))});
            long currentTimeMillis = this.clock.currentTimeMillis();
            this.recentRetryAttempts.update(currentTimeMillis);
            this.lastRetryTs = currentTimeMillis;
            this.reconnectTaskStatus = this.reconnectTaskRunner.submit(this::renewEventHubsClient);
        }
        return super.poll(set, j);
    }

    private synchronized void renewEventHubsClient() {
        try {
            LOG.info("Start to renew eventhubs client");
            shutdownEventHubsManagers();
            initializeEventHubsManagers();
        } catch (Exception e) {
            LOG.error("Failed to renew eventhubs client", e);
            this.eventHubNonTransientError.set(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void renewPartitionReceiver(SystemStreamPartition systemStreamPartition) {
        String streamId = this.config.getStreamId(systemStreamPartition.getStream());
        EventHubClientManager eventHubClientManager = this.perPartitionEventHubManagers.get(systemStreamPartition);
        String str = this.streamPartitionOffsets.get(systemStreamPartition);
        Integer valueOf = Integer.valueOf(systemStreamPartition.getPartition().getPartitionId());
        String streamConsumerGroup = this.config.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamId);
        try {
            this.streamPartitionReceivers.get(systemStreamPartition).close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            PartitionReceiver createReceiverSync = eventHubClientManager.getEventHubClient().createReceiverSync(streamConsumerGroup, valueOf.toString(), EventPosition.fromOffset(str, !str.equals(START_OF_STREAM)));
            createReceiverSync.setPrefetchCount(this.prefetchCount.intValue());
            createReceiverSync.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
            createReceiverSync.setReceiveHandler(this.streamPartitionHandlers.get(systemStreamPartition));
            this.streamPartitionReceivers.put(systemStreamPartition, createReceiverSync);
        } catch (Exception e) {
            this.eventHubNonTransientError.set(new SamzaException(String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", systemStreamPartition), e));
        }
    }

    private synchronized void shutdownEventHubsManagers() {
        LOG.info("Start shutting down eventhubs receivers");
        ShutdownUtil.boundedShutdown((List) this.streamPartitionReceivers.values().stream().map(partitionReceiver -> {
            return () -> {
                try {
                    partitionReceiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    LOG.error("Failed to shutdown receiver.", e);
                }
            };
        }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
        LOG.info("Start shutting down eventhubs managers");
        ShutdownUtil.boundedShutdown((List) this.perPartitionEventHubManagers.values().stream().map(eventHubClientManager -> {
            return () -> {
                try {
                    eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
                } catch (Exception e) {
                    LOG.error("Failed to shutdown eventhubs manager.", e);
                }
            };
        }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
        this.perPartitionEventHubManagers.clear();
        this.perStreamEventHubManagers.clear();
    }

    public void stop() {
        LOG.info("Stopping event hub system consumer...");
        try {
            this.reconnectTaskRunner.shutdown();
            shutdownEventHubsManagers();
            this.isStarted = false;
        } catch (Exception e) {
            LOG.warn("Exception during stop.", e);
        }
        LOG.info("Event hub system consumer stopped.");
    }

    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue(this.config.getConsumerBufferCapacity(this.systemName));
    }
}
