package com.thinkaurelius.titan.graphdb.log;

import atlas.shaded.titan.guava.common.base.Function;
import atlas.shaded.titan.guava.common.base.Preconditions;
import atlas.shaded.titan.guava.common.collect.Iterables;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.attribute.Timestamp;
import com.thinkaurelius.titan.core.log.Change;
import com.thinkaurelius.titan.core.log.ChangeProcessor;
import com.thinkaurelius.titan.core.log.LogProcessorBuilder;
import com.thinkaurelius.titan.core.log.LogProcessorFramework;
import com.thinkaurelius.titan.core.schema.TitanSchemaElement;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.ReadBuffer;
import com.thinkaurelius.titan.diskstorage.log.Log;
import com.thinkaurelius.titan.diskstorage.log.Message;
import com.thinkaurelius.titan.diskstorage.log.MessageReader;
import com.thinkaurelius.titan.diskstorage.log.ReadMarker;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import com.thinkaurelius.titan.graphdb.database.log.LogTxMeta;
import com.thinkaurelius.titan.graphdb.database.log.TransactionLogHeader;
import com.thinkaurelius.titan.graphdb.database.serialize.Serializer;
import com.thinkaurelius.titan.graphdb.internal.ElementLifeCycle;
import com.thinkaurelius.titan.graphdb.internal.InternalRelation;
import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx;
import com.thinkaurelius.titan.graphdb.types.system.BaseKey;
import com.thinkaurelius.titan.graphdb.vertices.StandardVertex;
import com.tinkerpop.blueprints.TransactionalGraph;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/thinkaurelius/titan/graphdb/log/StandardLogProcessorFramework.class */
/**
 * {@link LogProcessorFramework} implementation that attaches {@link ChangeProcessor}s to the
 * user transaction logs of a {@link StandardTitanGraph}.
 *
 * <p>Processors are configured through the {@link LogProcessorBuilder} returned by
 * {@link #addLogProcessor(String)}. Every log for which processors were successfully registered is
 * remembered by name so that it can be closed again via {@link #removeLogProcessor(String)} or
 * {@link #shutdown()}.
 *
 * <p>All access to the registry of logs and to the open/closed flag is synchronized on this
 * framework instance.
 */
public class StandardLogProcessorFramework implements LogProcessorFramework {
    private static final Logger logger = LoggerFactory.getLogger(StandardLogProcessorFramework.class);

    private final StandardTitanGraph graph;
    private final Serializer serializer;
    private final TimestampProvider times;
    /** Logs that currently have processors registered, keyed by user log name. */
    private final Map<String, Log> processorLogs;
    private boolean isOpen = true;

    /**
     * Creates a framework bound to the given graph.
     *
     * @param standardTitanGraph the graph whose user logs are processed; must be non-null and open
     * @throws IllegalArgumentException if the graph is {@code null} or not open
     */
    public StandardLogProcessorFramework(StandardTitanGraph standardTitanGraph) {
        Preconditions.checkArgument(standardTitanGraph != null && standardTitanGraph.isOpen());
        this.graph = standardTitanGraph;
        this.serializer = standardTitanGraph.getDataSerializer();
        this.times = standardTitanGraph.getConfiguration().getTimestampProvider();
        this.processorLogs = new HashMap<String, Log>();
    }

    /** Fails with an {@link IllegalStateException} once {@link #shutdown()} has been called. */
    private void checkOpen() {
        Preconditions.checkState(this.isOpen, "Transaction log framework has already been closed");
    }

    /**
     * Closes the log registered under the given name and forgets it.
     *
     * @param str name of the user log whose processors should be removed
     * @return {@code true} if a log was registered under that name and has been closed,
     *         {@code false} if nothing was registered
     * @throws IllegalStateException if the framework has been shut down
     * @throws TitanException if closing the log fails; the log stays registered in that case
     */
    @Override // com.thinkaurelius.titan.core.log.LogProcessorFramework
    public synchronized boolean removeLogProcessor(String str) {
        checkOpen();
        Log log = this.processorLogs.get(str);
        if (log == null) {
            return false;
        }
        try {
            log.close();
        } catch (BackendException e) {
            throw new TitanException("Could not close transaction log: " + str, e);
        }
        this.processorLogs.remove(str);
        return true;
    }

    /**
     * Marks the framework as closed and closes every registered log. Calling this method again
     * after the first invocation is a no-op.
     *
     * @throws TitanException if closing a log fails; logs not yet visited are left unclosed
     */
    @Override // com.thinkaurelius.titan.core.log.LogProcessorFramework
    public synchronized void shutdown() throws TitanException {
        if (!this.isOpen) {
            return;
        }
        this.isOpen = false;
        try {
            for (Log log : this.processorLogs.values()) {
                log.close();
            }
            this.processorLogs.clear();
        } catch (BackendException e) {
            throw new TitanException(e);
        }
    }

