/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.ReadOffsetStrategy;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Task;
import org.springframework.util.ErrorHandler;

class StreamPollTask<K, V extends Record<K, ?>>
implements Task {
    private final StreamMessageListenerContainer.StreamReadRequest<K> request;
    private final StreamListener<K, V> listener;
    private final ErrorHandler errorHandler;
    private final Predicate<Throwable> cancelSubscriptionOnError;
    private final BiFunction<K, ReadOffset, List<V>> readFunction;
    private final PollState pollState;
    private volatile boolean isInEventLoop = false;

    StreamPollTask(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener, ErrorHandler errorHandler, BiFunction<K, ReadOffset, List<V>> readFunction) {
        this.request = streamRequest;
        this.listener = listener;
        this.errorHandler = Optional.ofNullable(streamRequest.getErrorHandler()).orElse(errorHandler);
        this.cancelSubscriptionOnError = streamRequest.getCancelSubscriptionOnError();
        this.readFunction = readFunction;
        this.pollState = StreamPollTask.createPollState(streamRequest);
    }

    private static PollState createPollState(StreamMessageListenerContainer.StreamReadRequest<?> streamRequest) {
        StreamOffset<?> streamOffset = streamRequest.getStreamOffset();
        if (streamRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest) {
            return PollState.consumer(((StreamMessageListenerContainer.ConsumerStreamReadRequest)streamRequest).getConsumer(), streamOffset.getOffset());
        }
        return PollState.standalone(streamOffset.getOffset());
    }

    @Override
    public void cancel() throws DataAccessResourceFailureException {
        this.pollState.cancel();
    }

    @Override
    public Task.State getState() {
        return this.pollState.getState();
    }

    @Override
    public boolean awaitStart(Duration timeout) throws InterruptedException {
        return this.pollState.awaitStart(timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    @Override
    public void run() {
        this.pollState.starting();
        try {
            this.isInEventLoop = true;
            this.pollState.running();
            this.doLoop(this.request.getStreamOffset().getKey());
        }
        finally {
            this.isInEventLoop = false;
        }
    }

    private void doLoop(K key) {
        do {
            try {
                Thread.sleep(0L);
                List<V> read = this.readFunction.apply(key, this.pollState.getCurrentReadOffset());
                for (Record message : read) {
                    this.listener.onMessage(message);
                    this.pollState.updateReadOffset(message.getId().getValue());
                }
            }
            catch (InterruptedException e) {
                this.cancel();
                Thread.currentThread().interrupt();
            }
            catch (RuntimeException e) {
                this.errorHandler.handleError(e);
                if (!this.cancelSubscriptionOnError.test(e)) continue;
                this.cancel();
            }
        } while (this.pollState.isSubscriptionActive());
    }

    @Override
    public boolean isActive() {
        return Task.State.RUNNING.equals((Object)this.getState()) || this.isInEventLoop;
    }

    static class PollState {
        private final ReadOffsetStrategy readOffsetStrategy;
        private final Optional<Consumer> consumer;
        private volatile ReadOffset currentOffset;
        private volatile Task.State state = Task.State.CREATED;
        private volatile CountDownLatch awaitStart = new CountDownLatch(1);

        private PollState(Optional<Consumer> consumer, ReadOffsetStrategy readOffsetStrategy, ReadOffset currentOffset) {
            this.readOffsetStrategy = readOffsetStrategy;
            this.currentOffset = currentOffset;
            this.consumer = consumer;
        }

        static PollState standalone(ReadOffset offset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(offset);
            return new PollState(Optional.empty(), strategy, strategy.getFirst(offset, Optional.empty()));
        }

        static PollState consumer(Consumer consumer, ReadOffset offset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(offset);
            Optional<Consumer> optionalConsumer = Optional.of(consumer);
            return new PollState(optionalConsumer, strategy, strategy.getFirst(offset, optionalConsumer));
        }

        boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException {
            return this.awaitStart.await(timeout, unit);
        }

        public Task.State getState() {
            return this.state;
        }

        boolean isSubscriptionActive() {
            return this.state == Task.State.STARTING || this.state == Task.State.RUNNING;
        }

        void starting() {
            this.state = Task.State.STARTING;
        }

        void running() {
            this.state = Task.State.RUNNING;
            CountDownLatch awaitStart = this.awaitStart;
            if (awaitStart.getCount() == 1L) {
                awaitStart.countDown();
            }
        }

        void cancel() {
            this.awaitStart = new CountDownLatch(1);
            this.state = Task.State.CANCELLED;
        }

        void updateReadOffset(String messageId) {
            this.currentOffset = this.readOffsetStrategy.getNext(this.getCurrentReadOffset(), this.consumer, messageId);
        }

        ReadOffset getCurrentReadOffset() {
            return this.currentOffset;
        }
    }
}

