package org.apache.hugegraph.computer.k8s.operator.common;

import com.google.common.collect.ImmutableAsList;
import com.google.common.collect.Sets;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventSource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.Serialization;
import java.lang.reflect.ParameterizedType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.computer.k8s.crd.model.EventType;
import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions;
import org.apache.hugegraph.computer.k8s.util.KubeUtil;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/k8s/operator/common/AbstractController.class */
public abstract class AbstractController<T extends CustomResource<?, ?>> {
    private static final Logger LOG = Log.logger((Class<?>) AbstractController.class);
    protected final HugeConfig config;
    protected final NamespacedKubernetesClient kubeClient;
    private final ScheduledExecutorService executor;
    private Set<Class<? extends HasMetadata>> ownsClassSet;
    private Map<Class<? extends HasMetadata>, SharedIndexInformer<? extends HasMetadata>> informerMap;
    private final Class<T> crClass = getCRClass();
    private final String kind = HasMetadata.getKind(this.crClass).toLowerCase(Locale.ROOT);
    private final WorkQueue<OperatorRequest> workQueue = new WorkQueue<>();

    public AbstractController(HugeConfig hugeConfig, NamespacedKubernetesClient namespacedKubernetesClient) {
        this.config = hugeConfig;
        this.kubeClient = namespacedKubernetesClient;
        this.executor = ExecutorUtil.newScheduledThreadPool(((Integer) this.config.get(OperatorOptions.RECONCILER_COUNT)).intValue(), this.kind + "-reconciler-%d");
    }

    @SafeVarargs
    public final void register(SharedInformerFactory sharedInformerFactory, Class<? extends HasMetadata>... clsArr) {
        Long l = (Long) this.config.get(OperatorOptions.RESYNC_PERIOD);
        this.ownsClassSet = Sets.newHashSet(clsArr);
        this.informerMap = new ConcurrentHashMap();
        registerCREvent(sharedInformerFactory, l.longValue());
        registerOwnsEvent(sharedInformerFactory, l.longValue());
    }

    public void run(CountDownLatch countDownLatch) {
        LOG.info("Starting the {}-controller...", this.kind);
        if (!hasSynced()) {
            LOG.error("Failed to start {}-controller: Timed out waiting for informer to be synced", this.kind);
            return;
        }
        Integer num = (Integer) this.config.get(OperatorOptions.RECONCILER_COUNT);
        CountDownLatch countDownLatch2 = new CountDownLatch(num.intValue());
        for (int i = 0; i < num.intValue(); i++) {
            this.executor.scheduleWithFixedDelay(() -> {
                try {
                    startWorker();
                } catch (Throwable th) {
                    LOG.error("Unexpected reconcile loop abortion", th);
                } finally {
                    countDownLatch2.countDown();
                }
            }, Duration.ZERO.toMillis(), Duration.ofSeconds(1L).toMillis(), TimeUnit.MILLISECONDS);
        }
        countDownLatch.countDown();
        try {
            try {
                countDownLatch2.await();
                LOG.info("The {}-controller exited", this.kind);
            } catch (InterruptedException e) {
                LOG.error("Aborting {}-controller.", this.kind, e);
                LOG.info("The {}-controller exited", this.kind);
            }
        } catch (Throwable th) {
            LOG.info("The {}-controller exited", this.kind);
            throw th;
        }
    }