    /**
     * Starts the configuration of processors for the named user log. Nothing is registered until
     * {@link LogProcessorBuilder#build()} is invoked on the returned builder.
     *
     * @param str name of the user log; must not be blank
     * @return a builder for the processors of that log
     * @throws IllegalArgumentException if the name is blank
     */
    @Override // com.thinkaurelius.titan.core.log.LogProcessorFramework
    public LogProcessorBuilder addLogProcessor(String str) {
        return new Builder(str);
    }

    /**
     * Collects the settings for one user log and registers the resulting readers on
     * {@link #build()}.
     */
    public class Builder implements LogProcessorBuilder {
        private final String userLogName;
        private final List<ChangeProcessor> processors;
        /** Identifier of a read marker to start from, or {@code null} if none was set. */
        private String readMarkerName;
        /** Explicit start time, or {@code null} to start from "now". */
        private Timestamp startTime;
        private int retryAttempts;

        private Builder(String str) {
            Preconditions.checkArgument(StringUtils.isNotBlank(str));
            this.userLogName = str;
            this.processors = new ArrayList<ChangeProcessor>();
            this.readMarkerName = null;
            this.startTime = null;
            this.retryAttempts = 1;
        }

        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public String getLogIdentifier() {
            return this.userLogName;
        }

        /**
         * @param str identifier of the read marker to use; must not be blank
         * @throws IllegalArgumentException if the identifier is blank
         */
        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public LogProcessorBuilder setProcessorIdentifier(String str) {
            Preconditions.checkArgument(StringUtils.isNotBlank(str));
            this.readMarkerName = str;
            return this;
        }

        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public LogProcessorBuilder setStartTime(long j, TimeUnit timeUnit) {
            this.startTime = new Timestamp(j, timeUnit);
            return this;
        }

        /** Discards any explicit start time so that {@link #build()} uses a "now"-based marker. */
        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public LogProcessorBuilder setStartTimeNow() {
            this.startTime = null;
            return this;
        }

        /**
         * @param changeProcessor processor to invoke for every message of the log; must not be null
         * @throws IllegalArgumentException if the processor is {@code null}
         */
        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public LogProcessorBuilder addProcessor(ChangeProcessor changeProcessor) {
            Preconditions.checkArgument(changeProcessor != null);
            this.processors.add(changeProcessor);
            return this;
        }

        /**
         * @param i maximum number of attempts per message; must be positive
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public LogProcessorBuilder setRetryAttempts(int i) {
            Preconditions.checkArgument(i > 0, "Invalid number: %s", Integer.valueOf(i));
            this.retryAttempts = i;
            return this;
        }

        /**
         * Opens the user log, registers one reader per added processor and records the log in the
         * framework so that it can later be removed or closed on shutdown.
         *
         * @throws IllegalArgumentException if no processor was added or processors are already
         *         registered for this log name
         * @throws IllegalStateException if the framework has been shut down
         * @throws TitanException if the backend fails to open the log or register the readers
         */
        @Override // com.thinkaurelius.titan.core.log.LogProcessorBuilder
        public void build() {
            Preconditions.checkArgument(!this.processors.isEmpty(), "Must add at least one processor");
            ReadMarker readMarker = createReadMarker();
            synchronized (StandardLogProcessorFramework.this) {
                // Registering on a closed framework would leave a log that shutdown() never closes.
                checkOpen();
                Preconditions.checkArgument(!StandardLogProcessorFramework.this.processorLogs.containsKey(this.userLogName), "Processors have already been registered for user log: %s", this.userLogName);
                try {
                    Log log = StandardLogProcessorFramework.this.graph.getBackend().getUserLog(this.userLogName);
                    log.registerReaders(readMarker, Iterables.transform(this.processors, new Function<ChangeProcessor, MessageReader>() {
                        @Override // atlas.shaded.titan.guava.common.base.Function
                        @Nullable
                        public MessageReader apply(@Nullable ChangeProcessor changeProcessor) {
                            return new MsgReaderConverter(Builder.this.userLogName, changeProcessor, Builder.this.retryAttempts);
                        }
                    }));
                    // Remember the log only after registration succeeded; without this entry the
                    // duplicate check above, removeLogProcessor and shutdown would have nothing to act on.
                    StandardLogProcessorFramework.this.processorLogs.put(this.userLogName, log);
                } catch (BackendException e) {
                    throw new TitanException("Could not open user transaction log for name: " + this.userLogName, e);
                }
            }
        }

        /**
         * Chooses the read marker from the combination of configured identifier and start time:
         * neither set, only a start time, only an identifier, or both.
         */
        private ReadMarker createReadMarker() {
            if (this.readMarkerName == null) {
                if (this.startTime == null) {
                    return ReadMarker.fromNow();
                }
                return ReadMarker.fromTime(this.startTime.sinceEpoch(this.startTime.getNativeUnit()), this.startTime.getNativeUnit());
            }
            if (this.startTime == null) {
                return ReadMarker.fromIdentifierOrNow(this.readMarkerName);
            }
            return ReadMarker.fromIdentifierOrTime(this.readMarkerName, this.startTime.sinceEpoch(this.startTime.getNativeUnit()), this.startTime.getNativeUnit());
        }
    }

