package net.soundvibe.reacto.server.handlers;

import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import net.soundvibe.reacto.client.errors.CommandNotFound;
import net.soundvibe.reacto.internal.InternalEvent;
import net.soundvibe.reacto.mappers.Mappers;
import net.soundvibe.reacto.server.CommandRegistry;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.Event;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;

/* loaded from: input_file:net/soundvibe/reacto/server/handlers/CommandHandler.class */
public final class CommandHandler {
    private final CommandRegistry commands;
    private static final Logger log = LoggerFactory.getLogger(CommandHandler.class);
    private static final Scheduler SINGLE_THREAD = Schedulers.from(Executors.newSingleThreadExecutor());

    public CommandHandler(CommandRegistry commandRegistry) {
        this.commands = commandRegistry;
    }

    public void handle(byte[] bArr, Consumer<byte[]> consumer, Consumer<Subscription> consumer2, Runnable runnable) {
        try {
            Command fromBytesToCommand = Mappers.fromBytesToCommand(bArr);
            Optional<Function<Command, Observable<Event>>> findCommand = this.commands.findCommand(fromBytesToCommand.name);
            findCommand.map(function -> {
                return ((Observable) function.apply(fromBytesToCommand)).doOnEach(notification -> {
                    log.debug("Command " + fromBytesToCommand + " executed and received notification: " + notification);
                }).subscribeOn(SINGLE_THREAD).observeOn(SINGLE_THREAD).subscribe(event -> {
                    consumer.accept(toBytes(InternalEvent.onNext(event)));
                }, th -> {
                    consumer.accept(toBytes(InternalEvent.onError(th)));
                    runnable.run();
                }, () -> {
                    consumer.accept(toBytes(InternalEvent.onCompleted()));
                    runnable.run();
                });
            }).ifPresent(consumer2);
            if (!findCommand.isPresent()) {
                consumer.accept(toBytes(InternalEvent.onError(new CommandNotFound(fromBytesToCommand.name))));
                runnable.run();
            }
        } catch (Throwable th) {
            consumer.accept(toBytes(InternalEvent.onError(th)));
            runnable.run();
        }
    }

    private byte[] toBytes(InternalEvent internalEvent) {
        return Mappers.internalEventToBytes(internalEvent);
    }
}
