package com.datatorrent.contrib.couchbase;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.AbstractAggregateTransactionableStoreOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/couchbase/AbstractCouchBaseOutputOperator.class */
/**
 * Base output operator that writes tuples to a {@link CouchBaseWindowStore} through
 * asynchronous key/value operations.
 *
 * <p>Each tuple is turned into a key ({@link #getKey}) and a value ({@link #getValue}) and
 * handed to {@link #processKeyValue}, which returns an {@link OperationFuture}. A
 * {@link CompletionListener} is attached to every future; on success it removes the tuple from
 * the in-flight bookkeeping and wakes up any caller blocked in {@link #waitForQueueSize(int)}.
 *
 * <p>State shared between the caller of {@link #processTuple} and the completion listener
 * ({@code numTuples}, {@code mapFuture}, {@code mapTuples}) is guarded by {@code syncObj}.
 *
 * @param <T> type of the tuples written by this operator
 */
public abstract class AbstractCouchBaseOutputOperator<T> extends AbstractAggregateTransactionableStoreOutputOperator<T, CouchBaseWindowStore> {
    private static final transient Logger logger = LoggerFactory.getLogger(AbstractCouchBaseOutputOperator.class);

    /** Number of operations issued but not yet confirmed successful; reset in setup() and beginWindow(). */
    protected int numTuples;

    /** Optional serializer applied to values that are not boxed primitives, Strings or Characters. */
    protected CouchBaseSerializer serializer;

    private final transient AbstractCouchBaseOutputOperator<T>.CompletionListener listener;

    /**
     * Set by the completion listener when an operation reports {@code false}; read by
     * {@link #processTuple} and {@link #waitForQueueSize}. Declared volatile because the listener
     * may run on a different thread than the one feeding tuples (presumably a client callback
     * thread - the code already synchronizes on syncObj for that reason).
     */
    private transient volatile boolean failure;

    /** Monitor guarding numTuples, mapFuture and mapTuples; also used for wait/notify. */
    private final transient Object syncObj;

    /** Sequence number assigned to the most recently submitted tuple; reset to 0 by storeAggregate(). */
    protected long id = 0;

    /** Maps each outstanding future to the id of the tuple it is writing. */
    protected transient HashMap<OperationFuture<Boolean>, Long> mapFuture = new HashMap<>();

    /**
     * Tuples whose write has not been confirmed yet, keyed by id. Not transient, so presumably it
     * is checkpointed with the operator and replayed in setup() - confirm against the platform.
     */
    protected TreeMap<Long, T> mapTuples = new TreeMap<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/contrib/couchbase/AbstractCouchBaseOutputOperator$CompletionListener.class */
    /**
     * Listener attached to every future returned by {@link #processKeyValue}. Updates the
     * in-flight bookkeeping of the enclosing operator when an operation completes.
     */
    public class CompletionListener implements OperationCompletionListener {
        protected CompletionListener() {
        }

        /**
         * Handles completion of a single operation.
         *
         * <p>If the operation result is {@code false} the failure flag is raised and any waiter
         * is woken up so it can fail fast. Otherwise the tuple is dropped from the in-flight
         * maps, the outstanding counter is decremented and one waiter is notified.
         *
         * @param operationFuture the completed future
         * @throws Exception whatever {@code operationFuture.get()} throws
         */
        public void onComplete(OperationFuture<?> operationFuture) throws Exception {
            if (!((Boolean) operationFuture.get()).booleanValue()) {
                logger.error("Operation failed {}", operationFuture);
                failure = true;
                // A failed operation never decrements numTuples, so without this wake-up a
                // caller blocked in waitForQueueSize() would sit there until the timeout.
                synchronized (syncObj) {
                    syncObj.notifyAll();
                }
                return;
            }
            synchronized (syncObj) {
                // Single lookup-and-remove; guard against a future that is not (or no longer)
                // tracked instead of failing with a NullPointerException on unboxing.
                Long tupleId = mapFuture.remove(operationFuture);
                if (tupleId != null) {
                    mapTuples.remove(tupleId);
                }
                numTuples--;
                syncObj.notify();
            }
        }
    }

    /** Creates the operator with a fresh {@link CouchBaseWindowStore} and an empty in-flight state. */
    public AbstractCouchBaseOutputOperator() {
        this.store = new CouchBaseWindowStore();
        this.listener = new CompletionListener();
        this.numTuples = 0;
        this.syncObj = new Object();
    }

    /**
     * Prepares the operator: optionally discards retained tuples, resets the outstanding
     * counter, re-submits any tuples still held in {@code mapTuples} and then delegates to the
     * superclass.
     *
     * @param operatorContext context supplying the processing mode
     */
    public void setup(Context.OperatorContext operatorContext) {
        Operator.ProcessingMode mode = (Operator.ProcessingMode) operatorContext.getValue(Context.OperatorContext.PROCESSING_MODE);
        // In at-most-once mode retained tuples must not be written again.
        if (mode == Operator.ProcessingMode.AT_MOST_ONCE && this.numTuples == 0) {
            this.mapTuples.clear();
        }
        this.numTuples = 0;
        // NOTE(review): processTuple() may insert a new id into mapTuples while it is being
        // iterated here, which a TreeMap iterator reports as ConcurrentModificationException;
        // the replay also runs before super.setup(). Both match the original behaviour and are
        // left untouched - confirm whether this path is exercised in practice.
        for (T pending : this.mapTuples.values()) {
            processTuple(pending);
        }
        super.setup(operatorContext);
    }

