package com.microsoft.azure.eventhubs;

import com.microsoft.azure.eventhubs.ReceivePump;
import com.microsoft.azure.servicebus.ClientConstants;
import com.microsoft.azure.servicebus.ClientEntity;
import com.microsoft.azure.servicebus.IReceiverSettingsProvider;
import com.microsoft.azure.servicebus.MessageReceiver;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.PassByRef;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.StringUtil;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:com/microsoft/azure/eventhubs/PartitionReceiver.class */
/**
 * Receives events from a single partition of an Event Hub on behalf of a consumer group.
 *
 * <p>Instances are obtained asynchronously through {@link #create}; the returned future completes
 * once the underlying {@link MessageReceiver} has been created. Events can then be pulled with
 * {@link #receive(int)} / {@link #receiveSync(int)}, or pushed to a {@link PartitionReceiveHandler}
 * registered via {@link #setReceiveHandler(PartitionReceiveHandler)}.
 *
 * <p>The class also acts as the {@link IReceiverSettingsProvider} for its underlying receiver,
 * supplying the link filter, link properties and desired capabilities.
 */
public final class PartitionReceiver extends ClientEntity implements IReceiverSettingsProvider {
    private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);

    /** Smallest value accepted by {@link #setPrefetchCount(int)}. */
    private static final int MINIMUM_PREFETCH_COUNT = 10;

    /** Largest value accepted by {@link #setPrefetchCount(int)}. */
    private static final int MAXIMUM_PREFETCH_COUNT = 999;

    /** Prefetch count handed to the underlying receiver when it is created. */
    static final int DEFAULT_PREFETCH_COUNT = 999;

    /** Lowest epoch value accepted by {@link #create}; negative epochs are rejected. */
    static final long NULL_EPOCH = 0;

    /** Offset constant; by its name, denotes the beginning of the partition stream. */
    public static final String START_OF_STREAM = "-1";

    /** Offset constant; by its name, denotes the current end of the partition stream. */
    public static final String END_OF_STREAM = "@latest";

    private final String partitionId;
    private final MessagingFactory underlyingFactory;
    private final String eventHubName;
    private final String consumerGroupName;

    /** Guards registration and removal of the receive handler (see {@link #setReceiveHandler}). */
    private final Object receiveHandlerLock;

    private final String startingOffset;
    private final boolean offsetInclusive;
    private final Instant startingDateTime;
    private final Long epoch;
    private final boolean isEpochReceiver;
    private final ReceiverOptions receiverOptions;

    /** Assigned once the asynchronous creation started by {@link #createInternalReceiver()} completes. */
    private MessageReceiver internalReceiver;

    /** Pump driving the registered receive handler; {@code null} until a handler is first set. */
    private ReceivePump receivePump;

    /** Only allocated when the receiver options enable runtime metrics; otherwise {@code null}. */
    private ReceiverRuntimeInformation runtimeInformation;

    private PartitionReceiver(
            MessagingFactory factory,
            String eventHubName,
            String consumerGroupName,
            String partitionId,
            String startingOffset,
            boolean offsetInclusive,
            Instant startingDateTime,
            Long epoch,
            boolean isEpochReceiver,
            ReceiverOptions receiverOptions) throws ServiceBusException {
        super(null, null);
        this.underlyingFactory = factory;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.partitionId = partitionId;
        this.startingOffset = startingOffset;
        this.offsetInclusive = offsetInclusive;
        this.startingDateTime = startingDateTime;
        this.epoch = epoch;
        this.isEpochReceiver = isEpochReceiver;
        this.receiveHandlerLock = new Object();
        this.receiverOptions = receiverOptions;
        if (isRuntimeMetricEnabled()) {
            this.runtimeInformation = new ReceiverRuntimeInformation(partitionId);
        }
    }

    /**
     * Creates a receiver and asynchronously opens its underlying {@link MessageReceiver}.
     *
     * @param messagingFactory factory used to create the underlying receiver
     * @param str Event Hub name
     * @param str2 consumer group name; must not be null or whitespace
     * @param str3 partition id
     * @param str4 starting offset; when {@code null}, {@code instant} is used to build the filter
     * @param z whether the starting offset itself is included
     * @param instant starting enqueued time, used only when {@code str4} is {@code null}
     * @param j epoch value; must not be negative
     * @param z2 whether the epoch is sent as a link property (see {@link #getProperties()})
     * @param receiverOptions optional receiver options; may be {@code null}
     * @return a future that completes with the new receiver once the underlying receiver exists
     * @throws IllegalArgumentException if {@code j} is negative or {@code str2} is null/whitespace
     * @throws ServiceBusException propagated from construction or receiver creation
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static CompletableFuture<PartitionReceiver> create(MessagingFactory messagingFactory, String str, String str2, String str3, String str4, boolean z, Instant instant, long j, boolean z2, ReceiverOptions receiverOptions) throws ServiceBusException {
        if (j < NULL_EPOCH) {
            throw new IllegalArgumentException("epoch cannot be a negative value. Please specify a zero or positive long value.");
        }
        if (StringUtil.isNullOrWhiteSpace(str2)) {
            throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'");
        }
        final PartitionReceiver partitionReceiver = new PartitionReceiver(messagingFactory, str, str2, str3, str4, z, instant, Long.valueOf(j), z2, receiverOptions);
        // Hand back the instance built above; there is no enclosing instance in this static context.
        return partitionReceiver.createInternalReceiver().thenApply(ignored -> partitionReceiver);
    }

    /**
     * Starts creation of the underlying receiver for this partition's path and stores it in
     * {@link #internalReceiver} when creation completes.
     */
    private CompletableFuture<Void> createInternalReceiver() throws ServiceBusException {
        final String receivePath = String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId);
        return MessageReceiver
                .create(this.underlyingFactory, StringUtil.getRandomString(), receivePath, DEFAULT_PREFETCH_COUNT, this)
                .thenAccept(receiver -> {
                    this.internalReceiver = receiver;
                });
    }

    /** Returns {@code true} when receiver options are present and enable runtime metrics. */
    private boolean isRuntimeMetricEnabled() {
        return this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled();
    }

    /** Returns the starting offset this receiver was created with; may be {@code null}. */
    final String getStartingOffset() {
        return this.startingOffset;
    }

    /** Returns whether the starting offset itself is included in the receive filter. */
    final boolean getOffsetInclusive() {
        return this.offsetInclusive;
    }

    /** Returns the id of the partition this receiver reads from. */
    public final String getPartitionId() {
        return this.partitionId;
    }

    /** Returns the prefetch count currently configured on the underlying receiver. */
    public final int getPrefetchCount() {
        return this.internalReceiver.getPrefetchCount();
    }

    /** Returns the receive timeout currently configured on the underlying receiver. */
    public final Duration getReceiveTimeout() {
        return this.internalReceiver.getReceiveTimeout();
    }

    /**
     * Sets the receive timeout on the underlying receiver.
     *
     * @param duration the new timeout, passed through unchanged
     */
    public void setReceiveTimeout(Duration duration) {
        this.internalReceiver.setReceiveTimeout(duration);
    }

    /**
     * Sets the prefetch count on the underlying receiver.
     *
     * @param i the new prefetch count; must lie within
     *     [{@code MINIMUM_PREFETCH_COUNT}, {@code MAXIMUM_PREFETCH_COUNT}]
     * @throws IllegalArgumentException if {@code i} is outside the allowed range
     * @throws ServiceBusException propagated from the underlying receiver
     */
    public final void setPrefetchCount(int i) throws ServiceBusException {
        if (i < MINIMUM_PREFETCH_COUNT || i > MAXIMUM_PREFETCH_COUNT) {
            throw new IllegalArgumentException(String.format(Locale.US, "PrefetchCount has to be between %s and %s", MINIMUM_PREFETCH_COUNT, MAXIMUM_PREFETCH_COUNT));
        }
        this.internalReceiver.setPrefetchCount(i);
    }

    /** Returns the epoch value this receiver was created with. */
    public final long getEpoch() {
        return this.epoch.longValue();
    }

    /**
     * Returns the runtime information holder, or {@code null} when runtime metrics were not
     * enabled through the receiver options.
     */
    public final ReceiverRuntimeInformation getRuntimeInformation() {
        return this.runtimeInformation;
    }

    /**
     * Blocking variant of {@link #receive(int)}.
     *
     * @param i maximum number of events to receive, forwarded to {@link #receive(int)}
     * @return the received events, or {@code null} when waiting failed with an exception that
     *     carries no cause (typically an interrupt, in which case the thread's interrupt flag
     *     is restored before returning)
     * @throws ServiceBusException if the receive failed; a non-runtime, non-ServiceBus cause is
     *     wrapped in a new {@link ServiceBusException}
     */
    public final Iterable<EventData> receiveSync(int i) throws ServiceBusException {
        try {
            return receive(i).get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Re-assert the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            final Throwable cause = e.getCause();
            if (cause == null) {
                return null;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof ServiceBusException) {
                throw (ServiceBusException) cause;
            }
            throw new ServiceBusException(true, cause);
        }
    }

    /**
     * Asynchronously receives a batch of events from the underlying receiver.
     *
     * <p>When runtime metrics are enabled, the delivery annotations of the message captured
     * during conversion are used to refresh {@link #getRuntimeInformation()}.
     *
     * @param i maximum number of events, forwarded to the underlying receiver
     * @return a future yielding the converted events
     */
    public CompletableFuture<Iterable<EventData>> receive(int i) {
        return this.internalReceiver.receive(i).thenApply(this::toEventsAndTrackRuntimeInfo);
    }

    /** Converts AMQP messages to events and, if enabled, updates the runtime information. */
    private Iterable<EventData> toEventsAndTrackRuntimeInfo(Collection<Message> messages) {
        // The reference is only populated when metrics are on; null tells the converter to skip it.
        final PassByRef<Message> lastMessageRef = isRuntimeMetricEnabled() ? new PassByRef<>() : null;
        final Iterable<EventData> events = EventDataUtil.toEventDataCollection(messages, lastMessageRef);

        if (lastMessageRef != null && lastMessageRef.get() != null) {
            final DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getDeliveryAnnotations();
            if (deliveryAnnotations != null && deliveryAnnotations.getValue() != null) {
                final Map<?, ?> annotations = deliveryAnnotations.getValue();
                this.runtimeInformation.setRuntimeInformation(
                        ((Long) annotations.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER)).longValue(),
                        ((Date) annotations.get(ClientConstants.LAST_ENQUEUED_TIME_UTC)).toInstant(),
                        (String) annotations.get(ClientConstants.LAST_ENQUEUED_OFFSET));
            }
        }
        return events;
    }

    /**
     * Registers or removes a receive handler; equivalent to
     * {@code setReceiveHandler(partitionReceiveHandler, false)}.
     *
     * @param partitionReceiveHandler handler to register, or {@code null} to stop the pump
     * @return see {@link #setReceiveHandler(PartitionReceiveHandler, boolean)}
     */
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler partitionReceiveHandler) {
        return setReceiveHandler(partitionReceiveHandler, false);
    }

    /**
     * Registers a receive handler and starts a pump thread for it, or stops the running pump when
     * {@code partitionReceiveHandler} is {@code null}.
     *
     * @param partitionReceiveHandler handler to register, or {@code null} to stop the pump
     * @param z flag passed through unchanged to the {@link ReceivePump} constructor
     * @return the pump's stop future when a running pump is being stopped; otherwise an
     *     already-completed future
     * @throws IllegalArgumentException if a handler is supplied while a pump is already running
     */
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler partitionReceiveHandler, boolean z) {
        synchronized (this.receiveHandlerLock) {
            final boolean pumpRunning = this.receivePump != null && this.receivePump.isRunning();

            if (partitionReceiveHandler == null) {
                return pumpRunning ? this.receivePump.stop() : CompletableFuture.completedFuture(null);
            }
            if (pumpRunning) {
                throw new IllegalArgumentException("Unexpected value for parameter 'receiveHandler'. PartitionReceiver was already registered with a PartitionReceiveHandler instance. Only 1 instance can be registered.");
            }

            final ReceivePump pump = new ReceivePump(new ReceivePump.IPartitionReceiver() {
                @Override // com.microsoft.azure.eventhubs.ReceivePump.IPartitionReceiver
                public Iterable<EventData> receive(int i) throws ServiceBusException {
                    return PartitionReceiver.this.receiveSync(i);
                }

                @Override // com.microsoft.azure.eventhubs.ReceivePump.IPartitionReceiver
                public String getPartitionId() {
                    return PartitionReceiver.this.getPartitionId();
                }
            }, partitionReceiveHandler, z);
            this.receivePump = pump;

            // Run the pump created here rather than re-reading the mutable field on the new
            // thread, so the thread can never pick up a pump installed by a later call.
            new Thread(() -> pump.run()).start();
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Requests the pump to stop (without waiting for it) and closes the underlying receiver.
     *
     * @return the underlying receiver's close future, or a completed future if it was never created
     */
    @Override // com.microsoft.azure.servicebus.ClientEntity
    public CompletableFuture<Void> onClose() {
        final ReceivePump pump = this.receivePump;
        if (pump != null && pump.isRunning()) {
            // The stop future is intentionally not awaited here.
            pump.stop();
        }
        final MessageReceiver receiver = this.internalReceiver;
        return receiver != null ? receiver.close() : CompletableFuture.completedFuture(null);
    }

    /**
     * Builds the filter for the receive link.
     *
     * <p>If {@code message} is given, the filter resumes after that message's offset
     * (exclusive). Otherwise the configured starting offset is used, or, when no starting offset
     * was configured, the starting date-time expressed in epoch milliseconds.
     *
     * @param message the last received message, or {@code null} when none is available
     * @return a single-entry map keyed by {@code AmqpConstants.STRING_FILTER}
     */
    @Override // com.microsoft.azure.servicebus.IReceiverSettingsProvider
    public Map<Symbol, UnknownDescribedType> getFilter(Message message) {
        final String filterExpression;

        if (message == null && this.startingOffset == null) {
            long enqueuedTimeMillis;
            try {
                enqueuedTimeMillis = this.startingDateTime.toEpochMilli();
            } catch (ArithmeticException e) {
                // The instant does not fit in a long of milliseconds; fall back to the maximum.
                enqueuedTimeMillis = Long.MAX_VALUE;
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format("%s, action[createReceiveLink], warning[starting receiver from epoch+Long.Max]", describeReceiverPath()));
                }
            }
            filterExpression = String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, StringUtil.EMPTY, Long.valueOf(enqueuedTimeMillis));
        } else {
            final boolean inclusive;
            final String offset;
            if (message != null) {
                // Resuming after an already-received message: never deliver it again.
                inclusive = false;
                offset = message.getMessageAnnotations().getValue().get(AmqpConstants.OFFSET).toString();
            } else {
                inclusive = this.offsetInclusive;
                offset = this.startingOffset;
            }
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                TRACE_LOGGER.log(Level.FINE, String.format("%s, action[createReceiveLink], offset[%s], offsetInclusive[%s]", describeReceiverPath(), offset, Boolean.valueOf(inclusive)));
            }
            filterExpression = String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, inclusive ? "=" : StringUtil.EMPTY, offset);
        }

        return Collections.singletonMap(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER, filterExpression));
    }

    /**
     * Renders the receiver path for trace messages, tolerating a receiver that has not been
     * assigned yet.
     */
    private String describeReceiverPath() {
        final MessageReceiver receiver = this.internalReceiver;
        return receiver == null ? "receiverPath[RECEIVER IS NULL]" : "receiverPath[" + receiver.getReceivePath() + "]";
    }

    /**
     * Returns the link properties: a single epoch entry for epoch receivers, otherwise
     * {@code null}.
     */
    @Override // com.microsoft.azure.servicebus.IReceiverSettingsProvider
    public Map<Symbol, Object> getProperties() {
        if (this.isEpochReceiver) {
            return Collections.singletonMap(AmqpConstants.EPOCH, this.epoch);
        }
        return null;
    }

    /**
     * Returns the desired link capabilities: the runtime-metric capability when runtime metrics
     * are enabled, otherwise {@code null}.
     */
    @Override // com.microsoft.azure.servicebus.IReceiverSettingsProvider
    public Symbol[] getDesiredCapabilities() {
        if (!isRuntimeMetricEnabled()) {
            return null;
        }
        return new Symbol[]{AmqpConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME};
    }
}
