package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveSessionParams;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.db.BatchFlusher;
import dev.responsive.kafka.internal.db.RemoteSessionTable;
import dev.responsive.kafka.internal.db.SessionFlushManager;
import dev.responsive.kafka.internal.db.SessionKeySpec;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.partitioning.SessionSegmentPartitioner;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.utils.Iterators;
import dev.responsive.kafka.internal.utils.Result;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionKey;
import dev.responsive.kafka.internal.utils.StoreUtil;
import dev.responsive.kafka.internal.utils.TableName;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/internal/stores/SessionOperationsImpl.class */
/**
 * {@link SessionOperations} implementation that layers a local {@link CommitBuffer} over a
 * {@link RemoteSessionTable}.
 *
 * <p>Writes and deletes go to the buffer only. Point reads consult the buffer first and fall
 * back to the remote table when the buffer has no entry for the key. Range reads merge the
 * buffered and remote results through a {@code LocalRemoteKvIterator}.
 */
public class SessionOperationsImpl implements SessionOperations {
    /**
     * Value of {@code RemoteSessionTable#fetchOffset} that {@link #create} maps to an empty
     * restore start offset.
     */
    private static final long NO_COMMITTED_OFFSET = -1L;

    private final InternalProcessorContext context;
    private final ResponsiveSessionParams params;
    private final RemoteSessionTable<?> table;
    private final CommitBuffer<SessionKey, ?> buffer;
    private final TopicPartition changelog;
    private final ResponsiveStoreRegistry storeRegistry;
    private final ResponsiveStoreRegistration registration;
    private final ResponsiveRestoreListener restoreListener;
    private final long initialStreamTime;

    /**
     * Builds a fully wired instance: resolves the changelog partition, creates and initialises
     * the remote table, creates the commit buffer, and registers the store with the
     * {@link ResponsiveStoreRegistry} loaded from {@code map}.
     *
     * @param tableName               name of the store (Kafka name and remote table name)
     * @param stateStoreContext       context of the state store being initialised
     * @param responsiveSessionParams session store parameters (retention, segments, schema)
     * @param map                     application configs from which the session clients and the
     *                                store registry are loaded
     * @param responsiveConfig        configuration handed to the commit buffer
     * @param predicate               predicate used to build the {@link SessionKeySpec}
     * @return the new operations object, already registered with the store registry
     * @throws InterruptedException if remote table creation is interrupted
     * @throws TimeoutException     if remote table creation times out
     */
    public static SessionOperationsImpl create(
            TableName tableName,
            StateStoreContext stateStoreContext,
            ResponsiveSessionParams responsiveSessionParams,
            Map<String, Object> map,
            ResponsiveConfig responsiveConfig,
            Predicate<SessionKey> predicate
    ) throws InterruptedException, TimeoutException {
        Logger log = new LogContext(String.format("session-store [%s] ", tableName.kafkaName()))
                .logger(SessionOperationsImpl.class);
        InternalProcessorContext internalContext =
                ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        SessionClients sessionClients = InternalSessionConfigs.loadSessionClients(map);
        ResponsiveStoreRegistry registry = InternalSessionConfigs.loadStoreRegistry(map);

        TopicPartition changelogPartition = new TopicPartition(
                ProcessorContextUtils.changelogFor(stateStoreContext, tableName.kafkaName(), false),
                internalContext.taskId().partition());

        SessionSegmentPartitioner partitioner = new SessionSegmentPartitioner(
                responsiveSessionParams.retentionPeriod(),
                StoreUtil.computeSegmentInterval(
                        responsiveSessionParams.retentionPeriod(),
                        responsiveSessionParams.numSegments()));
        RemoteSessionTable<?> remoteTable =
                createRemoteSessionTable(responsiveSessionParams, sessionClients, partitioner);
        SessionFlushManager flushManager = remoteTable.init(changelogPartition.partition());
        log.info("Remote table {} is available for querying.", tableName.tableName());

        SessionKeySpec keySpec = new SessionKeySpec(predicate);
        BatchFlusher<SessionKey, ?> batchFlusher =
                new BatchFlusher<>(keySpec, changelogPartition.partition(), flushManager);
        CommitBuffer<SessionKey, ?> commitBuffer = CommitBuffer.from(
                batchFlusher,
                sessionClients,
                changelogPartition,
                keySpec,
                responsiveSessionParams.name(),
                responsiveConfig);

        long restoreStartOffset = remoteTable.fetchOffset(changelogPartition.partition());
        OptionalLong startOffset = restoreStartOffset == NO_COMMITTED_OFFSET
                ? OptionalLong.empty()
                : OptionalLong.of(restoreStartOffset);

        // The registration's flush callback delegates to the commit buffer. A bound method
        // reference null-checks its receiver when evaluated, so no explicit check is needed.
        ResponsiveStoreRegistration storeRegistration = new ResponsiveStoreRegistration(
                tableName.kafkaName(),
                changelogPartition,
                startOffset,
                commitBuffer::flush,
                StoreUtil.streamThreadId());
        registry.registerStore(storeRegistration);

        return new SessionOperationsImpl(
                internalContext,
                responsiveSessionParams,
                remoteTable,
                commitBuffer,
                changelogPartition,
                registry,
                storeRegistration,
                sessionClients.restoreListener(),
                flushManager.streamTime());
    }

