package org.apache.skywalking.oap.server.core.register.worker;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.class */
public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
    private static final Logger logger = LoggerFactory.getLogger(RegisterPersistentWorker.class);
    private final int scopeId;
    private final String modelName;
    private final Map<RegisterSource, RegisterSource> sources;
    private final IRegisterLockDAO registerLockDAO;
    private final IRegisterDAO registerDAO;
    private final DataCarrier<RegisterSource> dataCarrier;
    private final HistogramMetrics workerLatencyHistogram;

    /* loaded from: input_file:org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker$PersistentConsumer.class */
    private class PersistentConsumer implements IConsumer<RegisterSource> {
        private final RegisterPersistentWorker persistent;

        private PersistentConsumer(RegisterPersistentWorker registerPersistentWorker) {
            this.persistent = registerPersistentWorker;
        }

        public void init() {
        }

        public void consume(List<RegisterSource> list) {
            int i = 0;
            for (RegisterSource registerSource : list) {
                i++;
                if (i == list.size()) {
                    registerSource.asEndOfBatch();
                }
                this.persistent.onWork(registerSource);
            }
        }

        public void onError(List<RegisterSource> list, Throwable th) {
            RegisterPersistentWorker.logger.error(th.getMessage(), th);
        }

        public void onExit() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String str, IRegisterDAO iRegisterDAO, int i) {
        super(moduleDefineHolder);
        this.modelName = str;
        this.sources = new HashMap();
        this.registerDAO = iRegisterDAO;
        this.registerLockDAO = (IRegisterLockDAO) moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
        this.scopeId = i;
        this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + str, 1, 1000);
        this.workerLatencyHistogram = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createHistogramMetric("register_persistent_worker_latency", "The process latency of register persistent worker", new MetricsTag.Keys(new String[]{"module"}), new MetricsTag.Values(new String[]{str}), new double[0]);
        int recommendMaxSize = BulkConsumePool.Creator.recommendMaxSize() / 8;
        try {
            ConsumerPoolFactory.INSTANCE.createIfAbsent("REGISTER_L2", new BulkConsumePool.Creator("REGISTER_L2", recommendMaxSize == 0 ? 1 : recommendMaxSize, 200L));
            this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get("REGISTER_L2"), new PersistentConsumer(this));
        } catch (Exception e) {
            throw new UnexpectedException(e.getMessage(), e);
        }
    }

    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public final void in(RegisterSource registerSource) {
        registerSource.resetEndOfBatch();
        this.dataCarrier.produce(registerSource);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onWork(RegisterSource registerSource) {
        if (this.sources.containsKey(registerSource)) {
            this.sources.get(registerSource).combine(registerSource);
        } else {
            this.sources.put(registerSource, registerSource);
        }
        HistogramMetrics.Timer createTimer = this.workerLatencyHistogram.createTimer();
        Throwable th = null;
        try {
            try {
                if (this.sources.size() > 1000 || registerSource.isEndOfBatch()) {
                    this.sources.values().forEach(registerSource2 -> {
                        try {
                            RegisterSource registerSource2 = this.registerDAO.get(this.modelName, registerSource2.id());
                            if (!Objects.nonNull(registerSource2)) {
                                int id = this.registerLockDAO.getId(this.scopeId, registerSource2);
                                if (id != 0) {
                                    try {
                                        RegisterSource registerSource3 = this.registerDAO.get(this.modelName, registerSource2.id());
                                        if (!Objects.nonNull(registerSource3)) {
                                            registerSource2.setSequence(id);
                                            this.registerDAO.forceInsert(this.modelName, registerSource2);
                                        } else if (registerSource3.combine(registerSource2)) {
                                            this.registerDAO.forceUpdate(this.modelName, registerSource3);
                                        }
                                    } catch (Throwable th2) {
                                        logger.error(th2.getMessage(), th2);
                                    }
                                } else {
                                    logger.info("{} inventory register try lock and increment sequence failure.", DefaultScopeDefine.nameOf(this.scopeId));
                                }
                            } else if (registerSource2.combine(registerSource2)) {
                                this.registerDAO.forceUpdate(this.modelName, registerSource2);
                            }
                        } catch (Throwable th3) {
                            logger.error(th3.getMessage(), th3);
                        }
                    });
                    this.sources.clear();
                }
                if (createTimer != null) {
                    if (0 == 0) {
                        createTimer.close();
                        return;
                    }
                    try {
                        createTimer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTimer != null) {
                if (th != null) {
                    try {
                        createTimer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTimer.close();
                }
            }
            throw th4;
        }
    }
}
