package com.orientechnologies.orient.etl;

import com.orientechnologies.common.io.OIOUtils;
import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.serialization.serializer.OStringSerializerHelper;
import com.orientechnologies.orient.etl.block.OBlock;
import com.orientechnologies.orient.etl.extractor.OExtractor;
import com.orientechnologies.orient.etl.loader.OLoader;
import com.orientechnologies.orient.etl.source.OSource;
import com.orientechnologies.orient.etl.transformer.OTransformer;
import com.tinkerpop.blueprints.impls.orient.OrientEdge;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor.class */
public class OETLProcessor {
    // Blocks that begin() runs (begin/execute/end) before the source and extractor are started.
    protected List<OBlock> beginBlocks;
    // Blocks that end() runs (begin/execute/end) after the loader has been ended.
    protected List<OBlock> endBlocks;
    // Optional input source; when non-null its read() result is handed to the extractor.
    protected OSource source;
    // Produces the items that are pushed through the pipeline.
    protected OExtractor extractor;
    // Final stage of the pipeline; also queried for progress by dumpProgress().
    protected OLoader loader;
    // Transformers handed to every OETLPipeline, in list order.
    protected List<OTransformer> transformers;
    // Holds the configuration variables read by init() and begin() ("log", "parallel", "maxRetries", "dumpEveryMs").
    protected OBasicCommandContext context;
    // Wall-clock time (ms) recorded by begin(); used for the elapsed and "Total time" figures.
    protected long startTime;
    // Duration in ms computed by end() as current time minus startTime.
    protected long elapsed;
    // Periodic task scheduled by begin() that calls dumpProgress(); cancelled by end().
    protected TimerTask dumpTask;
    // Allocated by init() when parallel is true; executeParallel() fills it with the worker threads.
    private Thread[] threads;
    // Creates blocks, sources, extractors, transformers and loaders by name during parse().
    protected final OETLComponentFactory factory = new OETLComponentFactory();
    // Warning/error counters plus the progress snapshot of the previous dumpProgress() call.
    protected OETLProcessorStats stats = new OETLProcessorStats();
    // Verbosity threshold applied by out(); overridden by the "log" context variable in init().
    protected LOG_LEVELS logLevel = LOG_LEVELS.INFO;
    // When true execute() dispatches to executeParallel(); set from the "parallel" context variable in init().
    protected boolean parallel = false;
    // Passed to each OETLPipeline; overridden by the "maxRetries" context variable in begin().
    protected int maxRetries = 10;

    /* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor$LOG_LEVELS.class */
    /**
     * Verbosity levels understood by {@link #out(LOG_LEVELS, String, Object...)}.
     *
     * <p>The declaration order is significant: {@code out} compares ordinals, so a message is
     * printed only when its level is declared at or before the processor's configured level.
     * Do not reorder the constants.
     */
    public enum LOG_LEVELS {
        // No message passes the ordinal check in out() unless it is itself tagged NONE.
        NONE,
        // Used by this class for "ETL process halted" messages.
        ERROR,
        // Default level of the processor; used for lifecycle and progress messages.
        INFO,
        // Most verbose level; not used directly by this class.
        DEBUG
    }

    /* loaded from: input_file:com/orientechnologies/orient/etl/OETLProcessor$OETLProcessorStats.class */
    /**
     * Mutable counters shared across the ETL run: the progress snapshot taken by the previous
     * progress dump and the running totals of warnings and errors.
     */
    public class OETLProcessorStats {
        /** Extractor progress observed at the previous progress dump. */
        public long lastExtractorProgress;
        /** Loader progress observed at the previous progress dump. */
        public long lastLoaderProgress;
        /** Timestamp (ms) of the previous progress dump. */
        public long lastLap;
        /** Number of warnings raised so far. */
        public AtomicLong warnings = new AtomicLong();
        /** Number of errors raised so far. */
        public AtomicLong errors = new AtomicLong();

        /** Creates a stats holder with every counter at zero. */
        public OETLProcessorStats() {
        }

        /**
         * Atomically adds one warning.
         *
         * @return the warning count after the increment
         */
        public long incrementWarnings() {
            return warnings.incrementAndGet();
        }

        /**
         * Atomically adds one error.
         *
         * @return the error count after the increment
         */
        public long incrementErrors() {
            return errors.incrementAndGet();
        }
    }