    /**
     * Cassandra-backed session tables are not available; this always throws.
     *
     * @throws UnsupportedOperationException always
     */
    private static RemoteSessionTable<?> createCassandra(
            ResponsiveSessionParams responsiveSessionParams,
            SessionClients sessionClients,
            SessionSegmentPartitioner sessionSegmentPartitioner
    ) throws InterruptedException, TimeoutException {
        throw new UnsupportedOperationException("Only responsive.storage.backend=MONGO_DB currently supports session windows.");
    }

    /**
     * Creates the MongoDB-backed session table for the configured schema type.
     *
     * @throws IllegalArgumentException if the schema type is anything other than {@code SESSION}
     */
    private static RemoteSessionTable<?> createMongo(
            ResponsiveSessionParams responsiveSessionParams,
            SessionClients sessionClients,
            SessionSegmentPartitioner sessionSegmentPartitioner
    ) throws InterruptedException, TimeoutException {
        ResponsiveMongoClient mongoClient = sessionClients.mongoClient();
        switch (responsiveSessionParams.schemaType()) {
            case SESSION:
                return mongoClient.sessionTable(
                        responsiveSessionParams.name().tableName(), sessionSegmentPartitioner);
            default:
                throw new IllegalArgumentException(responsiveSessionParams.schemaType().name());
        }
    }

