package com.orientechnologies.orient.etl;

import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.io.OIOUtils;
import com.orientechnologies.common.thread.NonDaemonThreadFactory;
import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OVertex;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.block.OETLBlock;
import com.orientechnologies.orient.etl.context.OETLContext;
import com.orientechnologies.orient.etl.context.OETLContextWrapper;
import com.orientechnologies.orient.etl.extractor.OETLExtractor;
import com.orientechnologies.orient.etl.loader.OETLLoader;
import com.orientechnologies.orient.etl.source.OETLSource;
import com.orientechnologies.orient.etl.transformer.OETLTransformer;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor.class */
public class OETLProcessor {
    protected List<OETLBlock> endBlocks;
    protected List<OETLTransformer> transformers;
    protected List<OETLBlock> beginBlocks;
    protected OETLSource source;
    protected OETLExtractor extractor;
    protected OETLLoader loader;
    protected OETLContext context;
    protected long startTime;
    protected long elapsed;
    protected TimerTask dumpTask;
    protected Level logLevel = Level.INFO;
    protected boolean haltOnError = true;
    protected int maxRetries = 10;
    protected int workers = 1;
    private boolean parallel = false;
    protected final OETLComponentFactory factory = new OETLComponentFactory();
    protected final OETLProcessorStats stats = new OETLProcessorStats();
    private final ExecutorService executor = Executors.newCachedThreadPool(new NonDaemonThreadFactory("ETL processor thread"));

    /* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor$LOG_LEVELS.class */
    public enum LOG_LEVELS {
        NONE(Level.OFF),
        ERROR(Level.SEVERE),
        INFO(Level.INFO),
        DEBUG(Level.FINE);

        private final Level julLevel;

        LOG_LEVELS(Level level) {
            this.julLevel = level;
        }

        public Level toJulLevel() {
            return this.julLevel;
        }
    }

    /* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor$OETLProcessorStats.class */
    public class OETLProcessorStats {
        public long lastExtractorProgress = 0;
        public long lastLoaderProgress = 0;
        public long lastLap = 0;
        public AtomicLong warnings = new AtomicLong();
        public AtomicLong errors = new AtomicLong();

        public OETLProcessorStats() {
        }

        public long incrementWarnings() {
            return this.warnings.incrementAndGet();
        }

        public long incrementErrors() {
            return this.errors.incrementAndGet();
        }
    }

    public OETLProcessor(List<OETLBlock> list, OETLSource oETLSource, OETLExtractor oETLExtractor, List<OETLTransformer> list2, OETLLoader oETLLoader, List<OETLBlock> list3, OETLContext oETLContext) {
        this.beginBlocks = list;
        this.source = oETLSource;
        this.extractor = oETLExtractor;
        this.transformers = list2;
        this.loader = oETLLoader;
        this.endBlocks = list3;
        this.context = oETLContext;
        configRunBehaviour(this.context);
        OETLContextWrapper.getInstance().setContext(this.context);
    }

    public static void main(String[] strArr) {
        System.out.println("OrientDB etl v." + OConstants.getVersion() + " https://www.orientdb.com");
        if (strArr.length == 0) {
            System.out.println("Syntax error, missing configuration file.");
            System.out.println("Use: oetl.sh <json-file>");
            System.exit(1);
        }
        new OETLProcessorConfigurator().parseConfigAndParameters(strArr).execute();
    }

    protected void configRunBehaviour(OCommandContext oCommandContext) {
        int availableProcessors;
        String str = (String) oCommandContext.getVariable("log");
        if (str != null) {
            this.logLevel = LOG_LEVELS.valueOf(str.toUpperCase(Locale.ENGLISH)).toJulLevel();
        }
        Boolean bool = (Boolean) oCommandContext.getVariable("haltOnError");
        if (bool != null) {
            this.haltOnError = bool.booleanValue();
        }
        Object variable = oCommandContext.getVariable("parallel");
        if (variable != null) {
            this.parallel = ((Boolean) variable).booleanValue();
        }
        if (!this.parallel || (availableProcessors = Runtime.getRuntime().availableProcessors()) < 2) {
            return;
        }
        this.workers = availableProcessors - 1;
    }

