package org.apache.flink.statefun.flink.core.functions;

import java.util.Objects;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/FlinkStateDelayedMessagesBuffer.class */
final class FlinkStateDelayedMessagesBuffer implements DelayedMessagesBuffer {
    static final String BUFFER_STATE_NAME = "delayed-messages-buffer";
    private final InternalListState<String, Long, Message> bufferState;

    @Inject
    FlinkStateDelayedMessagesBuffer(@Label("delayed-messages-buffer-state") InternalListState<String, Long, Message> internalListState) {
        this.bufferState = (InternalListState) Objects.requireNonNull(internalListState);
    }

    @Override // org.apache.flink.statefun.flink.core.functions.DelayedMessagesBuffer
    public void add(Message message, long j) {
        this.bufferState.setCurrentNamespace(Long.valueOf(j));
        try {
            this.bufferState.add(message);
        } catch (Exception e) {
            throw new RuntimeException("Error adding delayed message to state buffer: " + message, e);
        }
    }

    @Override // org.apache.flink.statefun.flink.core.functions.DelayedMessagesBuffer
    public Iterable<Message> getForTimestamp(long j) {
        this.bufferState.setCurrentNamespace(Long.valueOf(j));
        try {
            return (Iterable) this.bufferState.get();
        } catch (Exception e) {
            throw new RuntimeException("Error accessing delayed message in state buffer for timestamp: " + j, e);
        }
    }

    @Override // org.apache.flink.statefun.flink.core.functions.DelayedMessagesBuffer
    public void clearForTimestamp(long j) {
        this.bufferState.setCurrentNamespace(Long.valueOf(j));
        this.bufferState.clear();
    }
}