    /**
     * Builds a processor from already-instantiated components and then runs {@link #init()}.
     *
     * @param list                 blocks to run before extraction
     * @param oSource              input source
     * @param oExtractor           extractor producing the items
     * @param list2                transformers applied to each item
     * @param oLoader              loader receiving the transformed items
     * @param list3                blocks to run after loading
     * @param oBasicCommandContext context holding the configuration variables read by {@code init()}
     */
    public OETLProcessor(List<OBlock> list, OSource oSource, OExtractor oExtractor, List<OTransformer> list2, OLoader oLoader, List<OBlock> list3, OBasicCommandContext oBasicCommandContext) {
        // The context must be in place before init() reads its variables.
        context = oBasicCommandContext;

        beginBlocks = list;
        endBlocks = list3;

        source = oSource;
        extractor = oExtractor;
        transformers = list2;
        loader = oLoader;

        init();
    }

    /**
     * Creates an unconfigured processor; components and context are expected to be supplied
     * afterwards through one of the {@code parse(...)} methods.
     */
    public OETLProcessor() {
    }

    /**
     * Command-line entry point.
     *
     * <p>Arguments starting with {@code -} are read as {@code -name=value} options and stored in
     * the context under the upper-cased name; any other argument is the path of a JSON
     * configuration file. Entries of the file's optional {@code config} section are copied into
     * the context as variables, then the configuration is parsed and executed.
     *
     * @param strArr command-line arguments
     * @throws OConfigurationException if an option is malformed or the configuration file cannot be read
     */
    public static void main(String[] strArr) {
        System.out.println("OrientDB etl v." + OConstants.getVersion() + " www.orientechnologies.com");
        if (strArr.length == 0) {
            System.out.println("Syntax error, missing configuration file.");
            System.out.println("Use: oetl.sh <json-file>");
            System.exit(1);
        }

        final OBasicCommandContext context = createDefaultContext();
        ODocument configuration = null;
        ODocument configVariables = null;
        for (String arg : strArr) {
            if (arg.isEmpty()) {
                // An empty argument carries no information; charAt(0) on it would fail.
                continue;
            }
            if (arg.charAt(0) == '-') {
                // Limit 2 keeps values that themselves contain '=' intact.
                final String[] option = arg.substring(1).split("=", 2);
                if (option.length < 2 || option[0].isEmpty()) {
                    throw new OConfigurationException("Invalid option '" + arg + "', expected -<name>=<value>");
                }
                // Locale.ROOT keeps the variable name independent of the platform default locale.
                context.setVariable(option[0].toUpperCase(Locale.ROOT), option[1]);
            } else {
                try {
                    configuration = new ODocument().fromJSON(OIOUtils.readFileAsString(new File(arg)), "noMap");
                    configVariables = (ODocument) configuration.field("config");
                } catch (IOException e) {
                    // Keep the I/O failure as the cause so the underlying reason is not lost.
                    throw new OConfigurationException("Error on loading config file: " + arg, e);
                }
            }
        }

        if (configuration == null) {
            // Only options were supplied: report it the same way as a missing argument list.
            System.out.println("Syntax error, missing configuration file.");
            System.out.println("Use: oetl.sh <json-file>");
            System.exit(1);
        }

        if (configVariables != null) {
            for (String name : configVariables.fieldNames()) {
                context.setVariable(name, configVariables.field(name));
            }
        }
        new OETLProcessor().parse(configuration, context).execute();
    }

    /**
     * Creates the context used when the caller supplies none.
     *
     * @return a new context whose {@code dumpEveryMs} variable is set to 1000
     */
    protected static OBasicCommandContext createDefaultContext() {
        final OBasicCommandContext defaults = new OBasicCommandContext();
        defaults.setVariable("dumpEveryMs", Integer.valueOf(1000));
        return defaults;
    }

    /**
     * Parses a JSON transformer definition given either as a single object ({@code {...}}) or as
     * an array of objects ({@code [...]}).
     *
     * @param str JSON text; only its first character decides how it is parsed
     * @return the parsed documents; empty when {@code str} is empty or starts with neither
     *         <code>{</code> nor {@code [}
     */
    protected static Collection<ODocument> parseTransformers(String str) {
        final List<ODocument> documents = new ArrayList<ODocument>();
        if (str.isEmpty()) {
            return documents;
        }

        final char opening = str.charAt(0);
        if (opening == '{') {
            final ODocument single = new ODocument().fromJSON(str, "noMap");
            documents.add(single);
        } else if (opening == '[') {
            final List<String> items = new ArrayList<String>();
            OStringSerializerHelper.getCollection(str, 0, items);
            for (String item : items) {
                final ODocument parsed = new ODocument().fromJSON(item, "noMap");
                documents.add(parsed);
            }
        }
        return documents;
    }

