package org.apache.flink.statefun.flink.core.functions;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/FlinkStateDelayedMessagesBuffer.class */
/**
 * A {@link DelayedMessagesBuffer} that keeps its contents in two injected state handles:
 *
 * <ul>
 *   <li>a list state, namespaced by timestamp, holding the messages buffered for that timestamp;
 *   <li>a map state from cancellation token to the timestamp its message was buffered under.
 * </ul>
 *
 * <p>Checked exceptions raised by the state handles are rethrown as unchecked exceptions by the
 * public methods.
 */
final class FlinkStateDelayedMessagesBuffer implements DelayedMessagesBuffer {
    static final String BUFFER_STATE_NAME = "delayed-messages-buffer";
    static final String INDEX_STATE_NAME = "delayed-message-index";

    /** Buffered messages; the current namespace of this state is the delivery timestamp. */
    private final InternalListState<String, Long, Message> bufferState;

    /** Index from cancellation token to the timestamp namespace that holds the message. */
    private final MapState<String, Long> cancellationTokenToTimestamp;

    /**
     * Creates a buffer on top of the given state handles.
     *
     * @param bufferState list state holding the buffered messages; must not be null
     * @param cancellationTokenToTimestamp map state used as the cancellation token index; must
     *     not be null
     * @throws NullPointerException if either argument is null
     */
    @Inject
    FlinkStateDelayedMessagesBuffer(
            @Label("delayed-messages-buffer-state") InternalListState<String, Long, Message> bufferState,
            @Label("delayed-message-index") MapState<String, Long> cancellationTokenToTimestamp) {
        this.bufferState = Objects.requireNonNull(bufferState);
        this.cancellationTokenToTimestamp = Objects.requireNonNull(cancellationTokenToTimestamp);
    }

    /**
     * Passes every message buffered under {@code timestamp} to {@code consumer}, dropping the
     * cancellation token index entry of each message first, and then clears the buffer for that
     * timestamp.
     *
     * @param timestamp the timestamp namespace to drain
     * @param consumer receives each buffered message
     * @throws IllegalStateException wrapping any exception raised while draining
     */
    @Override
    public void forEachMessageAt(long timestamp, Consumer<Message> consumer) {
        try {
            forEachMessageThrows(timestamp, consumer);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Removes the message registered under the given cancellation token, if there is one.
     *
     * @param token the cancellation token to look up
     * @return the timestamp the message was buffered under when no other message remains
     *     buffered at that timestamp; empty when the token is unknown or when other messages
     *     are still buffered at that timestamp
     * @throws IllegalStateException wrapping any exception raised by the state handles
     */
    @Override
    public OptionalLong removeMessageByCancellationToken(String token) {
        try {
            return remove(token);
        } catch (Exception e) {
            throw new IllegalStateException("Failed clearing a message with a cancellation token " + token, e);
        }
    }

    /**
     * Buffers {@code message} under {@code timestamp}. If the message carries a cancellation
     * token, the token is recorded in the index so the message can be removed later.
     *
     * @param message the message to buffer
     * @param timestamp the timestamp namespace to buffer the message under
     * @throws RuntimeException wrapping any failure, including the case where the cancellation
     *     token is already associated with a buffered message
     */
    @Override
    public void add(Message message, long timestamp) {
        try {
            addThrows(message, timestamp);
        } catch (Exception e) {
            throw new RuntimeException("Error adding delayed message to state buffer: " + message, e);
        }
    }

    private void forEachMessageThrows(long timestamp, Consumer<Message> consumer) throws Exception {
        bufferState.setCurrentNamespace(timestamp);
        // The contract of Flink's AppendingState#get allows null for empty state, so guard the
        // iteration instead of risking a NullPointerException.
        Iterable<Message> buffered = bufferState.get();
        if (buffered != null) {
            for (Message message : buffered) {
                removeMessageIdMapping(message);
                consumer.accept(message);
            }
        }
        bufferState.clear();
    }

    private void addThrows(Message message, long timestamp) throws Exception {
        Optional<String> maybeToken = message.cancellationToken();
        if (maybeToken.isPresent()) {
            String token = maybeToken.get();
            // Validate before touching any state, so that a rejected message leaves neither
            // the buffer nor the index modified.
            Long existingTimestamp = cancellationTokenToTimestamp.get(token);
            if (existingTimestamp != null) {
                throw new IllegalStateException("Trying to associate a message with cancellation token " + token + " and timestamp " + timestamp + ", but a message with the same cancellation token exists and with a timestamp " + existingTimestamp);
            }
        }
        bufferState.setCurrentNamespace(timestamp);
        bufferState.add(message);
        if (maybeToken.isPresent()) {
            cancellationTokenToTimestamp.put(maybeToken.get(), timestamp);
        }
    }

    private OptionalLong remove(String token) throws Exception {
        Long timestamp = cancellationTokenToTimestamp.get(token);
        if (timestamp == null) {
            // No index entry for this token: nothing to remove.
            return OptionalLong.empty();
        }
        cancellationTokenToTimestamp.remove(token);
        bufferState.setCurrentNamespace(timestamp);
        List<Message> remaining = removeMessageByToken(bufferState.get(), token);
        if (!remaining.isEmpty()) {
            // Other messages are still buffered at this timestamp; keep them and report
            // nothing to the caller.
            bufferState.update(remaining);
            return OptionalLong.empty();
        }
        // Nothing is left at this timestamp: clear the buffer and report the timestamp.
        bufferState.clear();
        return OptionalLong.of(timestamp);
    }

    /** Drops the index entry for the message's cancellation token, if it has one. */
    private void removeMessageIdMapping(Message message) throws Exception {
        Optional<String> maybeToken = message.cancellationToken();
        if (maybeToken.isPresent()) {
            cancellationTokenToTimestamp.remove(maybeToken.get());
        }
    }

    /**
     * Returns the messages of {@code messages}, in iteration order, whose cancellation token is
     * absent or different from {@code token}. A null iterable is treated as empty.
     */
    private static List<Message> removeMessageByToken(Iterable<Message> messages, String token) {
        List<Message> kept = new ArrayList<>();
        if (messages == null) {
            return kept;
        }
        for (Message message : messages) {
            Optional<String> maybeToken = message.cancellationToken();
            boolean matches = maybeToken.isPresent() && Objects.equals(maybeToken.get(), token);
            if (!matches) {
                kept.add(message);
            }
        }
        return kept;
    }
}
