package org.apache.druid.indexing.rabbitstream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Queues;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.impl.Client;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.rabbitstream.supervisor.RabbitStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.DynamicConfigProvider;

/* loaded from: input_file:org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.class */
public class RabbitStreamRecordSupplier implements RecordSupplier<String, Long, ByteEntity>, MessageHandler {
    private static final EmittingLogger log = new EmittingLogger(RabbitStreamRecordSupplier.class);
    private boolean closed;
    private BlockingQueue<OrderedPartitionableRecord<String, Long, ByteEntity>> queue;
    private String superStream;
    private String uri;
    private ObjectMapper mapper;
    private final int recordBufferOfferTimeout;
    private final int maxRecordsPerPoll;
    private final int recordBufferSize;
    private String password;
    private String username;
    private final Map<String, OffsetSpecification> offsetMap = new ConcurrentHashMap();
    private Map<String, ConsumerBuilder> streamBuilders = new ConcurrentHashMap();
    private Semaphore stateSemaphore = new Semaphore(1, true);
    private boolean isRunning = false;
    private List<Consumer> consumers = new ArrayList();
    private Environment env = null;

    public RabbitStreamRecordSupplier(Map<String, Object> map, ObjectMapper objectMapper, String str, int i, int i2, int i3) {
        Object obj;
        this.uri = str;
        this.mapper = objectMapper;
        this.recordBufferSize = i;
        this.maxRecordsPerPoll = i3;
        this.recordBufferOfferTimeout = i2;
        this.queue = new LinkedBlockingQueue(i);
        this.password = null;
        this.username = null;
        if (map == null || (obj = map.get(RabbitStreamSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY)) == null) {
            return;
        }
        for (Map.Entry entry : ((DynamicConfigProvider) this.mapper.convertValue(obj, DynamicConfigProvider.class)).getConfig().entrySet()) {
            if (((String) entry.getKey()).equals(RabbitStreamSupervisorIOConfig.PASSWORD_KEY)) {
                this.password = (String) entry.getValue();
            }
            if (((String) entry.getKey()).equals(RabbitStreamSupervisorIOConfig.USERNAME_KEY)) {
                this.username = (String) entry.getValue();
            }
        }
    }

    private void startBackgroundFetch() {
        try {
            this.stateSemaphore.acquireUninterruptibly();
            if (this.isRunning) {
                return;
            }
            for (Map.Entry<String, ConsumerBuilder> entry : this.streamBuilders.entrySet()) {
                this.consumers.add(entry.getValue().offset(this.offsetMap.get(entry.getKey())).build());
            }
            this.isRunning = true;
        } finally {
            this.stateSemaphore.release();
        }
    }

    @VisibleForTesting
    public int bufferSize() {
        return this.queue.size();
    }

    @VisibleForTesting
    public boolean isRunning() {
        return this.isRunning;
    }

    @VisibleForTesting
    public OffsetSpecification getOffset(StreamPartition<String> streamPartition) {
        return this.offsetMap.get(streamPartition.getPartitionId());
    }

    public void stopBackgroundFetch() {
        try {
            this.stateSemaphore.acquire();
            try {
                if (!this.isRunning) {
                    return;
                }
                Iterator<Consumer> it = this.consumers.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                this.consumers.clear();
                this.isRunning = false;
                this.stateSemaphore.release();
            } finally {
                this.stateSemaphore.release();
            }
        } catch (InterruptedException e) {
        }
    }

    public EnvironmentBuilder getEnvBuilder() {
        return Environment.builder();
    }

    public Environment getRabbitEnvironment() {
        if (this.env != null) {
            return this.env;
        }
        EnvironmentBuilder uri = getEnvBuilder().uri(this.uri);
        if (this.password != null) {
            uri = uri.password(this.password);
        }
        if (this.username != null) {
            uri = uri.username(this.username);
        }
        this.env = uri.build();
        return this.env;
    }

    public static String getStreamFromSubstream(String str) {
        String[] split = str.split("-");
        return String.join("-", (String[]) Arrays.copyOf(split, split.length - 1));
    }

