package net.openhft.chronicle.engine.api.query;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.pubsub.ConsumingSubscriber;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/api/query/VanillaIndexQueueView.class */
public class VanillaIndexQueueView<V extends Marshallable> implements IndexQueueView<ConsumingSubscriber<IndexedValue<V>>, V> {
    private static final Logger LOG = LoggerFactory.getLogger(VanillaIndexQueueView.class);
    private final Function<V, ?> valueToKey;
    private final ChronicleQueue chronicleQueue;
    private final ThreadLocal<Function<Class, Marshallable>> objectCacheThreadLocal;
    private final TypeToString typeToString;
    private final Map<String, Map<Object, IndexedValue<V>>> multiMap = new ConcurrentHashMap();
    private final Map<Subscriber<IndexedValue<V>>, AtomicBoolean> activeSubscriptions = new ConcurrentHashMap();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final Object lock = new Object();
    private final ThreadLocal<IndexedValue<V>> indexedValue = ThreadLocal.withInitial(IndexedValue::new);
    private volatile long lastIndexRead = 0;
    private long lastSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    private long messagesReadPerSecond = 0;
    private ThreadLocal<List<Marshallable>> indexedValueList = ThreadLocal.withInitial(ArrayList::new);

    public VanillaIndexQueueView(@NotNull RequestContext requestContext, @NotNull Asset asset, @NotNull QueueView<?, V> queueView) {
        this.valueToKey = (Function) asset.acquireView(ValueToKey.class);
        EventLoop eventLoop = (EventLoop) asset.acquireView(EventLoop.class);
        this.chronicleQueue = ((ChronicleQueueView) queueView).chronicleQueue();
        ExcerptTailer createTailer = this.chronicleQueue.createTailer();
        this.typeToString = (TypeToString) asset.root().acquireView(TypeToString.class);
        this.objectCacheThreadLocal = ThreadLocal.withInitial(() -> {
            return ((ObjectCacheFactory) asset.root().acquireView(ObjectCacheFactory.class)).get();
        });
        eventLoop.addHandler(() -> {
            long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            if (seconds >= this.lastSecond + 10) {
                this.lastSecond = seconds;
                LOG.info("messages read per second=" + (this.messagesReadPerSecond / 10));
                this.messagesReadPerSecond = 0L;
            }
            if (this.isClosed.get()) {
                throw new InvalidEventHandlerException();
            }
            DocumentContext readingDocument = createTailer.readingDocument();
            Throwable th = null;
            try {
                if (!readingDocument.isPresent()) {
                    return false;
                }
                long readPosition = readingDocument.wire().bytes().readPosition();
                while (readingDocument.wire().bytes().readRemaining() > 0) {
                    try {
                        StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                        ValueIn read = readingDocument.wire().read(acquireStringBuilder);
                        Marshallable apply = VanillaObjectCacheFactory.INSTANCE.get().apply(this.typeToString.toType(acquireStringBuilder));
                        read.marshallable(apply);
                        Object apply2 = this.valueToKey.apply(apply);
                        this.messagesReadPerSecond++;
                        String sb = acquireStringBuilder.toString();
                        synchronized (this.lock) {
                            this.multiMap.computeIfAbsent(sb, str -> {
                                return new ConcurrentHashMap();
                            }).put(apply2, new IndexedValue<>(apply, readingDocument.index()));
                            this.lastIndexRead = readingDocument.index();
                        }
                    } catch (RuntimeException e) {
                        Jvm.warn().on(getClass(), Wires.fromSizePrefixedBlobs(readingDocument.wire().bytes(), readPosition - 4), e);
                    }
                }
                if (readingDocument == null) {
                    return true;
                }
                if (0 == 0) {
                    readingDocument.close();
                    return true;
                }
                try {
                    readingDocument.close();
                    return true;
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                    return true;
                }
            } finally {
                if (readingDocument != null) {
                    if (0 != 0) {
                        try {
                            readingDocument.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        readingDocument.close();
                    }
                }
            }
        });
    }

    @Override // net.openhft.chronicle.engine.api.query.IndexQueueView
    public void registerSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber, @NotNull IndexQuery<V> indexQuery) {
        this.activeSubscriptions.put(consumingSubscriber, new AtomicBoolean());
        long fromIndex = indexQuery.fromIndex() == 0 ? this.lastIndexRead : indexQuery.fromIndex();
        String eventName = indexQuery.eventName();
        Predicate<V> filter = indexQuery.filter();
        Iterator<IndexedValue<V>> it = this.multiMap.computeIfAbsent(eventName, str -> {
            return new ConcurrentHashMap();
        }).values().stream().filter(indexedValue -> {
            return indexedValue.index() < fromIndex && filter.test(indexedValue.v());
        }).iterator();
        ExcerptTailer createTailer = this.chronicleQueue.createTailer();
        if (fromIndex != 0) {
            try {
                if (!createTailer.moveToIndex(fromIndex)) {
                    throw new IllegalStateException("Failed to move to index " + Long.toHexString(fromIndex));
                }
            } catch (RuntimeException e) {
                consumingSubscriber.onEndOfSubscription();
                Jvm.warn().on(getClass(), "Error registering subscription", e);
                return;
            }
        }
        consumingSubscriber.addSupplier(excerptConsumer(indexQuery, createTailer, it, fromIndex));
    }

