package net.soundvibe.reacto.discovery;

import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscovery;
import io.vertx.servicediscovery.Status;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import net.soundvibe.reacto.client.commands.CommandExecutor;
import net.soundvibe.reacto.client.commands.CommandExecutors;
import net.soundvibe.reacto.client.commands.Services;
import net.soundvibe.reacto.client.commands.VertxWebSocketCommandExecutor;
import net.soundvibe.reacto.client.errors.CannotDiscoverService;
import net.soundvibe.reacto.client.events.EventHandler;
import net.soundvibe.reacto.client.events.EventHandlers;
import net.soundvibe.reacto.client.events.VertxDiscoverableEventHandler;
import net.soundvibe.reacto.client.events.VertxWebSocketEventHandler;
import net.soundvibe.reacto.server.ServiceRecords;
import net.soundvibe.reacto.types.Pair;
import net.soundvibe.reacto.utils.Factories;
import rx.Observable;

/* loaded from: input_file:net/soundvibe/reacto/discovery/DiscoverableService.class */
/**
 * Wraps a Vert.x {@link ServiceDiscovery} and exposes it through RxJava observables:
 * looking up command executors by service name, publishing / refreshing / removing
 * service records, and toggling an "open"/"closed" flag around the discovery lifecycle.
 *
 * <p>The instance starts in the closed state; {@link #startDiscovery(Record)} opens it once
 * the record has been published and {@link #closeDiscovery(Record)} closes it again.
 */
public final class DiscoverableService {
    private static final Logger log = LoggerFactory.getLogger(DiscoverableService.class);

    /** {@code true} until {@link #startDiscovery(Record)} completes, and again after {@link #closeDiscovery(Record)}. */
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    /** The underlying Vert.x service discovery all operations are delegated to. */
    public final ServiceDiscovery serviceDiscovery;