    /**
     * Configures this processor from a whole configuration document by pulling out its
     * {@code begin}, {@code source}, {@code extractor}, {@code transformers}, {@code loader} and
     * {@code end} fields and delegating to the seven-argument {@code parse}.
     *
     * @param oDocument            configuration document
     * @param oBasicCommandContext context to use
     * @return this processor
     */
    public OETLProcessor parse(ODocument oDocument, OBasicCommandContext oBasicCommandContext) {
        final Collection<ODocument> beginCfg = oDocument.field("begin");
        final ODocument sourceCfg = oDocument.field("source");
        final ODocument extractorCfg = oDocument.field("extractor");
        final Collection<ODocument> transformersCfg = oDocument.field("transformers");
        final ODocument loaderCfg = oDocument.field("loader");
        final Collection<ODocument> endCfg = oDocument.field("end");
        return parse(beginCfg, sourceCfg, extractorCfg, transformersCfg, loaderCfg, endCfg, oBasicCommandContext);
    }

    /**
     * Configures this processor from the individual configuration sections.
     *
     * <p>Every section document is expected to have the component name as its first field and
     * that component's configuration as the field value. Begin blocks are executed as soon as
     * they are configured. A missing source defaults to the {@code input} source and a missing
     * loader to the {@code output} loader.
     *
     * @param collection           begin-block definitions, may be {@code null}
     * @param oDocument            source definition, may be {@code null}
     * @param oDocument2           extractor definition, required
     * @param collection2          transformer definitions, may be {@code null}
     * @param oDocument3           loader definition, may be {@code null}
     * @param collection3          end-block definitions, may be {@code null}
     * @param oBasicCommandContext context to use; when {@code null} a default context is created
     * @return this processor
     * @throws IllegalArgumentException if no extractor is configured
     * @throws OConfigurationException  if any component cannot be created or configured
     */
    public OETLProcessor parse(Collection<ODocument> collection, ODocument oDocument, ODocument oDocument2, Collection<ODocument> collection2, ODocument oDocument3, Collection<ODocument> collection3, OBasicCommandContext oBasicCommandContext) {
        if (oDocument2 == null) {
            throw new IllegalArgumentException("No Extractor configured");
        }
        this.context = oBasicCommandContext != null ? oBasicCommandContext : createDefaultContext();
        init();

        // Components must receive the effective context: the raw parameter may be null, in
        // which case the default context created above is the one actually in use.
        final OBasicCommandContext effectiveContext = this.context;
        try {
            this.beginBlocks = new ArrayList<OBlock>();
            if (collection != null) {
                for (ODocument blockCfg : collection) {
                    final String blockName = blockCfg.fieldNames()[0];
                    final OBlock block = this.factory.getBlock(blockName);
                    this.beginBlocks.add(block);
                    configureComponent(block, (ODocument) blockCfg.field(blockName), effectiveContext);
                    // Begin blocks run right away, before the remaining components are configured.
                    block.execute();
                }
            }

            if (oDocument != null) {
                final String sourceName = oDocument.fieldNames()[0];
                this.source = this.factory.getSource(sourceName);
                configureComponent(this.source, (ODocument) oDocument.field(sourceName), effectiveContext);
            } else {
                this.source = this.factory.getSource("input");
            }

            final String extractorName = oDocument2.fieldNames()[0];
            this.extractor = this.factory.getExtractor(extractorName);
            configureComponent(this.extractor, (ODocument) oDocument2.field(extractorName), effectiveContext);

            if (oDocument3 != null) {
                final String loaderName = oDocument3.fieldNames()[0];
                this.loader = this.factory.getLoader(loaderName);
                configureComponent(this.loader, (ODocument) oDocument3.field(loaderName), effectiveContext);
            } else {
                this.loader = this.factory.getLoader("output");
            }

            this.transformers = new ArrayList<OTransformer>();
            if (collection2 != null) {
                for (ODocument transformerCfg : collection2) {
                    final String transformerName = transformerCfg.fieldNames()[0];
                    final OTransformer transformer = this.factory.getTransformer(transformerName);
                    this.transformers.add(transformer);
                    configureComponent(transformer, (ODocument) transformerCfg.field(transformerName), effectiveContext);
                }
            }

            this.endBlocks = new ArrayList<OBlock>();
            if (collection3 != null) {
                for (ODocument blockCfg : collection3) {
                    final String blockName = blockCfg.fieldNames()[0];
                    final OBlock block = this.factory.getBlock(blockName);
                    this.endBlocks.add(block);
                    configureComponent(block, (ODocument) blockCfg.field(blockName), effectiveContext);
                }
            }
            return this;
        } catch (Exception e) {
            throw new OConfigurationException("Error on creating ETL processor", e);
        }
    }