    @NotNull
    private Supplier<List<Marshallable>> excerptConsumer(@NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, @NotNull Iterator<IndexedValue<V>> it, long j) {
        return () -> {
            return value(indexQuery, excerptTailer, it, j);
        };
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Nullable
    private List<Marshallable> value(@NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, @NotNull Iterator<IndexedValue<V>> it, long j) {
        List<Marshallable> list = this.indexedValueList.get();
        list.clear();
        if (it.hasNext()) {
            IndexedValue<V> next = it.next();
            next.timePublished(System.currentTimeMillis());
            next.maxIndex(this.lastIndexRead);
            list.add(next);
            return list;
        }
        String eventName = indexQuery.eventName();
        Predicate<V> filter = indexQuery.filter();
        if (this.isClosed.get()) {
            throw Jvm.rethrow(new InvalidEventHandlerException("shutdown"));
        }
        DocumentContext readingDocument = excerptTailer.readingDocument();
        Throwable th = null;
        try {
            try {
                if (!readingDocument.isPresent()) {
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                System.out.println(Wires.fromSizePrefixedBlobs(readingDocument));
                if (LOG.isDebugEnabled()) {
                    Jvm.debug().on(getClass(), "processing the following message=" + Wires.fromSizePrefixedBlobs(readingDocument));
                }
                if (j > readingDocument.index()) {
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                while (readingDocument.wire().bytes().readRemaining() > 0) {
                    ValueIn read = readingDocument.wire().read(acquireStringBuilder);
                    if (eventName.contentEquals(acquireStringBuilder)) {
                        this.objectCacheThreadLocal.get();
                        Marshallable apply = VanillaObjectCacheFactory.INSTANCE.get().apply(this.typeToString.toType(acquireStringBuilder));
                        read.marshallable(apply);
                        if (filter.test(apply)) {
                            IndexedValue<V> indexedValue = this.indexedValue.get();
                            indexedValue.index(readingDocument.index());
                            indexedValue.v(apply);
                            indexedValue.timePublished(System.currentTimeMillis());
                            indexedValue.maxIndex(this.lastIndexRead);
                            list.add(indexedValue);
                        }
                    } else {
                        read.marshallable(VanillaObjectCacheFactory.INSTANCE.get().apply(this.typeToString.toType(acquireStringBuilder)));
                    }
                }
                if (readingDocument != null) {
                    if (0 != 0) {
                        try {
                            readingDocument.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        readingDocument.close();
                    }
                }
                return list;
            } finally {
            }
        } catch (Throwable th5) {
            if (readingDocument != null) {
                if (th != null) {
                    try {
                        readingDocument.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    readingDocument.close();
                }
            }
            throw th5;
        }
    }

    @Override // net.openhft.chronicle.engine.api.query.IndexQueueView
    public void unregisterSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber) {
        AtomicBoolean remove = this.activeSubscriptions.remove(consumingSubscriber);
        if (remove != null) {
            remove.set(true);
        }
    }

    public void close() {
        this.isClosed.set(true);
        this.activeSubscriptions.values().forEach(atomicBoolean -> {
            atomicBoolean.set(true);
        });
        this.chronicleQueue.close();
    }
}
