package io.openk9.ingestion.driver.manager.internal;

import io.openk9.datasource.model.Datasource;
import io.openk9.datasource.repository.DatasourceRepository;
import io.openk9.ingestion.driver.manager.api.PluginDriver;
import io.openk9.ingestion.driver.manager.api.PluginDriverRegistry;
import io.openk9.osgi.util.AutoCloseables;
import io.openk9.sql.api.event.EntityEvent;
import io.openk9.sql.api.event.EntityEventBus;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.karaf.scheduler.Job;
import org.apache.karaf.scheduler.ScheduleOptions;
import org.apache.karaf.scheduler.Scheduler;
import org.apache.karaf.scheduler.SchedulerError;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

@Component(immediate = true, service = {DriverManagerActivator.class})
/* loaded from: input_file:io/openk9/ingestion/driver/manager/internal/DriverManagerActivator.class */
public class DriverManagerActivator {
    private final CopyOnWriteArrayList<Disposable> _disposables = new CopyOnWriteArrayList<>();
    private AutoCloseables.AutoCloseableSafe _autoClosableSafe;

    @Reference
    private Scheduler _scheduler;

    @Reference
    private DatasourceRepository _datasourceRepository;

    @Reference
    private PluginDriverRegistry _pluginDriverRegistry;

    @Reference
    private EntityEventBus _entityEventBus;
    private static final String _PREFIX = DriverManagerActivator.class.getName() + "-";
    private static final String _ROOT_SCHEDULER = _PREFIX + "ROOT";
    private static final Logger _log = LoggerFactory.getLogger(DriverManagerActivator.class);

    /* loaded from: input_file:io/openk9/ingestion/driver/manager/internal/DriverManagerActivator$Config.class */
    @interface Config {
        String cronExpression() default "0 */2 * ? * *";
    }

    @Activate
    public void activate(Config config) throws SchedulerError {
        Disposable subscribe = this._datasourceRepository.findAll(true).concatMap(this::_schedule).subscribe();
        Disposable subscribe2 = this._entityEventBus.stream().filter(entityEvent -> {
            return entityEvent.getEntityClass() == Datasource.class;
        }).concatMap(entityEvent2 -> {
            Datasource datasource = (Datasource) entityEvent2.getValue();
            return ((entityEvent2 instanceof EntityEvent.UpdateEvent) || (entityEvent2 instanceof EntityEvent.InsertEvent)) ? _schedule(datasource) : Mono.fromRunnable(() -> {
                this._scheduler.unschedule(_PREFIX + datasource.getName());
            });
        }).subscribe();
        Objects.requireNonNull(subscribe);
        Objects.requireNonNull(subscribe2);
        this._autoClosableSafe = AutoCloseables.mergeAutoCloseableToSafe(new AutoCloseable[]{subscribe::dispose, subscribe2::dispose});
    }

    @Deactivate
    public void deactivate() throws SchedulerError {
        Iterator<Disposable> it = this._disposables.iterator();
        while (it.hasNext()) {
            Disposable next = it.next();
            if (!next.isDisposed()) {
                next.dispose();
            }
            this._disposables.remove(next);
        }
        _unschedule();
        this._scheduler.unschedule(_ROOT_SCHEDULER);
        this._autoClosableSafe.close();
    }

    private Mono<Void> _schedule(Datasource datasource) {
        return Mono.create(monoSink -> {
            try {
                Map jobs = this._scheduler.getJobs();
                String str = _PREFIX + datasource.getName();
                if (((ScheduleOptions) jobs.get(str)) != null) {
                    this._scheduler.unschedule(str);
                }
                this._scheduler.schedule(_createJob(datasource.getDriverServiceName(), datasource.getDatasourceId()), this._scheduler.EXPR(datasource.getScheduling()).name(str));
                this._scheduler.trigger(str);
                monoSink.success();
            } catch (SchedulerError e) {
                monoSink.error(e);
            }
        });
    }

    protected void _unschedule() throws SchedulerError {
        for (String str : this._scheduler.getJobs().keySet()) {
            if (!str.equals(_ROOT_SCHEDULER) && str.startsWith(_PREFIX)) {
                this._scheduler.unschedule(str);
            }
        }
    }

    private Job _createJob(String str, Long l) {
        return jobContext -> {
            Optional filter = this._pluginDriverRegistry.getPluginDriver(str).filter((v0) -> {
                return v0.schedulerEnabled();
            });
            if (filter.isPresent()) {
                this._disposables.add(this._datasourceRepository.findByPrimaryKey(l).flatMap(datasource -> {
                    return Mono.from(((PluginDriver) filter.get()).invokeDataParser(datasource, Date.from(datasource.getLastIngestionDate()), new Date()));
                }).subscribe());
            } else if (_log.isWarnEnabled()) {
                _log.warn("[SCHEDULER] datasourceId: " + l + " service: " + str + " not found");
            }
        };
    }
}