    /**
     * Adapts a {@link ChangeProcessor} to the {@link MessageReader} interface: each log message is
     * parsed into a transaction id plus a change state and handed to the processor inside a fresh
     * graph transaction.
     */
    public class MsgReaderConverter implements MessageReader {
        private final String userlogName;
        private final ChangeProcessor processor;
        private final int retryAttempts;

        private MsgReaderConverter(String str, ChangeProcessor changeProcessor, int i) {
            this.userlogName = str;
            this.processor = changeProcessor;
            this.retryAttempts = i;
        }

        /**
         * Deserializes the modifications of a log entry and records them in the change state.
         * A {@link BaseKey#VertexExists} relation on a non-schema vertex is recorded as a vertex
         * change (marking the vertex removed first when the change is a removal); any other
         * relation is recorded as a relation change unless it is hidden.
         */
        private void readRelations(TransactionLogHeader.Entry entry, StandardTitanTx standardTitanTx, StandardChangeState standardChangeState) {
            for (TransactionLogHeader.Modification modification : entry.getContentAsModifications(StandardLogProcessorFramework.this.serializer)) {
                InternalRelation relation = ModificationDeserializer.parseRelation(modification, standardTitanTx);
                Change change = modification.state;
                boolean vertexChange = relation.getType().equals(BaseKey.VertexExists)
                        && !(relation.getVertex(0) instanceof TitanSchemaElement);
                if (vertexChange) {
                    if (change == Change.REMOVED) {
                        ((StandardVertex) relation.getVertex(0)).updateLifeCycle(ElementLifeCycle.Event.REMOVED);
                    }
                    standardChangeState.addVertex(relation.getVertex(0), change);
                } else if (!relation.isHidden()) {
                    standardChangeState.addRelation(relation, change);
                }
            }
        }

        /**
         * Processes one log message, making up to {@code retryAttempts} attempts. Each attempt
         * uses a new transaction, which is committed only when the processor completed without
         * throwing; a failed attempt is rolled back and logged before the next attempt starts.
         * If every attempt fails the method returns after the last failure has been logged.
         */
        @Override // com.thinkaurelius.titan.diskstorage.log.MessageReader
        public void read(Message message) {
            for (int attempt = 1; attempt <= this.retryAttempts; attempt++) {
                StandardTitanTx tx = (StandardTitanTx) StandardLogProcessorFramework.this.graph.newTransaction();
                StandardChangeState changes = new StandardChangeState();
                try {
                    ReadBuffer content = message.getContent().asReadBuffer();
                    String senderId = message.getSenderId();
                    TransactionLogHeader.Entry entry = TransactionLogHeader.parse(content, StandardLogProcessorFramework.this.serializer, StandardLogProcessorFramework.this.times);
                    StandardTransactionId transactionId;
                    if (entry.getMetadata().containsKey(LogTxMeta.SOURCE_TRANSACTION)) {
                        transactionId = (StandardTransactionId) entry.getMetadata().get(LogTxMeta.SOURCE_TRANSACTION);
                    } else {
                        // No source transaction recorded: derive the id from sender and log header.
                        transactionId = new StandardTransactionId(senderId, entry.getHeader().getId(), new Timestamp(entry.getHeader().getTimestamp(StandardLogProcessorFramework.this.times.getUnit()), StandardLogProcessorFramework.this.times.getUnit()));
                    }
                    readRelations(entry, tx, changes);
                    assert transactionId != null;
                    if (runProcessor(tx, transactionId, changes, attempt)) {
                        // Commit only on success; a failed run has already been rolled back and
                        // must not be committed afterwards.
                        tx.commit();
                        return;
                    }
                } catch (Throwable th) {
                    tx.rollback();
                    logFailure("Encountered exception [{}] when preparing processor [{}] for user log [{}] on attempt {} of {}", th, attempt);
                }
            }
        }

        /**
         * Invokes the processor once. On any failure the transaction is rolled back and the
         * failure is logged.
         *
         * @return {@code true} if the processor completed normally, {@code false} if it threw
         */
        private boolean runProcessor(StandardTitanTx tx, StandardTransactionId transactionId, StandardChangeState changes, int attempt) {
            try {
                this.processor.process(tx, transactionId, changes);
                return true;
            } catch (Throwable th) {
                tx.rollback();
                logFailure("Encountered exception [{}] when running processor [{}] for user log [{}] on attempt {} of {}", th, attempt);
                return false;
            }
        }

        /** Logs a failed attempt: first the summary line built from {@code format}, then the stack trace. */
        private void logFailure(String format, Throwable th, int attempt) {
            StandardLogProcessorFramework.logger.error(format, new Object[]{th.getMessage(), this.processor, this.userlogName, Integer.valueOf(attempt), Integer.valueOf(this.retryAttempts)});
            StandardLogProcessorFramework.logger.error("Full exception: ", th);
        }
    }
}