    public OETLProcessorStats getStats() {
        return this.stats;
    }

    public OETLExtractor getExtractor() {
        return this.extractor;
    }

    public OETLSource getSource() {
        return this.source;
    }

    public OETLLoader getLoader() {
        return this.loader;
    }

    public List<OETLTransformer> getTransformers() {
        return this.transformers;
    }

    public Level getLogLevel() {
        return this.logLevel;
    }

    public OETLContext getContext() {
        return this.context;
    }

    public void execute() {
        configure();
        begin();
        try {
            runExtractorAndPipeline();
        } finally {
            end();
        }
    }

    private void configure() {
    }

    public void close() {
        this.loader.getPool().close();
        this.loader.close();
    }

    private void runExtractorAndPipeline() {
        try {
            getContext().getMessageHandler().info(this, "Started execution with %d worker threads", new Object[]{Integer.valueOf(this.workers)});
            this.extractor.extract(this.source.read());
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.workers * 500);
            List list = (List) IntStream.range(0, this.workers).boxed().map(num -> {
                return CompletableFuture.runAsync(new OETLPipelineWorker(linkedBlockingQueue, new OETLPipeline(this, this.transformers, this.loader, this.logLevel, this.maxRetries, this.haltOnError)), this.executor);
            }).collect(Collectors.toList());
            list.add(CompletableFuture.runAsync(new OETLExtractorWorker(this.extractor, linkedBlockingQueue, this.haltOnError), this.executor));
            list.forEach(completableFuture -> {
            });
            getContext().getMessageHandler().debug(this, "all items extracted");
            this.executor.shutdown();
        } catch (OETLProcessHaltedException e) {
            getContext().getMessageHandler().error(this, "ETL process halted: ", new Object[]{e});
            this.executor.shutdownNow();
        } catch (Exception e2) {
            getContext().getMessageHandler().error(this, "ETL process has problem: ", new Object[]{e2});
            this.executor.shutdownNow();
        }
        this.executor.shutdown();
    }

    protected void begin() {
        getContext().getMessageHandler().info(this, "BEGIN ETL PROCESSOR");
        Integer num = (Integer) this.context.getVariable("maxRetries");
        if (num != null) {
            this.maxRetries = num.intValue();
        }
        Integer num2 = (Integer) this.context.getVariable("dumpEveryMs");
        if (num2 != null && num2.intValue() > 0) {
            this.dumpTask = Orient.instance().scheduleTask(this::dumpProgress, num2.intValue(), num2.intValue());
            this.startTime = System.currentTimeMillis();
        }
        for (OETLBlock oETLBlock : this.beginBlocks) {
            oETLBlock.begin(null);
            oETLBlock.execute();
            oETLBlock.end();
        }
        if (this.source != null) {
            this.source.begin(null);
        }
        this.extractor.begin(null);
    }

    protected void end() {
        Iterator<OETLTransformer> it = this.transformers.iterator();
        while (it.hasNext()) {
            it.next().end();
        }
        if (this.source != null) {
            this.source.end();
        }
        this.extractor.end();
        this.loader.end();
        for (OETLBlock oETLBlock : this.endBlocks) {
            oETLBlock.begin(null);
            oETLBlock.execute();
            oETLBlock.end();
        }
        this.elapsed = System.currentTimeMillis() - this.startTime;
        if (this.dumpTask != null) {
            this.dumpTask.cancel();
        }
        getContext().getMessageHandler().info(this, "END ETL PROCESSOR");
        dumpProgress();
    }

    protected void dumpProgress() {
        long currentTimeMillis = System.currentTimeMillis();
        long progress = this.extractor.getProgress();
        long total = this.extractor.getTotal();
        long j = (((float) (progress - this.stats.lastExtractorProgress)) * 1000.0f) / ((float) (currentTimeMillis - this.stats.lastLap));
        String unit = this.extractor.getUnit();
        long progress2 = this.loader.getProgress();
        long j2 = (((float) (progress2 - this.stats.lastLoaderProgress)) * 1000.0f) / ((float) (currentTimeMillis - this.stats.lastLap));
        String unit2 = this.loader.getUnit();
        String format = total > -1 ? String.format("%,d", Long.valueOf(total)) : "?";
        if (total == -1) {
            getContext().getMessageHandler().info(this, "+ extracted %,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", new Object[]{Long.valueOf(progress), unit, Long.valueOf(j), unit, Long.valueOf(this.extractor.getProgress()), this.extractor.getUnit(), Long.valueOf(progress2), unit2, Long.valueOf(j2), unit2, OIOUtils.getTimeAsString(currentTimeMillis - this.startTime), Long.valueOf(this.stats.warnings.get()), Long.valueOf(this.stats.errors.get())});
        } else {
            getContext().getMessageHandler().info(this, "+ %3.2f%% -> extracted %,d/%,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", new Object[]{Float.valueOf((((float) progress) * 100.0f) / ((float) total)), Long.valueOf(progress), Long.valueOf(total), unit, Long.valueOf(j), unit, Long.valueOf(this.extractor.getProgress()), this.extractor.getUnit(), Long.valueOf(progress2), unit2, Long.valueOf(j2), unit2, OIOUtils.getTimeAsString(currentTimeMillis - this.startTime), Long.valueOf(this.stats.warnings.get()), Long.valueOf(this.stats.errors.get())});
        }
        this.stats.lastExtractorProgress = progress;
        this.stats.lastLoaderProgress = progress2;
        this.stats.lastLap = currentTimeMillis;
    }

    protected void analyzeFlow() {
        if (this.extractor == null) {
            throw new OConfigurationException("extractor is null");
        }
        if (this.loader == null) {
            throw new OConfigurationException("loader is null");
        }
        OETLComponent oETLComponent = this.extractor;
        for (OETLTransformer oETLTransformer : this.transformers) {
            checkTypeCompatibility(oETLTransformer, oETLComponent);
            oETLComponent = oETLTransformer;
        }
        checkTypeCompatibility(this.loader, oETLComponent);
    }

    protected void checkTypeCompatibility(OETLComponent oETLComponent, OETLComponent oETLComponent2) {
        List list;
        try {
            String str = (String) oETLComponent2.getConfiguration().field("output");
            if (str == null || (list = (List) oETLComponent.getConfiguration().field("input")) == null) {
                return;
            }
            Class classByName = getClassByName(oETLComponent2, str);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                if (getClassByName(oETLComponent, (String) it.next()).isAssignableFrom(classByName)) {
                    return;
                }
            }
            throw new OConfigurationException("Component '" + oETLComponent.getName() + "' expects one of the following inputs " + list + " but the 'output' for component '" + oETLComponent2.getName() + "' is: " + str);
        } catch (Exception e) {
            throw OException.wrapException(new OConfigurationException("Error on checking compatibility between components '" + oETLComponent2.getName() + "' and '" + oETLComponent.getName() + "'"), e);
        }
    }

    protected Class getClassByName(OETLComponent oETLComponent, String str) {
        Class<?> cls;
        if (str.equals("ODocument")) {
            cls = ODocument.class;
        } else if (str.equals("String")) {
            cls = String.class;
        } else if (str.equals("Object")) {
            cls = Object.class;
        } else if (str.equals("OrientVertex")) {
            cls = OVertex.class;
        } else if (str.equals("OrientEdge")) {
            cls = OEdge.class;
        } else {
            try {
                cls = Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw OException.wrapException(new OConfigurationException("Class '" + str + "' declared as 'input' of ETL Component '" + oETLComponent.getName() + "' was not found."), e);
            }
        }
        return cls;
    }

    public OETLComponentFactory getFactory() {
        return this.factory;
    }
}