    /**
     * @return the factory this processor uses to instantiate ETL components by name
     */
    public OETLComponentFactory getFactory() {
        return factory;
    }

    /**
     * Runs the ETL process, in parallel or sequentially depending on the {@code parallel} flag.
     *
     * @return this processor
     */
    public OETLProcessor execute() {
        if (!parallel) {
            executeSequentially();
            return this;
        }
        executeParallel();
        return this;
    }

    /**
     * Prints a formatted message to standard output when the processor's log level is at least
     * as verbose as the level of the message.
     *
     * @param log_levels level of the message
     * @param str        {@link String#format(String, Object...)} pattern
     * @param objArr     arguments for the pattern
     */
    public void out(LOG_LEVELS log_levels, String str, Object... objArr) {
        // Enum.compareTo orders constants by declaration position.
        if (logLevel.compareTo(log_levels) < 0) {
            return;
        }
        final String message = String.format(str, objArr);
        System.out.println(message);
    }

    /**
     * @return the statistics holder of this processor
     */
    public OETLProcessorStats getStats() {
        return stats;
    }

    /**
     * @return the configured extractor
     */
    public OExtractor getExtractor() {
        return extractor;
    }

    /**
     * @return the configured loader
     */
    public OLoader getLoader() {
        return loader;
    }

    /**
     * @return the configured transformers (the internal list itself, not a copy)
     */
    public List<OTransformer> getTransformers() {
        return transformers;
    }

    /**
     * @return the verbosity threshold currently applied by {@code out}
     */
    public LOG_LEVELS getLogLevel() {
        return logLevel;
    }

    /**
     * @return the command context holding this processor's variables
     */
    public OBasicCommandContext getContext() {
        return context;
    }