    private void removeOldAssignments(Set<StreamPartition<String>> set) {
        Iterator<Map.Entry<String, ConsumerBuilder>> it = this.streamBuilders.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ConsumerBuilder> next = it.next();
            if (!set.contains(new StreamPartition(getStreamFromSubstream(next.getKey()), next.getKey()))) {
                it.remove();
            }
        }
        Iterator<Map.Entry<String, OffsetSpecification>> it2 = this.offsetMap.entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry<String, OffsetSpecification> next2 = it2.next();
            if (!set.contains(new StreamPartition(getStreamFromSubstream(next2.getKey()), next2.getKey()))) {
                it2.remove();
            }
        }
    }

    public void assign(Set<StreamPartition<String>> set) {
        stopBackgroundFetch();
        for (StreamPartition<String> streamPartition : set) {
            this.streamBuilders.put((String) streamPartition.getPartitionId(), getRabbitEnvironment().consumerBuilder().noTrackingStrategy().stream((String) streamPartition.getPartitionId()).messageHandler(this));
            this.superStream = streamPartition.getStream();
        }
        removeOldAssignments(set);
    }

    private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> set) {
        stopBackgroundFetch();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.recordBufferSize);
        Stream filter = this.queue.stream().filter(orderedPartitionableRecord -> {
            return !this.streamBuilders.containsKey(orderedPartitionableRecord.getPartitionId());
        });
        Objects.requireNonNull(linkedBlockingQueue);
        filter.forEachOrdered((v1) -> {
            r1.offer(v1);
        });
        this.queue = linkedBlockingQueue;
    }

    public void seek(StreamPartition<String> streamPartition, Long l) {
        filterBufferAndResetBackgroundFetch(ImmutableSet.of(streamPartition));
        this.offsetMap.put((String) streamPartition.getPartitionId(), OffsetSpecification.offset(l.longValue()));
    }

    public void seekToEarliest(Set<StreamPartition<String>> set) {
        filterBufferAndResetBackgroundFetch(set);
        Iterator<StreamPartition<String>> it = set.iterator();
        while (it.hasNext()) {
            this.offsetMap.put((String) it.next().getPartitionId(), OffsetSpecification.first());
        }
    }

    public void seekToLatest(Set<StreamPartition<String>> set) {
        filterBufferAndResetBackgroundFetch(set);
        Iterator<StreamPartition<String>> it = set.iterator();
        while (it.hasNext()) {
            this.offsetMap.put((String) it.next().getPartitionId(), OffsetSpecification.last());
        }
    }

    /* renamed from: getAssignment, reason: merged with bridge method [inline-methods] */
    public Set<StreamPartition<String>> m10getAssignment() {
        return (Set) this.streamBuilders.keySet().stream().map(str -> {
            return StreamPartition.of(this.superStream, str);
        }).collect(Collectors.toSet());
    }

    public void handle(MessageHandler.Context context, Message message) {
        try {
            if (this.queue.offer(new OrderedPartitionableRecord<>(this.superStream, context.stream(), Long.valueOf(context.offset()), ImmutableList.of(new ByteEntity(message.getBodyAsBinary()))), this.recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
                this.offsetMap.put(context.stream(), OffsetSpecification.offset(context.offset() + 1));
            } else {
                log.warn("Message not accepted, message buffer full", new Object[0]);
                stopBackgroundFetch();
            }
        } catch (InterruptedException e) {
            log.warn(e, "Interrupted while waiting to add record to buffer", new Object[0]);
            stopBackgroundFetch();
        }
    }

    public void optionalStartBackgroundFetch() {
        if (this.queue.size() < Math.min(this.maxRecordsPerPoll * 2, this.recordBufferSize / 2)) {
            startBackgroundFetch();
        }
    }

    @Nonnull
    public List<OrderedPartitionableRecord<String, Long, ByteEntity>> poll(long j) {
        optionalStartBackgroundFetch();
        try {
            int min = Math.min(Math.max(this.queue.size(), 1), this.maxRecordsPerPoll);
            ArrayList arrayList = new ArrayList(min);
            Queues.drain(this.queue, arrayList, min, j, TimeUnit.MILLISECONDS);
            return (List) arrayList.stream().filter(orderedPartitionableRecord -> {
                return this.streamBuilders.containsKey(orderedPartitionableRecord.getPartitionId());
            }).collect(Collectors.toList());
        } catch (InterruptedException e) {
            log.warn(e, "Interrupted while polling", new Object[0]);
            return Collections.emptyList();
        }
    }

    public Long getEarliestSequenceNumber(StreamPartition<String> streamPartition) {
        return Long.valueOf(getRabbitEnvironment().queryStreamStats((String) streamPartition.getPartitionId()).firstOffset());
    }

    public Long getLatestSequenceNumber(StreamPartition<String> streamPartition) {
        return Long.valueOf(getRabbitEnvironment().queryStreamStats((String) streamPartition.getPartitionId()).committedChunkId());
    }

    public boolean isOffsetAvailable(StreamPartition<String> streamPartition, OrderedSequenceNumber<Long> orderedSequenceNumber) {
        Long earliestSequenceNumber = getEarliestSequenceNumber(streamPartition);
        return earliestSequenceNumber != null && orderedSequenceNumber.isAvailableWithEarliest(RabbitSequenceNumber.of(earliestSequenceNumber));
    }

    public Long getPosition(StreamPartition<String> streamPartition) {
        throw new UnsupportedOperationException("getPosition() is not supported in RabbitMQ streams");
    }

    public Client.ClientParameters getParameters() {
        return new Client.ClientParameters();
    }

    public Client getClient(Client.ClientParameters clientParameters) {
        return new Client(clientParameters);
    }

    public Set<String> getPartitionIds(String str) {
        Client.ClientParameters parameters = getParameters();
        try {
            URI uri = new URI(this.uri);
            parameters.host(uri.getHost());
            if (uri.getPort() != -1) {
                parameters.port(uri.getPort());
            }
            if (this.password != null) {
                parameters.password(this.password);
            }
            if (this.username != null) {
                parameters.username(this.username);
            }
            Client client = getClient(parameters);
            List partitions = client.partitions(str);
            client.close();
            return new HashSet(partitions);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("error on uri" + this.uri);
        } catch (Exception e2) {
            throw e2;
        }
    }

    public void close() {
        if (this.closed) {
            return;
        }
        stopBackgroundFetch();
        this.closed = true;
    }

    /* renamed from: getPosition, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m7getPosition(StreamPartition streamPartition) {
        return getPosition((StreamPartition<String>) streamPartition);
    }

    /* renamed from: getEarliestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m8getEarliestSequenceNumber(StreamPartition streamPartition) {
        return getEarliestSequenceNumber((StreamPartition<String>) streamPartition);
    }

    /* renamed from: getLatestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m9getLatestSequenceNumber(StreamPartition streamPartition) {
        return getLatestSequenceNumber((StreamPartition<String>) streamPartition);
    }

    public /* bridge */ /* synthetic */ void seek(StreamPartition streamPartition, Object obj) throws InterruptedException {
        seek((StreamPartition<String>) streamPartition, (Long) obj);
    }
}