    /**
     * Resets the outstanding-operation counter and delegates to the superclass.
     *
     * @param j id of the window that is starting
     */
    public void beginWindow(long j) {
        this.numTuples = 0;
        super.beginWindow(j);
    }

    /**
     * Submits one tuple, blocking first until the number of outstanding operations is below the
     * store's {@code queueSize}.
     *
     * @param t tuple to write
     * @throws RuntimeException if an earlier operation failed, or if waiting for queue space
     *         times out
     */
    public void processTuple(T t) {
        if (this.failure) {
            throw new RuntimeException("Operation Failed");
        }
        waitForQueueSize(((CouchBaseWindowStore) this.store).queueSize.intValue() - 1);
        setKeyValueInCouchBase(t);
    }

    /**
     * Derives key and value from the tuple, issues the write through {@link #processKeyValue}
     * and registers the resulting future for completion tracking.
     *
     * <p>Values that are not a boxed primitive, {@code String} or {@code Character} are passed
     * through the configured serializer, if one is set.
     *
     * @param t tuple to write
     */
    public void setKeyValueInCouchBase(T t) {
        this.id++;
        String key = getKey(t);
        Object value = getValue(t);
        if (!(value instanceof Boolean)
                && !(value instanceof Integer)
                && !(value instanceof String)
                && !(value instanceof Float)
                && !(value instanceof Double)
                && !(value instanceof Character)
                && !(value instanceof Long)
                && !(value instanceof Short)
                && !(value instanceof Byte)
                && this.serializer != null) {
            value = this.serializer.serialize(value);
        }
        OperationFuture<Boolean> future = processKeyValue(key, value);
        synchronized (this.syncObj) {
            // Listener registration and bookkeeping happen under the same monitor the listener
            // takes, so a callback on another thread cannot observe a half-registered future.
            future.addListener(this.listener);
            this.mapFuture.put(future, this.id);
            if (!this.mapTuples.containsKey(this.id)) {
                this.mapTuples.put(this.id, t);
            }
            this.numTuples++;
        }
    }

    /**
     * Blocks until every outstanding operation has completed, then resets the tuple id sequence.
     */
    public void storeAggregate() {
        waitForQueueSize(0);
        this.id = 0L;
    }

    /**
     * Blocks until at most {@code i} operations are outstanding.
     *
     * @param i maximum number of outstanding operations allowed on return
     * @throws RuntimeException if an operation failure is reported while waiting, if the store's
     *         {@code timeout} (milliseconds) elapses first, or if the thread is interrupted
     */
    public void waitForQueueSize(int i) {
        long startMillis = System.currentTimeMillis();
        while (this.numTuples > i) {
            synchronized (this.syncObj) {
                // Re-check under the monitor: the listener may have drained the queue meanwhile.
                if (this.numTuples > i) {
                    if (this.failure) {
                        // A failed operation never frees its slot; waiting longer cannot help.
                        throw new RuntimeException("Operation Failed");
                    }
                    long elapsedMillis = System.currentTimeMillis() - startMillis;
                    if (elapsedMillis >= ((CouchBaseWindowStore) this.store).timeout) {
                        throw new RuntimeException("Timed out waiting for space in queue");
                    }
                    try {
                        // Remaining time is strictly positive here, so this is never wait(0).
                        this.syncObj.wait(((CouchBaseWindowStore) this.store).timeout - elapsedMillis);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for callers further up the stack.
                        Thread.currentThread().interrupt();
                        DTThrowable.rethrow(e);
                    }
                }
            }
            if (System.currentTimeMillis() - startMillis >= ((CouchBaseWindowStore) this.store).timeout) {
                throw new RuntimeException("Timed out waiting for space in queue");
            }
        }
    }

    /** @return the serializer applied to non-primitive values, or {@code null} if none is set */
    public CouchBaseSerializer getSerializer() {
        return this.serializer;
    }

    /** @param couchBaseSerializer serializer to apply to non-primitive values; may be {@code null} */
    public void setSerializer(CouchBaseSerializer couchBaseSerializer) {
        this.serializer = couchBaseSerializer;
    }

    /**
     * @param t tuple being written
     * @return the key under which the tuple is stored
     */
    public abstract String getKey(T t);

    /**
     * @param t tuple being written
     * @return the value to store for the tuple
     */
    public abstract Object getValue(T t);

    /**
     * Issues the actual store operation.
     *
     * @param str key to write
     * @param obj value to write (already serialized where applicable)
     * @return future reporting {@code true} on success
     */
    protected abstract OperationFuture<Boolean> processKeyValue(String str, Object obj);
}