    /**
     * Runs the ETL process with one pipeline per worker thread.
     *
     * <p>The calling thread extracts items and pushes them into a bounded queue; each worker
     * owns its own {@link OETLPipeline} and drains the queue. {@code pending} counts the items
     * that were queued but not yet fully processed. Once extraction is over and {@code pending}
     * has dropped to zero, the workers still parked on the empty queue are interrupted so they
     * can terminate instead of blocking forever.
     *
     * <p>NOTE(review): an exception thrown by a pipeline still terminates only that worker
     * thread; the calling thread is not notified. Confirm whether that is acceptable.
     */
    protected void executeParallel() {
        // Remembers an interrupt received while waiting so it can be re-asserted on exit.
        boolean interrupted = false;
        try {
            begin();
            out(LOG_LEVELS.INFO, "Started parallel execution with %d threads", Integer.valueOf(this.threads.length));
            if (this.source != null) {
                final Reader reader = this.source.read();
                if (reader != null) {
                    this.extractor.extract(reader);
                }
            }

            // offer() is redefined to block like put(), so the producer is throttled by the
            // queue capacity instead of dropping items when the queue is full.
            final LinkedBlockingQueue<OExtractedItem> queue = new LinkedBlockingQueue<OExtractedItem>(this.threads.length * 500) {
                @Override
                public boolean offer(OExtractedItem item) {
                    try {
                        put(item);
                        return true;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            };
            final AtomicLong pending = new AtomicLong();
            final AtomicBoolean extractionFinished = new AtomicBoolean(false);

            for (int i = 0; i < this.threads.length; i++) {
                this.threads[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // The pipeline needs the enclosing processor, not this Runnable.
                        final OETLPipeline pipeline = new OETLPipeline(OETLProcessor.this, OETLProcessor.this.transformers, OETLProcessor.this.loader, OETLProcessor.this.logLevel, OETLProcessor.this.maxRetries);
                        pipeline.begin();
                        while (!extractionFinished.get() || pending.get() > 0) {
                            final OExtractedItem item;
                            try {
                                item = queue.take();
                            } catch (InterruptedException e) {
                                // Woken up by the coordinator (or spuriously): re-evaluate the
                                // loop condition, which decides whether this worker is done.
                                continue;
                            }
                            try {
                                pipeline.execute(item);
                            } finally {
                                // Count the item as handled even when the pipeline failed.
                                pending.decrementAndGet();
                            }
                        }
                    }
                }, "OrientDB ETL pipeline-" + i);
                this.threads[i].setDaemon(true);
                this.threads[i].start();
            }

            while (this.extractor.hasNext()) {
                final OExtractedItem item = this.extractor.next();
                pending.incrementAndGet();
                if (!queue.offer(item)) {
                    // The item never reached the queue: do not count it, otherwise the wait
                    // loop below would never see the counter reach zero.
                    pending.decrementAndGet();
                    out(LOG_LEVELS.ERROR, "ETL extraction interrupted, stopping before all the entries were queued");
                    break;
                }
            }
            extractionFinished.set(true);

            while (pending.get() > 0) {
                out(LOG_LEVELS.INFO, "Waiting for the pipeline to finish, remaining %d entries to process", Long.valueOf(pending.get()));
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    // Keep waiting for in-flight items, but remember the interrupt.
                    interrupted = true;
                }
            }

            // Every queued item has been processed: release workers blocked in take().
            for (Thread worker : this.threads) {
                worker.interrupt();
            }
            end();
        } catch (OETLProcessHaltedException e) {
            out(LOG_LEVELS.ERROR, "ETL process halted: %s", e);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts the ETL run: reads the {@code maxRetries} and {@code dumpEveryMs} context
     * variables, records the start time, schedules the periodic progress dump when
     * {@code dumpEveryMs} is positive, runs the begin blocks and finally starts the source and
     * the extractor.
     */
    protected void begin() {
        out(LOG_LEVELS.INFO, "BEGIN ETL PROCESSOR", new Object[0]);

        final Integer configuredRetries = (Integer) this.context.getVariable("maxRetries");
        if (configuredRetries != null) {
            this.maxRetries = configuredRetries.intValue();
        }

        // Recorded unconditionally so the elapsed and "Total time" figures stay correct even
        // when the periodic dump is disabled; the first lap starts now as well.
        this.startTime = System.currentTimeMillis();
        this.stats.lastLap = this.startTime;

        // A context without "dumpEveryMs" simply disables the periodic dump.
        final Integer dumpEveryMs = (Integer) this.context.getVariable("dumpEveryMs");
        if (dumpEveryMs != null && dumpEveryMs.intValue() > 0) {
            final int period = dumpEveryMs.intValue();
            this.dumpTask = new TimerTask() {
                @Override
                public void run() {
                    OETLProcessor.this.dumpProgress();
                }
            };
            Orient.instance().scheduleTask(this.dumpTask, period, period);
        }

        for (OBlock block : this.beginBlocks) {
            block.begin();
            block.execute();
            block.end();
        }
        if (this.source != null) {
            this.source.begin();
        }
        this.extractor.begin();
    }

    /**
     * Finishes the ETL run: ends the transformers, the source (when present), the extractor and
     * the loader, runs the end blocks, computes the elapsed time, cancels the periodic dump and
     * prints a final progress line.
     */
    protected void end() {
        for (OTransformer transformer : transformers) {
            transformer.end();
        }
        if (source != null) {
            source.end();
        }
        extractor.end();
        loader.end();

        for (OBlock block : endBlocks) {
            block.begin();
            block.execute();
            block.end();
        }

        elapsed = System.currentTimeMillis() - startTime;
        if (dumpTask != null) {
            dumpTask.cancel();
        }
        out(LOG_LEVELS.INFO, "END ETL PROCESSOR");
        dumpProgress();
    }

    /**
     * Runs the ETL process on the calling thread: every extracted item goes through a single
     * pipeline, one after the other. A halted process is reported at ERROR level instead of
     * being propagated.
     */
    protected void executeSequentially() {
        try {
            begin();

            final Reader reader = source == null ? null : source.read();
            if (reader != null) {
                extractor.extract(reader);
            }

            final OETLPipeline pipeline = new OETLPipeline(this, transformers, loader, logLevel, maxRetries);
            pipeline.begin();
            for (; extractor.hasNext(); ) {
                pipeline.execute(extractor.next());
            }

            end();
        } catch (OETLProcessHaltedException halted) {
            out(LOG_LEVELS.ERROR, "ETL process halted: %s", halted);
        }
    }

    /**
     * Configures a component by delegating to its {@code configure} method, passing this
     * processor as the owner. Being protected, it is a single place subclasses can override.
     *
     * @param oETLComponent        component to configure
     * @param oDocument            configuration of the component
     * @param oBasicCommandContext context handed to the component
     */
    protected void configureComponent(OETLComponent oETLComponent, ODocument oDocument, OBasicCommandContext oBasicCommandContext) {
        oETLComponent.configure(this, oDocument, oBasicCommandContext);
    }

    /**
     * Resolves a type name declared in a component configuration to a class.
     *
     * <p>The short names {@code ODocument}, {@code String}, {@code Object}, {@code OrientVertex}
     * and {@code OrientEdge} are mapped directly; any other value is loaded with
     * {@link Class#forName(String)}.
     *
     * @param oETLComponent component declaring the type, used only for the error message
     * @param str           short alias or fully qualified class name
     * @return the resolved class
     * @throws OConfigurationException if the class cannot be found; the original
     *                                 {@link ClassNotFoundException} is kept as the cause
     */
    protected Class getClassByName(OETLComponent oETLComponent, String str) {
        if (str.equals("ODocument")) {
            return ODocument.class;
        }
        if (str.equals("String")) {
            return String.class;
        }
        if (str.equals("Object")) {
            return Object.class;
        }
        if (str.equals("OrientVertex")) {
            return OrientVertex.class;
        }
        if (str.equals("OrientEdge")) {
            return OrientEdge.class;
        }
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // Preserve the cause so class-loading details are not lost.
            throw new OConfigurationException("Class '" + str + "' declared as 'input' of ETL Component '" + oETLComponent.getName() + "' was not found.", e);
        }
    }