    /**
     * @param serviceDiscovery the Vert.x service discovery instance to operate on
     */
    public DiscoverableService(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Looks up a command executor for a main service only.
     *
     * @param str name of the main service
     * @return whatever {@code CommandExecutors.find} yields for the given service
     */
    public Observable<CommandExecutor> find(String str) {
        return CommandExecutors.find(Services.ofMain(str, this.serviceDiscovery));
    }

    /**
     * Looks up a command executor for a main service, restricted by a record filter.
     *
     * @param str       name of the main service
     * @param predicate filter handed to {@code CommandExecutors.find}
     * @return whatever {@code CommandExecutors.find} yields for the given service
     */
    public Observable<CommandExecutor> find(String str, Predicate<Record> predicate) {
        return CommandExecutors.find(Services.ofMain(str, this.serviceDiscovery), predicate);
    }

    /**
     * Looks up a command executor for a main service using the given load balancer.
     *
     * @param str          name of the main service
     * @param loadBalancer load balancer handed to {@code CommandExecutors.find}
     * @return whatever {@code CommandExecutors.find} yields for the given service
     */
    public Observable<CommandExecutor> find(String str, LoadBalancer loadBalancer) {
        return CommandExecutors.find(Services.ofMain(str, this.serviceDiscovery), loadBalancer);
    }

    /**
     * Looks up a command executor for a main service using the given load balancer and record filter.
     *
     * @param str          name of the main service
     * @param loadBalancer load balancer handed to {@code CommandExecutors.find}
     * @param predicate    filter handed to {@code CommandExecutors.find}
     * @return whatever {@code CommandExecutors.find} yields for the given service
     */
    public Observable<CommandExecutor> find(String str, LoadBalancer loadBalancer, Predicate<Record> predicate) {
        return CommandExecutors.find(Services.ofMain(str, this.serviceDiscovery), loadBalancer, predicate);
    }

    /**
     * Looks up a command executor for a main service with a fallback service.
     *
     * @param str  name of the main service
     * @param str2 name of the fallback service
     * @return whatever {@code CommandExecutors.find} yields for the given services
     */
    public Observable<CommandExecutor> find(String str, String str2) {
        return CommandExecutors.find(Services.ofMainAndFallback(str, str2, this.serviceDiscovery));
    }

    /**
     * Looks up a command executor for a main service with a fallback service, restricted by a record filter.
     *
     * @param str       name of the main service
     * @param str2      name of the fallback service
     * @param predicate filter handed to {@code CommandExecutors.find}
     * @return whatever {@code CommandExecutors.find} yields for the given services
     */
    public Observable<CommandExecutor> find(String str, String str2, Predicate<Record> predicate) {
        return CommandExecutors.find(Services.ofMainAndFallback(str, str2, this.serviceDiscovery), predicate);
    }

    /**
     * Looks up a command executor for a main service with a fallback service using the given load balancer.
     *
     * @param str          name of the main service
     * @param str2         name of the fallback service
     * @param loadBalancer load balancer handed to {@code CommandExecutors.find}
     * @return whatever {@code CommandExecutors.find} yields for the given services
     */
    public Observable<CommandExecutor> find(String str, String str2, LoadBalancer loadBalancer) {
        return CommandExecutors.find(Services.ofMainAndFallback(str, str2, this.serviceDiscovery), loadBalancer);
    }

    /**
     * Looks up a command executor for a main service with a fallback service; delegates to
     * {@link #find(Services, LoadBalancer, Predicate)}.
     *
     * @param str          name of the main service
     * @param str2         name of the fallback service
     * @param loadBalancer load balancer used for the lookup
     * @param predicate    record filter used for the lookup
     * @return a single command executor, or an error if nothing could be discovered
     */
    public Observable<CommandExecutor> find(String str, String str2, LoadBalancer loadBalancer, Predicate<Record> predicate) {
        return find(Services.ofMainAndFallback(str, str2, this.serviceDiscovery), loadBalancer, predicate);
    }

    /**
     * Discovers the main service (and, when configured, the fallback service) and wraps the result
     * in a single {@link VertxWebSocketCommandExecutor}.
     *
     * <p>Flow: the main lookup is tried first. If it fails, the fallback lookup is tried instead, or a
     * {@link CannotDiscoverService} is raised when no fallback name is present. If the main lookup
     * succeeded and a fallback name is present, the fallback lookup result is appended (its exceptions
     * are swallowed). The first discovered handler becomes the primary one, the second (if any) the
     * optional secondary one.
     *
     * <p>NOTE(review): the main lookup uses {@code this.serviceDiscovery} while the fallback lookups use
     * {@code services.serviceDiscovery}; these are the same instance for every overload in this class,
     * but callers passing their own {@code Services} should confirm this is intended.
     *
     * @param services     main / optional fallback service names
     * @param loadBalancer load balancer handed to {@code DiscoverableServices.find}
     * @param predicate    record filter handed to {@code DiscoverableServices.find}
     * @return an observable emitting one command executor, or failing with {@link CannotDiscoverService}
     */
    public Observable<CommandExecutor> find(Services services, LoadBalancer loadBalancer, Predicate<Record> predicate) {
        return DiscoverableServices.find(services.mainServiceName, predicate, this.serviceDiscovery, loadBalancer)
                // Tag every stream: true = came from the main service, false = came from the fallback.
                .map(stream -> Pair.of(true, stream))
                .onErrorResumeNext(error -> services.fallbackServiceName.isPresent()
                        ? DiscoverableServices.find(services.fallbackServiceName.get(), predicate, services.serviceDiscovery, loadBalancer)
                                .map(stream -> Pair.of(false, stream))
                        : Observable.error(new CannotDiscoverService("Cannot find any of " + services, error)))
                // Only a main-service hit gets the fallback appended; a fallback hit is already the fallback.
                .flatMap(pair -> Observable.just(pair.value).concatWith(
                        pair.key && services.fallbackServiceName.isPresent()
                                ? DiscoverableServices.find(services.fallbackServiceName.get(), predicate, services.serviceDiscovery, loadBalancer)
                                        .onExceptionResumeNext(Observable.empty())
                                : Observable.empty()))
                .switchIfEmpty(Observable.error(new CannotDiscoverService("Unable to discover any of " + services)))
                .map(stream -> new VertxDiscoverableEventHandler(stream, VertxWebSocketEventHandler::observe))
                .toList()
                .filter(handlers -> !handlers.isEmpty())
                .switchIfEmpty(Observable.error(new CannotDiscoverService("Unable to discover any of " + services)))
                // The identity map only widens the Optional's element type to EventHandler.
                .map(handlers -> new EventHandlers(
                        handlers.get(0),
                        handlers.stream().skip(1L).findFirst().map(handler -> handler)))
                .map(eventHandlers -> new VertxWebSocketCommandExecutor(() -> Optional.ofNullable(eventHandlers)));
    }

    /**
     * Starts a daemon timer that re-publishes the given record once per minute (first run after one
     * minute) for as long as this service is open; ticks while closed are skipped with a log line.
     * Each call creates a new timer, which is never cancelled by this class.
     *
     * @param record the record to re-publish on every heartbeat
     */
    public void startHeartBeat(Record record) {
        scheduleAtFixedInterval(TimeUnit.MINUTES.toMillis(1L), () -> {
            if (!isOpen()) {
                log.info("Skipping heartbeat because service discovery is closed");
                return;
            }
            publishRecord(record).subscribe(
                    published -> log.info("Heartbeat published record: " + published),
                    // Pass the throwable as well so the stack trace is not lost.
                    error -> log.error("Error while trying to publish the record on heartbeat: " + error, error),
                    () -> log.info("Heartbeat completed successfully"));
        }, "service-discovery-heartbeat");
    }

    /**
     * Runs {@code task} at a fixed rate on a new daemon {@link Timer}; the initial delay equals the period.
     * Anything thrown by the task is logged and swallowed so the timer thread keeps running.
     *
     * @param periodMillis delay before the first run and period between runs, in milliseconds
     * @param task         the work to run on each tick
     * @param timerName    name of the timer thread
     */
    private void scheduleAtFixedInterval(long periodMillis, final Runnable task, String timerName) {
        new Timer(timerName, true).scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable th) {
                    // An exception escaping a TimerTask would terminate the timer thread; log and carry on.
                    log.error("Error while doing scheduled task: " + th, th);
                }
            }
        }, periodMillis, periodMillis);
    }

    /**
     * Publishes the record, or updates it when it already carries a registration.
     *
     * <p>Before publishing, records reported as down by {@code ServiceRecords.isDown} are removed,
     * the record's {@code LAST_UPDATED} metadata is set to the current instant and its status is set
     * to {@link Status#UP}.
     *
     * @param record the record to publish or update
     * @return an observable emitting the record returned by the discovery backend, or the backend's failure
     */
    public Observable<Record> publishRecord(Record record) {
        return Observable.just(record)
                .flatMap(rec -> removeIf(rec, (existing, candidate) -> ServiceRecords.isDown(existing)))
                .map(rec -> {
                    rec.getMetadata().put(ServiceRecords.LAST_UPDATED, Instant.now());
                    return rec.setStatus(Status.UP);
                })
                .flatMap(refreshed -> Observable.create(subscriber -> {
                    if (refreshed.getRegistration() != null) {
                        // Already registered: update in place, using the refreshed record from the pipeline.
                        this.serviceDiscovery.update(refreshed, updateResult -> {
                            if (updateResult.succeeded()) {
                                log.info("Service has been updated successfully: " + updateResult.result().toJson());
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(updateResult.result());
                                    subscriber.onCompleted();
                                }
                            }
                            if (updateResult.failed()) {
                                log.error("Error when trying to updated the service: " + updateResult.cause(), updateResult.cause());
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(updateResult.cause());
                                }
                            }
                        });
                    } else {
                        // No registration yet: first-time publish.
                        this.serviceDiscovery.publish(refreshed, publishResult -> {
                            if (publishResult.succeeded()) {
                                log.info("Service has been published successfully: " + publishResult.result().toJson());
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(publishResult.result());
                                    subscriber.onCompleted();
                                }
                            }
                            if (publishResult.failed()) {
                                log.error("Error when trying to publish the service: " + publishResult.cause(), publishResult.cause());
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(publishResult.cause());
                                }
                            }
                        });
                    }
                }));
    }

    /**
     * Unpublishes every record (including out-of-service ones) whose status equals {@code status}.
     *
     * @param status the status to match
     * @return an observable emitting each unpublished record; completes empty when nothing matched and
     *         fails when the lookup or any unpublish call fails
     */
    public Observable<Record> removeRecordsWithStatus(Status status) {
        return Observable.create(subscriber ->
                this.serviceDiscovery.getRecords(
                        candidate -> status.equals(candidate.getStatus()),
                        true,
                        lookup -> {
                            if (lookup.succeeded()) {
                                List<Record> matches = lookup.result();
                                if (matches.isEmpty() && !subscriber.isUnsubscribed()) {
                                    subscriber.onCompleted();
                                    return;
                                }
                                Observable.from(matches)
                                        .flatMap(this::unpublishRecord)
                                        .subscribe(subscriber);
                            }
                            if (lookup.failed()) {
                                log.info("No matching records: " + lookup.cause());
                                subscriber.onError(lookup.cause());
                            }
                        }));
    }

    /**
     * Publishes the record and, once that completes, marks this service as open.
     * The "Starting service discovery..." line is logged even when the call is rejected.
     *
     * @param record the record to publish
     * @return the publish observable (subscribed on {@code Factories.SINGLE_THREAD}), or an
     *         {@link IllegalStateException} error when the service is already open
     */
    public Observable<Record> startDiscovery(Record record) {
        log.info("Starting service discovery...");
        if (!isClosed()) {
            return Observable.error(new IllegalStateException("Service discovery is already started"));
        }
        return publishRecord(record)
                .subscribeOn(Factories.SINGLE_THREAD)
                .doOnCompleted(() -> this.isClosed.set(false));
    }

    /**
     * Removes the records matching {@code record} (per {@code ServiceRecords.AreEquals}), then on
     * completion releases a reference for the record, closes the underlying discovery and marks this
     * service as closed.
     *
     * @param record the record to remove
     * @return an observable emitting {@code record} once removal finished, or an
     *         {@link IllegalStateException} error when the service is not open
     */
    public Observable<Record> closeDiscovery(Record record) {
        log.info("Closing service discovery...");
        if (!isOpen()) {
            return Observable.error(new IllegalStateException("Service discovery is already closed"));
        }
        return Observable.just(record)
                .subscribeOn(Factories.SINGLE_THREAD)
                .observeOn(Factories.SINGLE_THREAD)
                .flatMap(rec -> removeIf(rec, ServiceRecords::AreEquals))
                .doOnCompleted(() -> this.serviceDiscovery.release(this.serviceDiscovery.getReference(record)))
                .doOnCompleted(this.serviceDiscovery::close)
                .doOnCompleted(() -> this.isClosed.set(true));
    }

    /**
     * Unpublishes every stored record (including out-of-service ones) for which
     * {@code matcher.apply(storedRecord, record)} is true, then emits {@code record}.
     *
     * <p>Removal is best effort: a failing unpublish is logged and {@code record} is still emitted.
     * Only a failure of the record lookup itself is propagated as an error.
     *
     * @param record  the record passed as second argument to the matcher and emitted downstream
     * @param matcher decides, per stored record, whether it should be removed
     * @return an observable emitting {@code record} exactly once, or the lookup failure
     */
    private Observable<Record> removeIf(Record record, BiFunction<Record, Record, Boolean> matcher) {
        return Observable.create(subscriber ->
                this.serviceDiscovery.getRecords(
                        existing -> matcher.apply(existing, record),
                        true,
                        lookup -> {
                            if (lookup.succeeded()) {
                                List<Record> matches = lookup.result();
                                if (matches.isEmpty() && !subscriber.isUnsubscribed()) {
                                    subscriber.onNext(record);
                                    subscriber.onCompleted();
                                    return;
                                }
                                Observable.from(matches)
                                        .doOnNext(stale -> this.serviceDiscovery.release(this.serviceDiscovery.getReference(stale)))
                                        .flatMap(this::unpublishRecord)
                                        .subscribe(
                                                removed -> log.info("Record was unpublished: " + removed),
                                                error -> {
                                                    // Best effort: report the failure but let the caller proceed.
                                                    log.error("Error while trying to unpublish the record: " + error, error);
                                                    subscriber.onNext(record);
                                                    subscriber.onCompleted();
                                                },
                                                () -> {
                                                    subscriber.onNext(record);
                                                    subscriber.onCompleted();
                                                });
                            }
                            if (lookup.failed()) {
                                log.info("No matching records: " + lookup.cause());
                                subscriber.onError(lookup.cause());
                            }
                        }));
    }

    /**
     * Unpublishes a single record by its registration id.
     *
     * @param record the record to unpublish
     * @return an observable emitting {@code record} on success or the backend's failure; nothing is
     *         signalled when the subscriber has already unsubscribed
     */
    private Observable<Record> unpublishRecord(Record record) {
        return Observable.create(subscriber ->
                this.serviceDiscovery.unpublish(record.getRegistration(), result -> {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    if (result.failed()) {
                        subscriber.onError(result.cause());
                    } else if (result.succeeded()) {
                        subscriber.onNext(record);
                        subscriber.onCompleted();
                    }
                }));
    }

    /**
     * @return {@code true} while discovery has not been started or has been closed
     */
    public boolean isClosed() {
        return this.isClosed.get();
    }

    /**
     * @return {@code true} once {@link #startDiscovery(Record)} has completed and until
     *         {@link #closeDiscovery(Record)} completes
     */
    public boolean isOpen() {
        return !this.isClosed.get();
    }
}
