package org.apache.flink.statefun.flink.core.functions;

import java.util.Objects;
import java.util.OptionalLong;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/DelaySink.class */
final class DelaySink implements Triggerable<String, VoidNamespace> {
    private final InternalTimerService<VoidNamespace> delayedMessagesTimerService;
    private final DelayedMessagesBuffer delayedMessagesBuffer;
    private final DelayMessageHandler delayMessageHandler;

    @Inject
    DelaySink(@Label("delayed-messages-buffer") DelayedMessagesBuffer delayedMessagesBuffer, @Label("delayed-messages-timer-service-factory") TimerServiceFactory timerServiceFactory, DelayMessageHandler delayMessageHandler) {
        this.delayedMessagesBuffer = (DelayedMessagesBuffer) Objects.requireNonNull(delayedMessagesBuffer);
        this.delayedMessagesTimerService = timerServiceFactory.createTimerService(this);
        this.delayMessageHandler = (DelayMessageHandler) Objects.requireNonNull(delayMessageHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void accept(Message message, long j) {
        Objects.requireNonNull(message);
        Preconditions.checkArgument(j >= 0);
        long currentProcessingTime = this.delayedMessagesTimerService.currentProcessingTime() + j;
        this.delayedMessagesTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentProcessingTime);
        this.delayedMessagesBuffer.add(message, currentProcessingTime);
    }

    public void onProcessingTime(InternalTimer<String, VoidNamespace> internalTimer) {
        this.delayMessageHandler.onStart();
        this.delayedMessagesBuffer.forEachMessageAt(internalTimer.getTimestamp(), this.delayMessageHandler);
        this.delayMessageHandler.onComplete();
    }

    public void onEventTime(InternalTimer<String, VoidNamespace> internalTimer) {
        throw new UnsupportedOperationException("Delayed messages with event time semantics is not supported.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeMessageByCancellationToken(String str) {
        Objects.requireNonNull(str);
        OptionalLong removeMessageByCancellationToken = this.delayedMessagesBuffer.removeMessageByCancellationToken(str);
        if (removeMessageByCancellationToken.isPresent()) {
            this.delayedMessagesTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, removeMessageByCancellationToken.getAsLong());
        }
    }
}