    public void shutdown() {
        this.workQueue.shutdown();
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(((Long) this.config.get(OperatorOptions.CLOSE_RECONCILER_TIMEOUT)).longValue(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract OperatorResult reconcile(OperatorRequest operatorRequest);

    protected abstract void handleFailOverLimit(OperatorRequest operatorRequest, Exception exc);

    private void startWorker() {
        OperatorResult operatorResult;
        while (!this.executor.isShutdown() && !this.workQueue.isShutdown() && !Thread.currentThread().isInterrupted()) {
            LOG.debug("Trying to get item from work queue of {}-controller", this.kind);
            OperatorRequest operatorRequest = null;
            try {
                operatorRequest = this.workQueue.get();
            } catch (InterruptedException e) {
                LOG.warn("The {}-controller worker interrupted...", this.kind, e);
            }
            if (operatorRequest == null) {
                LOG.info("The {}-controller worker exiting...", this.kind);
                return;
            }
            try {
                LOG.debug("Start reconcile request: {}", operatorRequest);
                operatorResult = reconcile(operatorRequest);
            } catch (Exception e2) {
                LOG.error("Reconcile occur error, requeue request: ", (Throwable) e2);
                if (operatorRequest.retryIncrGet() > ((Integer) this.config.get(OperatorOptions.MAX_RECONCILE_RETRY)).intValue()) {
                    try {
                        handleFailOverLimit(operatorRequest, e2);
                    } catch (Exception e3) {
                        LOG.error("Reconcile fail over limit occur error:", (Throwable) e3);
                    }
                    operatorResult = OperatorResult.NO_REQUEUE;
                } else {
                    operatorResult = OperatorResult.REQUEUE;
                }
            }
            try {
                if (operatorResult.requeue()) {
                    enqueueRequest(operatorRequest);
                }
                LOG.debug("End reconcile request: {}", operatorRequest);
                this.workQueue.done(operatorRequest);
            } catch (Throwable th) {
                LOG.debug("End reconcile request: {}", operatorRequest);
                this.workQueue.done(operatorRequest);
                throw th;
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void registerCREvent(SharedInformerFactory sharedInformerFactory, long j) {
        SharedIndexInformer sharedIndexInformerForCustomResource = sharedInformerFactory.sharedIndexInformerForCustomResource(this.crClass, j);
        sharedIndexInformerForCustomResource.addEventHandler(new ResourceEventHandler<T>() { // from class: org.apache.hugegraph.computer.k8s.operator.common.AbstractController.1
            @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
            public void onAdd(T t) {
                AbstractController.LOG.info("Received a CR add request: {}", t.getMetadata());
                AbstractController.this.enqueueRequest(OperatorRequest.parseRequestByCR(t));
            }

            @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
            public void onUpdate(T t, T t2) {
                AbstractController.LOG.info("Received a CR update request: {}", t2.getMetadata());
                AbstractController.this.enqueueRequest(OperatorRequest.parseRequestByCR(t2));
            }

            @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
            public void onDelete(T t, boolean z) {
                AbstractController.LOG.info("Received a CR delete request: {}", t.getMetadata());
                AbstractController.this.enqueueRequest(OperatorRequest.parseRequestByCR(t));
            }
        });
        this.informerMap.put(this.crClass, sharedIndexInformerForCustomResource);
    }

    protected void registerOwnsEvent(SharedInformerFactory sharedInformerFactory, long j) {
        this.ownsClassSet.forEach(cls -> {
            SharedIndexInformer<? extends HasMetadata> sharedIndexInformerFor = sharedInformerFactory.sharedIndexInformerFor(cls, j);
            sharedIndexInformerFor.addEventHandler(new ResourceEventHandler<HasMetadata>() { // from class: org.apache.hugegraph.computer.k8s.operator.common.AbstractController.2
                @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
                public void onAdd(HasMetadata hasMetadata) {
                    AbstractController.this.handleOwnsResource(hasMetadata);
                }

                @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
                public void onUpdate(HasMetadata hasMetadata, HasMetadata hasMetadata2) {
                    if (Objects.equals(hasMetadata.getMetadata().getResourceVersion(), hasMetadata2.getMetadata().getResourceVersion())) {
                        return;
                    }
                    AbstractController.this.handleOwnsResource(hasMetadata2);
                }

                @Override // io.fabric8.kubernetes.client.informers.ResourceEventHandler
                public void onDelete(HasMetadata hasMetadata, boolean z) {
                    AbstractController.this.handleOwnsResource(hasMetadata);
                }
            });
            this.informerMap.put(cls, sharedIndexInformerFor);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public T getCR(OperatorRequest operatorRequest) {
        return (T) getResourceByName(operatorRequest.namespace(), operatorRequest.name(), this.crClass);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <R extends HasMetadata> R getResourceByName(String str, String str2, Class<R> cls) {
        SharedIndexInformer<? extends HasMetadata> sharedIndexInformer = this.informerMap.get(cls);
        HasMetadata byKey = sharedIndexInformer.getIndexer().getByKey(new OperatorRequest(str, str2).key());
        if (byKey == null) {
            return null;
        }
        return (R) Serialization.clone(byKey);
    }

    protected <R extends HasMetadata> List<R> getResourceList(String str, Class<R> cls) {
        ImmutableAsList immutableAsList = (List<R>) this.informerMap.get(cls).getIndexer().byIndex(Cache.NAMESPACE_INDEX, str);
        if (CollectionUtils.isEmpty(immutableAsList)) {
            return immutableAsList;
        }
        for (int i = 0; i < immutableAsList.size(); i++) {
            immutableAsList.set(i, (HasMetadata) Serialization.clone((HasMetadata) immutableAsList.get(i)));
        }
        return immutableAsList;
    }

    protected <R extends HasMetadata> List<R> getResourceListWithLabels(String str, Class<R> cls, Map<String, String> map) {
        List<R> list = (List<R>) this.informerMap.get(cls).getIndexer().byIndex(Cache.NAMESPACE_INDEX, str);
        if (CollectionUtils.isEmpty(list)) {
            return list;
        }
        if (MapUtils.isEmpty(map)) {
            return getResourceList(str, cls);
        }
        ArrayList arrayList = new ArrayList();
        Set<Map.Entry<String, String>> entrySet = map.entrySet();
        for (R r : list) {
            Map<String, String> labels = KubernetesResourceUtil.getLabels(r.getMetadata());
            if (!MapUtils.isEmpty(labels) && labels.entrySet().containsAll(entrySet)) {
                arrayList.add((HasMetadata) Serialization.clone(r));
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Pod> getPodsByJob(Job job) {
        String namespace = job.getMetadata().getNamespace();
        LabelSelector selector = job.getSpec().getSelector();
        if (selector != null) {
            Map<String, String> matchLabels = selector.getMatchLabels();
            if (MapUtils.isNotEmpty(matchLabels)) {
                return getResourceListWithLabels(namespace, Pod.class, matchLabels);
            }
        }
        return Collections.emptyList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v24, types: [io.fabric8.kubernetes.client.KubernetesClient] */
    public void recordEvent(HasMetadata hasMetadata, EventType eventType, String str, String str2, String str3) {
        String str4 = HasMetadata.getKind(this.crClass) + "Operator";
        EventSource eventSource = new EventSource();
        eventSource.setComponent(str4);
        Event buildEvent = KubeUtil.buildEvent(hasMetadata, eventSource, eventType, str, str2, str3);
        NamespacedKubernetesClient namespacedKubernetesClient = this.kubeClient;
        String namespace = buildEvent.getMetadata().getNamespace();
        if (!Objects.equals(this.kubeClient.getNamespace(), namespace)) {
            namespacedKubernetesClient = (KubernetesClient) this.kubeClient.inNamespace(namespace);
        }
        namespacedKubernetesClient.v1().events().createOrReplace(buildEvent);
    }

    private void handleOwnsResource(HasMetadata hasMetadata) {
        MatchWithMsg ownsPredicate = ownsPredicate(hasMetadata);
        if (ownsPredicate.isMatch()) {
            enqueueRequest(new OperatorRequest(hasMetadata.getMetadata().getNamespace(), ownsPredicate.msg()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MatchWithMsg ownsPredicate(HasMetadata hasMetadata) {
        OwnerReference controllerOf = getControllerOf(hasMetadata);
        return (controllerOf != null && controllerOf.getController().booleanValue() && StringUtils.isNotBlank(controllerOf.getName()) && StringUtils.equalsIgnoreCase(controllerOf.getKind(), this.kind)) ? new MatchWithMsg(true, controllerOf.getName()) : MatchWithMsg.NO_MATCH;
    }

    protected OwnerReference getControllerOf(HasMetadata hasMetadata) {
        for (OwnerReference ownerReference : hasMetadata.getMetadata().getOwnerReferences()) {
            if (Boolean.TRUE.equals(ownerReference.getController())) {
                return ownerReference;
            }
        }
        return null;
    }

    private void enqueueRequest(OperatorRequest operatorRequest) {
        if (operatorRequest != null) {
            LOG.debug("Enqueue a resource request: {}", operatorRequest);
            this.workQueue.add(operatorRequest);
        }
    }

    private boolean hasSynced() {
        if (MapUtils.isEmpty(this.informerMap)) {
            return true;
        }
        return KubeUtil.waitUntilReady(Duration.ZERO, Duration.ofMillis(((Long) this.config.get(OperatorOptions.READY_CHECK_INTERNAL)).longValue()), Duration.ofMillis(((Long) this.config.get(OperatorOptions.READY_TIMEOUT)).longValue()), () -> {
            return Boolean.valueOf(this.informerMap.values().stream().allMatch((v0) -> {
                return v0.hasSynced();
            }));
        }, this.executor);
    }

    private Class<T> getCRClass() {
        return (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }
}