    /**
     * Prints one progress line at INFO level (extracted/loaded counts, throughput since the
     * previous dump, total time, warnings and errors) and stores the current values in
     * {@code stats} as the baseline for the next dump.
     *
     * <p>When the extractor reports a total of {@code -1} the percentage and total are omitted.
     */
    protected void dumpProgress() {
        final long now = System.currentTimeMillis();
        final long lapMs = now - this.stats.lastLap;

        final long extracted = this.extractor.getProgress();
        final long total = this.extractor.getTotal();
        final long extractedPerSec = itemsPerSecond(extracted - this.stats.lastExtractorProgress, lapMs);
        final String extractorUnit = this.extractor.getUnit();

        final long loaded = this.loader.getProgress();
        final long loadedPerSec = itemsPerSecond(loaded - this.stats.lastLoaderProgress, lapMs);
        final String loaderUnit = this.loader.getUnit();

        if (total == -1) {
            out(LOG_LEVELS.INFO, "+ extracted %,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", Long.valueOf(extracted), extractorUnit, Long.valueOf(extractedPerSec), extractorUnit, Long.valueOf(this.extractor.getProgress()), this.extractor.getUnit(), Long.valueOf(loaded), loaderUnit, Long.valueOf(loadedPerSec), loaderUnit, OIOUtils.getTimeAsString(now - this.startTime), Long.valueOf(this.stats.warnings.get()), Long.valueOf(this.stats.errors.get()));
        } else {
            out(LOG_LEVELS.INFO, "+ %3.2f%% -> extracted %,d/%,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", Float.valueOf((((float) extracted) * 100.0f) / ((float) total)), Long.valueOf(extracted), Long.valueOf(total), extractorUnit, Long.valueOf(extractedPerSec), extractorUnit, Long.valueOf(this.extractor.getProgress()), this.extractor.getUnit(), Long.valueOf(loaded), loaderUnit, Long.valueOf(loadedPerSec), loaderUnit, OIOUtils.getTimeAsString(now - this.startTime), Long.valueOf(this.stats.warnings.get()), Long.valueOf(this.stats.errors.get()));
        }

        this.stats.lastExtractorProgress = extracted;
        this.stats.lastLoaderProgress = loaded;
        this.stats.lastLap = now;
    }

