package com.datatorrent.contrib.accumulo;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.AbstractAggregateTransactionableStoreOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/accumulo/AbstractAccumuloOutputOperator.class */
/**
 * Base output operator that buffers incoming tuples and writes them to Accumulo
 * as {@link Mutation}s through the batch writer of an {@link AccumuloWindowStore}.
 *
 * <p>Tuples handed to {@link #processTuple(Object)} are only collected in memory.
 * They are converted with {@link #operationMutation(Object)} and pushed to the
 * store when {@link #storeAggregate()} runs (presumably invoked by the superclass
 * at a window boundary -- confirm against the superclass).
 *
 * <p>Only the at-most-once and at-least-once processing modes are accepted;
 * {@link #setup(Context.OperatorContext)} rejects exactly-once.
 *
 * @param <T> type of the tuples received by this operator
 */
public abstract class AbstractAccumuloOutputOperator<T> extends AbstractAggregateTransactionableStoreOutputOperator<T, AccumuloWindowStore> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractAccumuloOutputOperator.class);

    /** Tuples received since the last successful {@link #storeAggregate()}. */
    private final List<T> tuples = Lists.newArrayList();

    /** Processing mode; overwritten from the operator context in {@link #setup(Context.OperatorContext)}. */
    private transient Operator.ProcessingMode mode;

    /** Creates the operator with a fresh {@link AccumuloWindowStore} as its store. */
    public AbstractAccumuloOutputOperator() {
        store = new AccumuloWindowStore();
    }

    /**
     * @return the processing mode currently held by this operator; {@code null}
     *         until it is set explicitly or {@link #setup(Context.OperatorContext)} has run
     */
    public Operator.ProcessingMode getMode() {
        return mode;
    }

    /**
     * Sets the processing mode. Note that {@link #setup(Context.OperatorContext)}
     * replaces this value with the one found in the operator context.
     *
     * @param processingMode the mode to record
     */
    public void setMode(Operator.ProcessingMode processingMode) {
        this.mode = processingMode;
    }

    /**
     * Buffers the tuple; nothing is written to the store here.
     *
     * @param t the tuple to remember until the next {@link #storeAggregate()}
     */
    public void processTuple(T t) {
        tuples.add(t);
    }

    /**
     * Converts every buffered tuple to a {@link Mutation}, adds it to the store's
     * batch writer, flushes the writer and finally empties the buffer.
     *
     * <p>If the writer rejects mutations, the failure is logged and handed to
     * {@link DTThrowable#rethrow}; the buffer is then expected to stay untouched
     * (assumes {@code rethrow} never returns normally -- confirm).
     */
    public void storeAggregate() {
        try {
            for (T tuple : tuples) {
                store.getBatchwriter().addMutation(operationMutation(tuple));
            }
            store.getBatchwriter().flush();
        } catch (MutationsRejectedException e) {
            logger.error("unable to write mutations", e);
            DTThrowable.rethrow(e);
        }
        // Only reached when every mutation was added and flushed without rejection.
        tuples.clear();
    }

    /**
     * Maps a tuple to the Accumulo mutation that should be written for it.
     *
     * @param t the tuple to convert
     * @return the mutation passed to the batch writer
     */
    public abstract Mutation operationMutation(T t);

    /**
     * Reads the processing mode from the operator context, validates it and then
     * delegates to the superclass setup.
     *
     * @param operatorContext context supplying the {@code PROCESSING_MODE} attribute
     * @throws RuntimeException if the configured mode is {@code EXACTLY_ONCE}
     */
    public void setup(Context.OperatorContext operatorContext) {
        mode = operatorContext.getValue(Context.OperatorContext.PROCESSING_MODE);
        if (mode == Operator.ProcessingMode.EXACTLY_ONCE) {
            throw new RuntimeException("This operator only supports atmost once and atleast once processing modes");
        }
        if (mode == Operator.ProcessingMode.AT_MOST_ONCE) {
            // Drop anything still buffered so it is not written again in at-most-once mode.
            tuples.clear();
        }
        super.setup(operatorContext);
    }
}
