package io.apicurio.registry.streams.utils;

import io.apicurio.registry.storage.proto.Str;
import io.apicurio.registry.streams.diservice.AsyncBiFunctionService;
import io.apicurio.registry.utils.kafka.ProtoSerde;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

/* loaded from: input_file:io/apicurio/registry/streams/utils/WaitForDataService.class */
public class WaitForDataService implements AsyncBiFunctionService.WithSerdes<String, Long, Str.Data> {
    public static final String NAME = "WaitForDataService";
    private final Map<String, NavigableMap<Long, ResultCF>> waitingResults = new ConcurrentHashMap();
    private final ReadOnlyKeyValueStore<String, Str.Data> storageKeyValueStore;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/apicurio/registry/streams/utils/WaitForDataService$ResultCF.class */
    public static class ResultCF extends CompletableFuture<Str.Data> {
        final long offset;

        public ResultCF(long j) {
            this.offset = j;
        }
    }

    public WaitForDataService(ReadOnlyKeyValueStore<String, Str.Data> readOnlyKeyValueStore, ForeachActionDispatcher<String, Str.Data> foreachActionDispatcher) {
        this.storageKeyValueStore = (ReadOnlyKeyValueStore) Objects.requireNonNull(readOnlyKeyValueStore);
        foreachActionDispatcher.register(this::dataUpdated);
    }

    private void dataUpdated(String str, Str.Data data) {
        if (data != null && this.waitingResults.containsKey(str)) {
            this.waitingResults.compute(str, (str2, navigableMap) -> {
                if (navigableMap == null) {
                    return null;
                }
                Iterator it = navigableMap.headMap(Long.valueOf(data.getLastProcessedOffset()), true).entrySet().iterator();
                while (it.hasNext()) {
                    ((ResultCF) ((Map.Entry) it.next()).getValue()).complete(data);
                    it.remove();
                }
                if (navigableMap.isEmpty()) {
                    return null;
                }
                return navigableMap;
            });
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }

    @Override // io.apicurio.registry.streams.diservice.AsyncBiFunctionService.WithSerdes
    public Serde<String> keySerde() {
        return Serdes.String();
    }

    @Override // io.apicurio.registry.streams.diservice.AsyncBiFunctionService.WithSerdes
    public Serde<Long> reqSerde() {
        return Serdes.Long();
    }

    @Override // io.apicurio.registry.streams.diservice.AsyncBiFunctionService.WithSerdes
    public Serde<Str.Data> resSerde() {
        return ProtoSerde.parsedWith(Str.Data.parser());
    }

    @Override // java.util.function.BiFunction
    public CompletionStage<Str.Data> apply(String str, Long l) {
        ResultCF resultCF = new ResultCF(l.longValue());
        register(str, resultCF);
        try {
            dataUpdated(str, (Str.Data) this.storageKeyValueStore.get(str));
        } catch (Throwable th) {
            deregister(str, resultCF);
            resultCF.completeExceptionally(th);
        }
        return resultCF;
    }

    private void register(String str, ResultCF resultCF) {
        this.waitingResults.compute(str, (str2, navigableMap) -> {
            if (navigableMap == null) {
                navigableMap = new TreeMap();
            }
            navigableMap.put(Long.valueOf(resultCF.offset), resultCF);
            return navigableMap;
        });
    }

    private void deregister(String str, ResultCF resultCF) {
        this.waitingResults.compute(str, (str2, navigableMap) -> {
            if (navigableMap == null) {
                return null;
            }
            navigableMap.remove(Long.valueOf(resultCF.offset));
            if (navigableMap.isEmpty()) {
                return null;
            }
            return navigableMap;
        });
    }
}
