package org.aanguita.jacuzzi.service.ondemand;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.aanguita.jacuzzi.queues.processor.FinishReadingMessagesException;
import org.aanguita.jacuzzi.queues.processor.MessageHandler;
import org.aanguita.jacuzzi.queues.processor.MessageProcessor;
import org.aanguita.jacuzzi.queues.processor.MessageReader;

/* loaded from: input_file:org/aanguita/jacuzzi/service/ondemand/PassiveOnDemandService.class */
public class PassiveOnDemandService<T> extends AbstractOnDemandService<T> implements OnDemandService<T> {
    private MessageProcessor<T> messageProcessor;

    /* loaded from: input_file:org/aanguita/jacuzzi/service/ondemand/PassiveOnDemandService$Handler.class */
    private static class Handler<T> implements MessageHandler<T> {
        private final PassiveOnDemandService<T> onDemandService;

        private Handler(PassiveOnDemandService<T> passiveOnDemandService) {
            this.onDemandService = passiveOnDemandService;
        }

        @Override // org.aanguita.jacuzzi.queues.processor.MessageHandler
        public void handleMessage(T t) {
            this.onDemandService.event(t);
        }

        @Override // org.aanguita.jacuzzi.queues.processor.MessageHandler
        public void close() {
        }
    }

    /* loaded from: input_file:org/aanguita/jacuzzi/service/ondemand/PassiveOnDemandService$Reader.class */
    private static class Reader<T> implements MessageReader<T> {
        private final Supplier<T> supplier;
        private final AtomicBoolean alive = new AtomicBoolean(true);

        public Reader(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override // org.aanguita.jacuzzi.queues.processor.MessageReader
        public T readMessage() throws FinishReadingMessagesException {
            if (this.alive.get()) {
                return this.supplier.get();
            }
            throw new FinishReadingMessagesException();
        }

        @Override // org.aanguita.jacuzzi.queues.processor.MessageReader
        public void stop() {
            this.alive.set(false);
        }
    }

    public PassiveOnDemandService(Supplier<T> supplier) {
        super(supplier);
    }

    @Override // org.aanguita.jacuzzi.service.ondemand.AbstractOnDemandService
    void startService() {
        this.messageProcessor = new MessageProcessor<>((MessageReader) new Reader(this.eventSupplier), (MessageHandler) new Handler(), false);
        this.messageProcessor.start();
    }

    @Override // org.aanguita.jacuzzi.service.ondemand.AbstractOnDemandService
    void stopService() {
        this.messageProcessor.stop();
    }
}
