package org.apache.skywalking.oap.server.core.register.worker;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.class */
/**
 * Worker that buffers {@link RegisterSource} objects and writes them to storage in batches.
 *
 * <p>Sources handed to {@link #in(RegisterSource)} are queued on a {@link DataCarrier}. The
 * carrier's consumer passes each one to {@link #onWork(RegisterSource)}, which merges sources that
 * are equal as map keys and flushes the buffer to the register DAO once it holds more than
 * {@value #MAX_BUFFERED_SOURCES} entries or the last element of a consumed list is reached.
 *
 * <p>The buffer is a plain {@link HashMap} with no synchronization in this class.
 */
public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
    private static final Logger logger = LoggerFactory.getLogger(RegisterPersistentWorker.class);

    /** The buffer is flushed as soon as it holds more than this many distinct sources. */
    private static final int MAX_BUFFERED_SOURCES = 1000;

    /** Value of {@code tryLockAndIncrement} that this worker treats as "lock not acquired". */
    private static final int LOCK_FAILED = 0;

    private final Scope scope;
    private final String modelName;
    /** Pending sources, keyed by themselves so that an equal source can be looked up and merged. */
    private final Map<RegisterSource, RegisterSource> sources;
    private final IRegisterLockDAO registerLockDAO;
    private final IRegisterDAO registerDAO;
    private final DataCarrier<RegisterSource> dataCarrier;

    /**
     * Consumer attached to the data carrier; forwards every consumed source to the worker and
     * flags the last element of each consumed list as the end of the batch.
     */
    private class PersistentConsumer implements IConsumer<RegisterSource> {
        private final RegisterPersistentWorker persistent;

        private PersistentConsumer(RegisterPersistentWorker registerPersistentWorker) {
            this.persistent = registerPersistentWorker;
        }

        public void init() {
        }

        /**
         * Hands each source to the worker in list order.
         *
         * @param list the sources taken from the carrier; an empty list results in no calls
         */
        public void consume(List<RegisterSource> list) {
            int position = 0;
            for (RegisterSource registerSource : list) {
                position++;
                if (position == list.size()) {
                    // Marking the last element makes onWork flush the buffer after this list.
                    registerSource.getEndOfBatchContext().setEndOfBatch(true);
                }
                this.persistent.onWork(registerSource);
            }
        }

        /** Logs the failure; the affected sources are not retried here. */
        public void onError(List<RegisterSource> list, Throwable th) {
            RegisterPersistentWorker.logger.error(th.getMessage(), th);
        }

        public void onExit() {
        }
    }

    /**
     * Creates the worker and starts consuming from its data carrier.
     *
     * @param workerId      forwarded to the {@link AbstractWorker} constructor (presumably the
     *                      worker id -- confirm against AbstractWorker)
     * @param modelName     model name passed to every {@link IRegisterDAO} call and appended to the
     *                      data carrier name
     * @param moduleManager used to look up the {@link IRegisterLockDAO} service of the storage module
     * @param registerDAO   DAO used to read, insert and update register sources
     * @param scope         scope passed to the lock DAO when acquiring and releasing the register lock
     */
    public RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
                                    IRegisterDAO registerDAO, Scope scope) {
        super(workerId);
        this.modelName = modelName;
        this.sources = new HashMap<>();
        this.registerDAO = registerDAO;
        this.registerLockDAO = (IRegisterLockDAO) moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
        this.scope = scope;
        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
        this.dataCarrier.consume(new PersistentConsumer(this), 1, 200L);
    }

    /**
     * Queues a source for persistence.
     *
     * @param registerSource the source to queue; its end-of-batch context is replaced with a fresh
     *                       one set to {@code false} before it is produced to the carrier
     */
    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public final void in(RegisterSource registerSource) {
        registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
        this.dataCarrier.produce(registerSource);
    }

    /**
     * Adds a source to the buffer, merging it into an equal buffered source if one exists, and
     * flushes the buffer when it has grown past {@value #MAX_BUFFERED_SOURCES} entries or the
     * source is flagged as the end of its batch.
     *
     * @param registerSource the source to buffer
     */
    public void onWork(RegisterSource registerSource) {
        RegisterSource buffered = this.sources.get(registerSource);
        if (buffered != null) {
            buffered.combine(registerSource);
        } else {
            this.sources.put(registerSource, registerSource);
        }

        if (this.sources.size() > MAX_BUFFERED_SOURCES || registerSource.getEndOfBatchContext().isEndOfBatch()) {
            flush();
        }
    }

    /** Persists every buffered source, then empties the buffer. */
    private void flush() {
        this.sources.values().forEach(this::persist);
        this.sources.clear();
    }

    /**
     * Writes one buffered source to storage: updates the stored copy if there is one, otherwise
     * inserts the source under the register lock. Any failure is logged and does not propagate,
     * so the remaining buffered sources are still processed.
     */
    private void persist(RegisterSource source) {
        try {
            RegisterSource stored = this.registerDAO.get(this.modelName, source.id());
            if (Objects.nonNull(stored)) {
                updateIfChanged(stored, source);
                return;
            }

            int sequence = this.registerLockDAO.tryLockAndIncrement(this.scope);
            if (sequence == LOCK_FAILED) {
                logger.info("{} inventory register try lock and increment sequence failure.", this.scope.name());
                return;
            }

            try {
                // Read again now that the lock is held: the first lookup was done without it.
                stored = this.registerDAO.get(this.modelName, source.id());
                if (Objects.nonNull(stored)) {
                    updateIfChanged(stored, source);
                } else {
                    source.setSequence(sequence);
                    this.registerDAO.forceInsert(this.modelName, source);
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            } finally {
                // Release exactly once, and only on the path where the lock was acquired.
                this.registerLockDAO.releaseLock(this.scope);
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    /** Merges {@code source} into the stored copy and writes it back only if combine reports true. */
    private void updateIfChanged(RegisterSource stored, RegisterSource source) throws Throwable {
        if (stored.combine(source)) {
            this.registerDAO.forceUpdate(this.modelName, stored);
        }
    }
}
