/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.Segments;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SegmentedBytesStore} whose entries are spread over a set of {@link Segment}s.
 *
 * <p>The segment that holds a given key is chosen from the timestamp that the configured
 * {@link SegmentedBytesStore.KeySchema} extracts from the key. Writes whose segment is no longer
 * live (as decided by {@link Segments#getOrCreateSegmentIfLive}) are dropped and counted on the
 * {@code expired-window-record-drop} sensor.
 */
class RocksDBSegmentedBytesStore
implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
    private final String name;
    private final Segments segments;
    private final String metricScope;
    private final SegmentedBytesStore.KeySchema keySchema;
    private InternalProcessorContext context;
    private volatile boolean open;
    // Segments already known to this store for bulk-loading purposes; see getWriteBatches.
    private Set<Segment> bulkLoadSegments;
    private Sensor expiredRecordSensor;

    /**
     * @param name            store name, also passed to the underlying {@link Segments}
     * @param metricScope     scope string used when building metric group and tag names in {@link #init}
     * @param retention       retention value forwarded to {@link Segments}
     * @param segmentInterval segment interval forwarded to {@link Segments}
     * @param keySchema       schema used to derive timestamps, ranges and iteration conditions from keys
     */
    RocksDBSegmentedBytesStore(String name, String metricScope, long retention, long segmentInterval, SegmentedBytesStore.KeySchema keySchema) {
        this.name = name;
        this.metricScope = metricScope;
        this.keySchema = keySchema;
        this.segments = new Segments(name, retention, segmentInterval);
    }

    /**
     * Iterates over the entries for a single key between the {@code from} and {@code to} timestamps.
     * The segments searched and the binary range bounds are all supplied by the key schema.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to) {
        List<Segment> searchSpace = this.keySchema.segmentsToSearch(this.segments, from, to);
        Bytes binaryFrom = this.keySchema.lowerRangeFixedSize(key, from);
        Bytes binaryTo = this.keySchema.upperRangeFixedSize(key, to);
        return new SegmentIterator(searchSpace.iterator(), this.keySchema.hasNextCondition(key, key, from, to), binaryFrom, binaryTo);
    }

    /**
     * Iterates over the entries for the key range {@code keyFrom}..{@code keyTo} between the
     * {@code from} and {@code to} timestamps.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        List<Segment> searchSpace = this.keySchema.segmentsToSearch(this.segments, from, to);
        Bytes binaryFrom = this.keySchema.lowerRange(keyFrom, from);
        Bytes binaryTo = this.keySchema.upperRange(keyTo, to);
        return new SegmentIterator(searchSpace.iterator(), this.keySchema.hasNextCondition(keyFrom, keyTo, from, to), binaryFrom, binaryTo);
    }

    /**
     * Iterates over every segment with no key bounds and the time bounds {@code 0..Long.MAX_VALUE}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        List<Segment> searchSpace = this.segments.allSegments();
        return new SegmentIterator(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, 0L, Long.MAX_VALUE), null, null);
    }

    /**
     * Iterates over the segments selected for {@code timeFrom}..{@code timeTo}, with no key bounds.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
        List<Segment> searchSpace = this.segments.segments(timeFrom, timeTo);
        return new SegmentIterator(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, timeFrom, timeTo), null, null);
    }

    /**
     * Deletes {@code key} from the segment matching its timestamp; does nothing when no such
     * segment exists.
     */
    @Override
    public void remove(Bytes key) {
        Segment segment = this.segments.getSegmentForTimestamp(this.keySchema.segmentTimestamp(key));
        if (segment == null) {
            return;
        }
        segment.delete(key);
    }

    /**
     * Writes {@code key}/{@code value} into the segment matching the key's timestamp. When the
     * segment is not live, the record is skipped and the expired-record sensor is recorded.
     */
    @Override
    public void put(Bytes key, byte[] value) {
        long timestamp = this.keySchema.segmentTimestamp(key);
        long segmentId = this.segments.segmentId(timestamp);
        Segment segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context);
        if (segment == null) {
            this.expiredRecordSensor.record();
            LOG.debug("Skipping record for expired segment.");
        } else {
            segment.put(key, value);
        }
    }

    /**
     * @return the value stored for {@code key} in the segment matching its timestamp, or
     *         {@code null} when no such segment exists
     */
    @Override
    public byte[] get(Bytes key) {
        Segment segment = this.segments.getSegmentForTimestamp(this.keySchema.segmentTimestamp(key));
        if (segment == null) {
            return null;
        }
        return segment.get(key);
    }

    @Override
    public String name() {
        return this.name;
    }

    /**
     * Initializes the store: sets up the expired-record sensor, initializes the key schema with
     * the store's changelog topic, opens existing segments, registers the batching restore
     * callback for {@code root}, and finally marks the store open.
     *
     * @param context processor context; must be an {@link InternalProcessorContext} (it is cast)
     * @param root    the store registered with the context for restoration
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.context = (InternalProcessorContext)context;
        StreamsMetricsImpl metrics = this.context.metrics();
        String taskName = context.taskId().toString();
        this.expiredRecordSensor = metrics.storeLevelSensor(taskName, this.name(), "expired-window-record-drop", Sensor.RecordingLevel.INFO, new Sensor[0]);
        StreamsMetricsImpl.addInvocationRateAndCount(this.expiredRecordSensor, "stream-" + this.metricScope + "-metrics", metrics.tagMap("task-id", taskName, this.metricScope + "-id", this.name()), "expired-window-record-drop");
        this.keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
        this.segments.openExisting(this.context);
        // Snapshot of the segments present at init time; getWriteBatches compares against it.
        this.bulkLoadSegments = new HashSet<>(this.segments.allSegments());
        context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
        this.open = true;
    }

    @Override
    public void flush() {
        this.segments.flush();
    }

    /**
     * Marks the store closed and closes all segments.
     */
    @Override
    public void close() {
        this.open = false;
        this.segments.close();
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * @return all segments currently held by this store
     */
    List<Segment> getSegments() {
        return this.segments.allSegments();
    }

    /**
     * Groups {@code records} into one {@link WriteBatch} per segment, writes each batch to its
     * segment, and then releases every batch.
     *
     * <p>The batches are closed in a {@code finally} block so that they are released whether or
     * not a write fails; previously they were never closed.
     *
     * @param records changelog records to restore; a {@code null} value denotes a delete
     * @throws ProcessorStateException if building or writing a batch fails
     */
    void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records) {
        // getWriteBatches only throws unchecked exceptions and cleans up after itself on failure,
        // so it can safely live outside the try block.
        Map<Segment, WriteBatch> writeBatchMap = this.getWriteBatches(records);
        try {
            for (Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
                entry.getKey().write(entry.getValue());
            }
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
        finally {
            closeBatches(writeBatchMap.values());
        }
    }

    /**
     * Builds one {@link WriteBatch} per live segment from {@code records}. Records whose segment is
     * not live are skipped. A segment not yet contained in {@code bulkLoadSegments} is switched
     * into bulk-loading mode, after which the set is refreshed from the current segments.
     *
     * <p>Ownership of the returned batches passes to the caller, who is responsible for closing
     * them. If this method fails, the batches created so far are closed before the exception
     * propagates.
     *
     * @param records changelog records; a {@code null} value is translated into a delete
     * @return map from segment to the batch of operations destined for it
     * @throws ProcessorStateException if adding an operation to a batch fails
     */
    Map<Segment, WriteBatch> getWriteBatches(Collection<KeyValue<byte[], byte[]>> records) {
        Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
        for (KeyValue<byte[], byte[]> record : records) {
            long segmentId = this.segments.segmentId(this.keySchema.segmentTimestamp(Bytes.wrap(record.key)));
            Segment segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context);
            if (segment == null) {
                continue;
            }
            if (!this.bulkLoadSegments.contains(segment)) {
                segment.toggleDbForBulkLoading(true);
                this.bulkLoadSegments = new HashSet<>(this.segments.allSegments());
            }
            try {
                WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
                if (record.value == null) {
                    batch.delete(record.key);
                } else {
                    batch.put(record.key, record.value);
                }
            }
            catch (RocksDBException e) {
                // The caller never sees the partially built map, so release it here.
                closeBatches(writeBatchMap.values());
                throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
            }
        }
        return writeBatchMap;
    }

    /**
     * Closes every batch in {@code batches}.
     */
    private static void closeBatches(Collection<WriteBatch> batches) {
        for (WriteBatch batch : batches) {
            batch.close();
        }
    }

    /**
     * Applies {@code prepareForBulkload} to every segment via {@code toggleDbForBulkLoading}.
     */
    private void toggleForBulkLoading(boolean prepareForBulkload) {
        for (Segment segment : this.segments.allSegments()) {
            segment.toggleDbForBulkLoading(prepareForBulkload);
        }
    }

    /**
     * Restore callback registered in {@link #init}: enables bulk loading on all segments when a
     * restore starts, forwards record batches to {@link #restoreAllInternal}, and disables bulk
     * loading when the restore ends.
     */
    private class RocksDBSegmentsBatchingRestoreCallback
    extends AbstractNotifyingBatchingRestoreCallback {
        private RocksDBSegmentsBatchingRestoreCallback() {
        }

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            RocksDBSegmentedBytesStore.this.restoreAllInternal(records);
        }

        @Override
        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            RocksDBSegmentedBytesStore.this.toggleForBulkLoading(true);
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            RocksDBSegmentedBytesStore.this.toggleForBulkLoading(false);
        }
    }
}