    /**
     * Dispatches remote table creation on the storage backend reported by the session clients.
     *
     * @throws IllegalStateException if the backend is neither {@code CASSANDRA} nor
     *                               {@code MONGO_DB}
     */
    private static RemoteSessionTable<?> createRemoteSessionTable(
            ResponsiveSessionParams responsiveSessionParams,
            SessionClients sessionClients,
            SessionSegmentPartitioner sessionSegmentPartitioner
    ) throws InterruptedException, TimeoutException {
        switch (sessionClients.storageBackend()) {
            case CASSANDRA:
                return createCassandra(responsiveSessionParams, sessionClients, sessionSegmentPartitioner);
            case MONGO_DB:
                return createMongo(responsiveSessionParams, sessionClients, sessionSegmentPartitioner);
            default:
                throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend());
        }
    }

    /**
     * Stores the supplied collaborators as-is; performs no registration or I/O itself.
     *
     * @param internalProcessorContext    context used to obtain the timestamp for writes
     * @param responsiveSessionParams     session store parameters
     * @param remoteSessionTable          remote table consulted on buffer misses
     * @param commitBuffer                local buffer receiving writes and tombstones
     * @param topicPartition              changelog topic partition of this store
     * @param responsiveStoreRegistry     registry the store is deregistered from on close
     * @param responsiveStoreRegistration registration to remove on close
     * @param responsiveRestoreListener   listener notified when the store is closed
     * @param j                           value returned by {@link #initialStreamTime()}
     */
    public SessionOperationsImpl(
            InternalProcessorContext internalProcessorContext,
            ResponsiveSessionParams responsiveSessionParams,
            RemoteSessionTable<?> remoteSessionTable,
            CommitBuffer<SessionKey, ?> commitBuffer,
            TopicPartition topicPartition,
            ResponsiveStoreRegistry responsiveStoreRegistry,
            ResponsiveStoreRegistration responsiveStoreRegistration,
            ResponsiveRestoreListener responsiveRestoreListener,
            long j
    ) {
        this.context = internalProcessorContext;
        this.params = responsiveSessionParams;
        this.table = remoteSessionTable;
        this.buffer = commitBuffer;
        this.changelog = topicPartition;
        this.storeRegistry = responsiveStoreRegistry;
        this.registration = responsiveStoreRegistration;
        this.restoreListener = responsiveRestoreListener;
        this.initialStreamTime = j;
    }

    /** Returns the stream time captured when this instance was constructed. */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations
    public long initialStreamTime() {
        return this.initialStreamTime;
    }

    /** Buffers a write for {@code sessionKey}, stamped with the context's current timestamp. */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations
    public void put(SessionKey sessionKey, byte[] bArr) {
        this.buffer.put(sessionKey, bArr, this.context.timestamp());
    }

    /** Buffers a tombstone for {@code sessionKey}, stamped with the context's current timestamp. */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations
    public void delete(SessionKey sessionKey) {
        this.buffer.tombstone(sessionKey, this.context.timestamp());
    }

    /**
     * Looks up a single session.
     *
     * @return the buffered value if the buffer holds a live entry, {@code null} if the buffer
     *         holds a tombstone, otherwise whatever the remote table returns for the key
     */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations
    public byte[] fetch(SessionKey sessionKey) {
        Result<SessionKey> buffered = this.buffer.get(sessionKey);
        if (buffered != null) {
            return buffered.isTombstone ? null : buffered.value;
        }
        // Nothing buffered for this key: defer to the remote table.
        return this.table.fetch(
                this.changelog.partition(),
                sessionKey.key,
                sessionKey.sessionStartMs,
                sessionKey.sessionEndMs);
    }

    /**
     * Returns all sessions for {@code bytes} whose end timestamp lies in {@code [j, j2]}
     * (both bounds inclusive), combining buffered and remote results.
     *
     * @param bytes the record key to look up
     * @param j     earliest session end timestamp, inclusive
     * @param j2    latest session end timestamp, inclusive
     */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(Bytes bytes, long j, long j2) {
        KeyValueIterator<SessionKey, byte[]> combined = new LocalRemoteKvIterator<>(
                this.buffer.all(buffered ->
                        buffered.key.key.equals(bytes)
                                && buffered.key.sessionEndMs >= j
                                && buffered.key.sessionEndMs <= j2),
                this.table.fetchAll(this.changelog.partition(), bytes, j, j2));

        KeyValueIterator<Windowed<Bytes>, byte[]> windowedSessions = Iterators.mapKeys(
                combined,
                sessionKey -> new Windowed<>(
                        sessionKey.key,
                        new SessionWindow(sessionKey.sessionStartMs, sessionKey.sessionEndMs)));

        // The end-time bound is re-applied to the merged iterator, as in the original code.
        return Iterators.filterKv(
                windowedSessions,
                session -> session.window().end() >= j && session.window().end() <= j2);
    }

    /**
     * Closes the buffer, notifies the restore listener that the store was closed, and removes
     * this store's registration from the registry, in that order.
     */
    @Override // dev.responsive.kafka.internal.stores.SessionOperations, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.buffer.close();
        this.restoreListener.onStoreClosed(this.changelog, this.params.name().kafkaName());
        this.storeRegistry.deregisterStore(this.registration);
    }

    /** Forwards a batch of changelog records to the commit buffer's restore path. */
    public void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        this.buffer.restoreBatch(collection);
    }
}