    /**
     * Converts a progress delta over a lap into a per-second rate.
     *
     * @return {@code delta * 1000 / lapMs}, or 0 when the lap is not positive (two dumps within
     *         the same millisecond), which would otherwise be a division by zero
     */
    private static long itemsPerSecond(long delta, long lapMs) {
        return lapMs > 0 ? (delta * 1000L) / lapMs : 0L;
    }

    /**
     * Verifies that every component of the chain extractor -> transformers -> loader accepts
     * what the component before it produces.
     *
     * @throws OConfigurationException if the extractor or the loader is missing, or if two
     *                                 adjacent components are not compatible
     */
    protected void analyzeFlow() {
        if (extractor == null) {
            throw new OConfigurationException("extractor is null");
        }
        if (loader == null) {
            throw new OConfigurationException("loader is null");
        }

        // Walk the chain, always comparing a component with its immediate predecessor.
        OETLComponent upstream = extractor;
        final Iterator<OTransformer> chain = transformers.iterator();
        while (chain.hasNext()) {
            final OTransformer current = chain.next();
            checkTypeCompatibility(current, upstream);
            upstream = current;
        }
        checkTypeCompatibility(loader, upstream);
    }

    /**
     * Checks that the {@code output} type declared by the previous component is assignable to
     * at least one of the {@code input} types declared by the next one. Nothing is checked when
     * either declaration is absent.
     *
     * @param oETLComponent  consuming component, whose {@code input} list is read
     * @param oETLComponent2 producing component, whose {@code output} is read
     * @throws OConfigurationException on any failure; an incompatibility detected here is itself
     *                                 wrapped by the generic "Error on checking compatibility" exception
     */
    protected void checkTypeCompatibility(OETLComponent oETLComponent, OETLComponent oETLComponent2) {
        try {
            final String producedType = (String) oETLComponent2.getConfiguration().field("output");
            if (producedType == null) {
                return;
            }
            final List acceptedTypes = (List) oETLComponent.getConfiguration().field("input");
            if (acceptedTypes == null) {
                return;
            }

            final Class producedClass = getClassByName(oETLComponent2, producedType);
            for (Object acceptedType : acceptedTypes) {
                final Class acceptedClass = getClassByName(oETLComponent, (String) acceptedType);
                if (acceptedClass.isAssignableFrom(producedClass)) {
                    return;
                }
            }
            throw new OConfigurationException("Component '" + oETLComponent.getName() + "' expects one of the following inputs " + acceptedTypes + " but the 'output' for component '" + oETLComponent2.getName() + "' is: " + producedType);
        } catch (Exception e) {
            throw new OConfigurationException("Error on checking compatibility between components '" + oETLComponent2.getName() + "' and '" + oETLComponent.getName() + "'", e);
        }
    }

    /**
     * Reads the {@code log} and {@code parallel} context variables and, when parallel execution
     * is enabled, allocates one thread slot per available processor.
     *
     * @throws IllegalArgumentException if {@code log} does not name a {@link LOG_LEVELS} constant
     */
    protected void init() {
        final String configuredLevel = (String) this.context.getVariable("log");
        if (configuredLevel != null) {
            // Locale.ROOT: with the default locale "info" would not map to INFO under e.g. Turkish.
            this.logLevel = LOG_LEVELS.valueOf(configuredLevel.toUpperCase(Locale.ROOT));
        }

        final Object parallelSetting = this.context.getVariable("parallel");
        if (parallelSetting != null) {
            // Accept a textual "true"/"false" as well as a Boolean instead of failing on the cast.
            this.parallel = parallelSetting instanceof Boolean
                    ? ((Boolean) parallelSetting).booleanValue()
                    : Boolean.parseBoolean(parallelSetting.toString());
        }

        if (this.parallel) {
            final int workers = Runtime.getRuntime().availableProcessors();
            this.threads = new Thread[workers];
            // Placeholders only; executeParallel() replaces each slot with the real worker.
            for (int i = 0; i < workers; i++) {
                this.threads[i] = new Thread("OrientDB ETL Pipeline-" + i);
            }
        }
    }
}
