package org.apache.rya.accumulo.mr;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRdfConstants;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.indexing.FreeTextIndexer;
import org.apache.rya.indexing.TemporalIndexer;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
import org.openrdf.model.Statement;
import org.openrdf.model.vocabulary.XMLSchema;

/* loaded from: input_file:org/apache/rya/accumulo/mr/RyaOutputFormat.class */
public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable> {
    /** Log4j logger used by the static helpers of this output format. */
    private static final Logger logger = Logger.getLogger(RyaOutputFormat.class);
    /** Common prefix of every configuration key defined below: the simple name of this class. */
    private static final String PREFIX = RyaOutputFormat.class.getSimpleName();
    /** Key for the record writer's buffer size limit; read as a long by the RyaRecordWriter constructor. */
    private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
    /** Boolean key that enables the free-text indexer (treated as true when unset). */
    private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable";
    /** Boolean key that enables the temporal indexer (treated as true when unset). */
    private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable";
    /** Boolean key that enables the entity-centric indexer (treated as true when unset). */
    private static final String ENABLE_ENTITY = PREFIX + ".entity.enable";
    /** Boolean key that enables writing through the AccumuloRyaDAO (treated as true when unset). */
    private static final String ENABLE_CORE = PREFIX + ".coretables.enable";
    /** Key holding the table prefix applied to the DAO configuration in getRyaIndexer. */
    private static final String OUTPUT_PREFIX_PROPERTY = PREFIX + ".tablePrefix";
    /** Key holding the default column visibility applied to statements that carry none. */
    private static final String CV_PROPERTY = PREFIX + ".cv.default";
    /** Key holding the default context URI applied to statements that carry none. */
    private static final String CONTEXT_PROPERTY = PREFIX + ".context";

    /* loaded from: input_file:org/apache/rya/accumulo/mr/RyaOutputFormat$RyaRecordWriter.class */
    public static class RyaRecordWriter extends RecordWriter<Writable, RyaStatementWritable> implements Closeable, Flushable {
        /** Log4j logger for the record writer (shadows the outer class's logger inside this class). */
        private static final Logger logger = Logger.getLogger(RyaRecordWriter.class);
        // Output sinks; each one is null when its feature is disabled in the configuration.
        private FreeTextIndexer freeTextIndexer;
        private TemporalIndexer temporalIndexer;
        private EntityCentricIndex entityIndexer;
        private AccumuloRyaDAO ryaIndexer;
        /** Triple context handed to every RyaStatementWritable created by the convenience write methods. */
        private RyaTripleContext tripleContext;
        /** Batch writer given to the entity indexer; only created when the entity indexer is enabled. */
        private MultiTableBatchWriter writer;
        /** Column visibility applied to statements whose visibility is null. */
        private byte[] cv;
        /** Context applied to statements whose context is null; may itself be null. */
        private RyaURI defaultContext;
        /** Default buffer size limit, used when no limit is configured. */
        private static final long ONE_MEGABYTE = 1048576;
        /** Divisor applied to the buffer size limit to pick the buffer's initial capacity. */
        private static final long AVE_STATEMENT_SIZE = 100;
        /** Estimated size at which the buffer is flushed. */
        private long bufferSizeLimit;
        /** Running sum of statementSize(...) for the statements currently buffered. */
        private long bufferCurrentSize;
        /** Statements waiting for the next flush. */
        private ArrayList<RyaStatement> buffer;
        // Bookkeeping that feeds the rate statistics logged by flushBuffer().
        private long startTime;
        private long lastCommitFinishTime;
        private long totalCommitRecords;
        private double totalReadDuration;
        private double totalWriteDuration;
        private long commitCount;

        /**
         * Creates a record writer from the configuration of the given task attempt.
         *
         * @param taskAttemptContext task attempt whose configuration is passed to the
         *        {@code Configuration}-based constructor
         * @throws IOException if the delegated constructor fails
         */
        public RyaRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
            this(taskAttemptContext.getConfiguration());
        }

        /**
         * Creates a record writer from a Hadoop configuration.
         *
         * <p>Reads the default visibility, default context and buffer size limit, creates every
         * enabled indexer plus the Rya DAO, and, when the entity indexer is enabled, opens a
         * {@link MultiTableBatchWriter} for it.
         *
         * @param configuration job configuration holding the {@code RyaOutputFormat} settings
         * @throws IOException if the Rya DAO cannot be created or the Accumulo connection for the
         *         entity index cannot be established
         */
        public RyaRecordWriter(Configuration configuration) throws IOException {
            // Start from the empty visibility; a configured default replaces it below.
            this.cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
            String visibility = configuration.get(RyaOutputFormat.CV_PROPERTY);
            if (visibility != null) {
                // Encode explicitly as UTF-8 so the bytes never depend on the JVM's platform charset.
                this.cv = visibility.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }

            String contextUri = configuration.get(RyaOutputFormat.CONTEXT_PROPERTY, "");
            if (contextUri != null && !contextUri.isEmpty()) {
                this.defaultContext = new RyaURI(contextUri);
            }

            this.bufferSizeLimit = configuration.getLong(RyaOutputFormat.MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE);
            // The capacity is only a presizing hint. Clamp it so a negative or very large configured
            // limit cannot become a negative (or wrapped-around) int capacity.
            long expectedStatements = this.bufferSizeLimit / AVE_STATEMENT_SIZE;
            int initialCapacity = (int) Math.max(0L, Math.min(expectedStatements, (long) Integer.MAX_VALUE));
            this.buffer = new ArrayList<>(initialCapacity);

            this.freeTextIndexer = RyaOutputFormat.getFreeTextIndexer(configuration);
            this.temporalIndexer = RyaOutputFormat.getTemporalIndexer(configuration);
            this.entityIndexer = RyaOutputFormat.getEntityIndexer(configuration);
            this.ryaIndexer = RyaOutputFormat.getRyaIndexer(configuration);

            if (this.entityIndexer != null) {
                // The entity indexer writes through a batch writer owned by this record writer.
                try {
                    Connector connector = ConfigUtils.getConnector(configuration);
                    BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
                    batchWriterConfig.setMaxMemory(RdfCloudTripleStoreConstants.MAX_MEMORY.longValue());
                    batchWriterConfig.setTimeout(RdfCloudTripleStoreConstants.MAX_TIME.longValue(), TimeUnit.MILLISECONDS);
                    batchWriterConfig.setMaxWriteThreads(RdfCloudTripleStoreConstants.NUM_THREADS.intValue());
                    this.writer = connector.createMultiTableBatchWriter(batchWriterConfig);
                    this.entityIndexer.setMultiTableBatchWriter(this.writer);
                } catch (AccumuloException | AccumuloSecurityException e) {
                    throw new IOException("Error connecting to Accumulo for entity index output", e);
                }
            }

            // The first "reading" interval reported by flushBuffer() is measured from here.
            this.startTime = System.currentTimeMillis();
            this.lastCommitFinishTime = this.startTime;
            this.tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(configuration));
        }

        /**
         * Writes all buffered statements to the enabled outputs by delegating to
         * {@code flushBuffer()}.
         *
         * @throws IOException if {@code flushBuffer()} fails
         */
        @Override // java.io.Flushable
        public void flush() throws IOException {
            flushBuffer();
        }

        /**
         * Closes this writer without a task context; equivalent to
         * {@code close((TaskAttemptContext) null)}.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            close(null);
        }

        /**
         * Flushes any buffered statements and then releases every output that was created.
         *
         * <p>Each step is attempted independently: a failure is logged at error level and the
         * remaining steps still run, so this method never propagates those failures.
         *
         * @param taskAttemptContext not used by this implementation; may be null
         */
        public void close(TaskAttemptContext taskAttemptContext) {
            // Push out whatever is still buffered before tearing anything down.
            try {
                flush();
            } catch (IOException flushFailure) {
                logger.error("Error flushing the buffer on RyaOutputFormat Close", flushFailure);
            }

            if (this.freeTextIndexer != null) {
                try {
                    this.freeTextIndexer.close();
                } catch (IOException closeFailure) {
                    logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", closeFailure);
                }
            }

            if (this.temporalIndexer != null) {
                try {
                    this.temporalIndexer.close();
                } catch (IOException closeFailure) {
                    logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", closeFailure);
                }
            }

            if (this.entityIndexer != null) {
                try {
                    this.entityIndexer.close();
                } catch (IOException closeFailure) {
                    logger.error("Error closing the entityIndexer on RyaOutputFormat Close", closeFailure);
                }
            }

            if (this.ryaIndexer != null) {
                try {
                    this.ryaIndexer.destroy();
                } catch (RyaDAOException destroyFailure) {
                    logger.error("Error closing RyaDAO on RyaOutputFormat Close", destroyFailure);
                }
            }

            // The batch writer is closed last, after the entity indexer that uses it.
            try {
                if (this.writer != null) {
                    this.writer.close();
                }
            } catch (MutationsRejectedException rejected) {
                logger.error("Error closing MultiTableBatchWriter on RyaOutputFormat Close", rejected);
            }
        }

        /**
         * Converts an OpenRDF statement to a {@link RyaStatement} and queues it for output.
         *
         * @param statement the statement to convert and write
         * @throws IOException if queuing the statement triggers a flush that fails
         */
        public void write(Statement statement) throws IOException {
            RyaStatement converted = RdfToRyaConversions.convertStatement(statement);
            write(converted);
        }

        /**
         * Queues a Rya statement for output, wrapping it in a {@link RyaStatementWritable} that
         * uses this writer's triple context and pairing it with a {@link NullWritable} key.
         *
         * @param ryaStatement the statement to write
         * @throws IOException if queuing the statement triggers a flush that fails
         */
        public void write(RyaStatement ryaStatement) throws IOException {
            Writable key = NullWritable.get();
            RyaStatementWritable value = new RyaStatementWritable(ryaStatement, this.tripleContext);
            write(key, value);
        }

        /**
         * Buffers one statement and flushes the buffer once its estimated size reaches the limit.
         *
         * <p>A statement without a column visibility receives the default visibility, and a
         * statement without a context receives the default context (which may be null).
         *
         * @param writable the record key; not used by this implementation
         * @param ryaStatementWritable wrapper holding the statement to write
         * @throws IOException if the buffer has to be flushed and the flush fails
         */
        public void write(Writable writable, RyaStatementWritable ryaStatementWritable) throws IOException {
            final RyaStatement incoming = ryaStatementWritable.getRyaStatement();

            // Fill in the defaults before the statement is measured and queued.
            if (null == incoming.getColumnVisibility()) {
                incoming.setColumnVisibility(this.cv);
            }
            if (null == incoming.getContext()) {
                incoming.setContext(this.defaultContext);
            }

            this.buffer.add(incoming);
            this.bufferCurrentSize = this.bufferCurrentSize + statementSize(incoming);

            if (this.bufferCurrentSize < this.bufferSizeLimit) {
                return;
            }
            flushBuffer();
        }

        /**
         * Estimates the size of a statement for buffer accounting.
         *
         * <p>The estimate is 3 plus the data lengths of subject, predicate and object, plus
         * 2 and the length of the object's datatype string when that datatype is present and is
         * not {@code xsd:anyURI}, plus the data length of the context when one is set.
         *
         * @param ryaStatement the statement to measure; subject, predicate and object are dereferenced
         * @return the estimated size, in characters
         */
        private int statementSize(RyaStatement ryaStatement) {
            final RyaURI subject = ryaStatement.getSubject();
            final RyaURI predicate = ryaStatement.getPredicate();
            final RyaType object = ryaStatement.getObject();
            final RyaURI context = ryaStatement.getContext();

            int size = 3 + subject.getData().length() + predicate.getData().length() + object.getData().length();

            // The anyURI comparison tolerates a null datatype, so the dereference must as well;
            // a missing datatype should not make a size estimate throw.
            if (object.getDataType() != null && !XMLSchema.ANYURI.equals(object.getDataType())) {
                size += 2 + object.getDataType().toString().length();
            }
            if (context != null) {
                size += context.getData().length();
            }
            return size;
        }

        /**
         * Sends the buffered statements to every enabled output, logs timing statistics, and
         * empties the buffer.
         *
         * <p>Time since the previous flush finished is reported as "Reading" time, and time
         * spent inside this method's output calls as "Writing" time. The buffer is cleared only
         * after all outputs succeeded; on failure it is left untouched.
         *
         * @throws IOException if an indexer fails, if the entity batch writer rejects mutations,
         *         or if the Rya DAO fails (the {@link RyaDAOException} is logged and wrapped)
         */
        private void flushBuffer() throws IOException {
            final int pending = this.buffer.size();
            this.totalCommitRecords += pending;
            this.commitCount += 1;
            final long flushStart = System.currentTimeMillis();
            logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", this.commitCount, pending, this.bufferCurrentSize));

            // Everything since the previous flush finished counts as time spent reading input.
            final double readSeconds = (flushStart - this.lastCommitFinishTime) / 1000.0d;
            this.totalReadDuration += readSeconds;
            logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", this.commitCount, readSeconds, pending / readSeconds, this.totalCommitRecords / this.totalReadDuration));

            if (this.freeTextIndexer != null) {
                this.freeTextIndexer.storeStatements(this.buffer);
                this.freeTextIndexer.flush();
            }
            if (this.temporalIndexer != null) {
                this.temporalIndexer.storeStatements(this.buffer);
                this.temporalIndexer.flush();
            }
            if (this.entityIndexer != null && this.writer != null) {
                this.entityIndexer.storeStatements(this.buffer);
                try {
                    this.writer.flush();
                } catch (MutationsRejectedException rejected) {
                    throw new IOException("Error flushing data to Accumulo for entity indexing", rejected);
                }
            }
            if (this.ryaIndexer != null) {
                try {
                    this.ryaIndexer.add(this.buffer.iterator());
                } catch (RyaDAOException daoFailure) {
                    logger.error("Cannot write statement to Rya", daoFailure);
                    throw new IOException(daoFailure);
                }
            }

            // All outputs accepted the batch: record write timing, then reset the buffer.
            this.lastCommitFinishTime = System.currentTimeMillis();
            final int written = this.buffer.size();
            final double writeSeconds = (this.lastCommitFinishTime - flushStart) / 1000.0d;
            this.totalWriteDuration += writeSeconds;
            logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", this.commitCount, writeSeconds, written / writeSeconds, this.totalCommitRecords / this.totalWriteDuration));

            final double totalSeconds = writeSeconds + readSeconds;
            logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", this.commitCount, totalSeconds, written / totalSeconds, this.totalCommitRecords / (this.totalWriteDuration + this.totalReadDuration)));

            this.buffer.clear();
            this.bufferCurrentSize = 0L;
        }
    }

    /**
     * Stores the default column visibility in the job configuration; the record writer applies
     * it to statements that carry no visibility of their own.
     *
     * @param job the job to configure
     * @param str the visibility expression; a null value leaves the configuration untouched
     */
    public static void setDefaultVisibility(Job job, String str) {
        if (str == null) {
            return;
        }
        Configuration conf = job.getConfiguration();
        conf.set(CV_PROPERTY, str);
    }

    /**
     * Stores the default context in the job configuration; the record writer applies it to
     * statements that carry no context of their own.
     *
     * @param job the job to configure
     * @param str the context URI; a null value leaves the configuration untouched
     */
    public static void setDefaultContext(Job job, String str) {
        if (str == null) {
            return;
        }
        Configuration conf = job.getConfiguration();
        conf.set(CONTEXT_PROPERTY, str);
    }

    /**
     * Stores the table prefix in the job configuration; {@code getRyaIndexer} applies it to the
     * DAO configuration.
     *
     * <p>Unlike the visibility and context setters, the value is passed on without a null check.
     *
     * @param job the job to configure
     * @param str the table prefix
     */
    public static void setTablePrefix(Job job, String str) {
        Configuration conf = job.getConfiguration();
        conf.set(OUTPUT_PREFIX_PROPERTY, str);
    }

    /**
     * Turns output to the free-text indexer on or off for the job. The flag is read by
     * {@code getFreeTextIndexer}, which treats an unset flag as enabled.
     *
     * @param job the job to configure
     * @param z true to enable the free-text indexer, false to disable it
     */
    public static void setFreeTextEnabled(Job job, boolean z) {
        Configuration conf = job.getConfiguration();
        conf.setBoolean(ENABLE_FREETEXT, z);
    }

    /**
     * Turns output to the temporal indexer on or off for the job. The flag is read by
     * {@code getTemporalIndexer}, which treats an unset flag as enabled.
     *
     * @param job the job to configure
     * @param z true to enable the temporal indexer, false to disable it
     */
    public static void setTemporalEnabled(Job job, boolean z) {
        Configuration conf = job.getConfiguration();
        conf.setBoolean(ENABLE_TEMPORAL, z);
    }

    /**
     * Turns output to the entity-centric indexer on or off for the job. The flag is read by
     * {@code getEntityIndexer}, which treats an unset flag as enabled.
     *
     * @param job the job to configure
     * @param z true to enable the entity-centric indexer, false to disable it
     */
    public static void setEntityEnabled(Job job, boolean z) {
        Configuration conf = job.getConfiguration();
        conf.setBoolean(ENABLE_ENTITY, z);
    }

    /**
     * Turns output through the {@link AccumuloRyaDAO} on or off for the job. The flag is read
     * by {@code getRyaIndexer}, which treats an unset flag as enabled.
     *
     * @param job the job to configure
     * @param z true to enable the DAO output, false to disable it
     */
    public static void setCoreTablesEnabled(Job job, boolean z) {
        Configuration conf = job.getConfiguration();
        conf.setBoolean(ENABLE_CORE, z);
    }

    /**
     * Configures the job to use a mock Accumulo instance.
     *
     * <p>Delegates to {@link AccumuloOutputFormat#setMockInstance} and then sets the
     * {@code ".useMockInstance"} and {@code MRUtils.AC_MOCK_PROP} flags to true in the job
     * configuration.
     *
     * @param job the job to configure
     * @param str the mock instance name, passed through to {@code AccumuloOutputFormat}
     */
    public static void setMockInstance(Job job, String str) {
        AccumuloOutputFormat.setMockInstance(job, str);
        Configuration conf = job.getConfiguration();
        conf.setBoolean(".useMockInstance", true);
        conf.setBoolean(MRUtils.AC_MOCK_PROP, true);
    }

    /**
     * Checks the output configuration by creating the free-text indexer, the temporal indexer
     * and the Rya DAO once, exactly as a record writer would.
     *
     * <p>The objects exist only for this check, so each one that was created is released
     * before the method returns, whether or not the check succeeded. A failure while releasing
     * is logged as a warning and does not fail the check.
     *
     * @param jobContext context supplying the job configuration
     * @throws IOException if the Rya DAO cannot be created
     */
    public void checkOutputSpecs(JobContext jobContext) throws IOException {
        Configuration configuration = jobContext.getConfiguration();
        FreeTextIndexer freeTextProbe = null;
        TemporalIndexer temporalProbe = null;
        AccumuloRyaDAO daoProbe = null;
        try {
            freeTextProbe = getFreeTextIndexer(configuration);
            temporalProbe = getTemporalIndexer(configuration);
            daoProbe = getRyaIndexer(configuration);
        } finally {
            // Best-effort cleanup: never let a release failure mask the outcome of the check.
            if (freeTextProbe != null) {
                try {
                    freeTextProbe.close();
                } catch (IOException closeFailure) {
                    logger.warn("Error closing the free text indexer after checking output specs", closeFailure);
                }
            }
            if (temporalProbe != null) {
                try {
                    temporalProbe.close();
                } catch (IOException closeFailure) {
                    logger.warn("Error closing the temporal indexer after checking output specs", closeFailure);
                }
            }
            if (daoProbe != null) {
                try {
                    daoProbe.destroy();
                } catch (RyaDAOException destroyFailure) {
                    logger.warn("Error closing RyaDAO after checking output specs", destroyFailure);
                }
            }
        }
    }

    /**
     * Returns the output committer of a {@link NullOutputFormat}; this format does not supply a
     * committer of its own.
     *
     * @param taskAttemptContext the task attempt, passed through to {@code NullOutputFormat}
     * @return the committer obtained from a new {@code NullOutputFormat}
     * @throws IOException declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        NullOutputFormat<Writable, RyaStatementWritable> delegate = new NullOutputFormat<>();
        return delegate.getOutputCommitter(taskAttemptContext);
    }

    /**
     * Creates the record writer for a task attempt.
     *
     * @param taskAttemptContext the task attempt whose configuration drives the writer
     * @return a new {@link RyaRecordWriter}
     * @throws IOException if the writer cannot be constructed
     */
    public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        RyaRecordWriter recordWriter = new RyaRecordWriter(taskAttemptContext);
        return recordWriter;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the free-text indexer for the given configuration.
     *
     * @param configuration the job configuration, also handed to the indexer via {@code setConf}
     * @return a configured {@link AccumuloFreeTextIndexer}, or null when the free-text flag is
     *         set to false (an unset flag counts as enabled)
     */
    public static FreeTextIndexer getFreeTextIndexer(Configuration configuration) {
        boolean enabled = configuration.getBoolean(ENABLE_FREETEXT, true);
        if (enabled) {
            AccumuloFreeTextIndexer indexer = new AccumuloFreeTextIndexer();
            indexer.setConf(configuration);
            return indexer;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the temporal indexer for the given configuration.
     *
     * @param configuration the job configuration, also handed to the indexer via {@code setConf}
     * @return a configured {@link AccumuloTemporalIndexer}, or null when the temporal flag is
     *         set to false (an unset flag counts as enabled)
     */
    public static TemporalIndexer getTemporalIndexer(Configuration configuration) {
        boolean enabled = configuration.getBoolean(ENABLE_TEMPORAL, true);
        if (enabled) {
            AccumuloTemporalIndexer indexer = new AccumuloTemporalIndexer();
            indexer.setConf(configuration);
            return indexer;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the entity-centric indexer for the given configuration.
     *
     * @param configuration the job configuration, also handed to the indexer via {@code setConf}
     * @return a configured {@link EntityCentricIndex}, or null when the entity flag is set to
     *         false (an unset flag counts as enabled)
     */
    public static EntityCentricIndex getEntityIndexer(Configuration configuration) {
        boolean enabled = configuration.getBoolean(ENABLE_ENTITY, true);
        if (enabled) {
            EntityCentricIndex indexer = new EntityCentricIndex();
            indexer.setConf(configuration);
            return indexer;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates and initializes the Rya DAO used to write statements.
     *
     * <p>The DAO gets a connector obtained from the job configuration and a fresh
     * {@link AccumuloRdfConfiguration} with query-plan display turned off; the table prefix is
     * applied only when one was configured.
     *
     * @param configuration the job configuration
     * @return an initialized {@link AccumuloRyaDAO}, or null when the core-tables flag is set to
     *         false (an unset flag counts as enabled)
     * @throws IOException wrapping any Accumulo or Rya failure, which is also logged
     */
    public static AccumuloRyaDAO getRyaIndexer(Configuration configuration) throws IOException {
        boolean enabled = configuration.getBoolean(ENABLE_CORE, true);
        if (!enabled) {
            return null;
        }
        try {
            AccumuloRyaDAO dao = new AccumuloRyaDAO();
            dao.setConnector(ConfigUtils.getConnector(configuration));

            AccumuloRdfConfiguration daoConf = new AccumuloRdfConfiguration();
            String tablePrefix = configuration.get(OUTPUT_PREFIX_PROPERTY, (String) null);
            if (tablePrefix != null) {
                daoConf.setTablePrefix(tablePrefix);
            }
            daoConf.setDisplayQueryPlan(false);

            dao.setConf(daoConf);
            dao.init();
            return dao;
        } catch (AccumuloException | AccumuloSecurityException | RyaDAOException failure) {
            // All three failure kinds are reported to the caller in the same way.
            logger.error("Cannot create RyaIndexer", failure);
            throw new IOException(failure);
        }
    }
}
