/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.table.remote.couchbase;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.remote.TableWriteFunction;
import org.apache.samza.table.remote.couchbase.BaseCouchbaseTableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.SingleSubscriber;

public class CouchbaseTableWriteFunction<V>
extends BaseCouchbaseTableFunction<V>
implements TableWriteFunction<String, V, Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTableWriteFunction.class);

    public CouchbaseTableWriteFunction(String bucketName, Class<V> valueClass, String ... clusterNodes) {
        super(bucketName, valueClass, clusterNodes);
    }

    @Override
    public void init(Context context, AsyncReadWriteUpdateTable table) {
        super.init(context, table);
        LOGGER.info("Write function for bucket {} initialized successfully", (Object)this.bucketName);
    }

    public CompletableFuture<Void> putAsync(String key, V record) {
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)key), (Object)"key must not be null, empty or blank");
        Preconditions.checkArgument((!key.contains(" ") ? 1 : 0) != 0, (Object)String.format("key should not contain spaces: %s", key));
        Preconditions.checkNotNull(record);
        JsonDocument document = record instanceof JsonObject ? JsonDocument.create((String)key, (int)((int)this.ttl.getSeconds()), (JsonObject)((JsonObject)record)) : BinaryDocument.create((String)key, (int)((int)this.ttl.getSeconds()), (ByteBuf)Unpooled.copiedBuffer((byte[])this.valueSerde.toBytes(record)));
        return this.asyncWriteHelper((Observable<Document>)this.bucket.async().upsert((Document)document, this.timeout.toMillis(), TimeUnit.MILLISECONDS), String.format("Failed to insert key %s into bucket %s", key, this.bucketName));
    }

    public CompletableFuture<Void> updateAsync(String key, Object updates) {
        throw new SamzaException("Update is unsupported");
    }

    public CompletableFuture<Void> deleteAsync(String key) {
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)key), (Object)"key must not be null, empty or blank");
        return this.asyncWriteHelper((Observable<Document>)this.bucket.async().remove(key, this.timeout.toMillis(), TimeUnit.MILLISECONDS), String.format("Failed to delete key %s from bucket %s.", key, this.bucketName));
    }

    protected CompletableFuture<Void> asyncWriteHelper(Observable<? extends Document> observable, String errorMessage) {
        return this.asyncWriteHelper(observable, errorMessage, true);
    }

    protected <T> CompletableFuture<T> asyncWriteHelper(Observable<? extends Document> observable, final String errorMessage, final boolean isVoid) {
        final CompletableFuture future = new CompletableFuture();
        observable.toSingle().subscribe((SingleSubscriber)new SingleSubscriber<Document>(){

            public void onSuccess(Document document) {
                if (isVoid) {
                    future.complete(null);
                } else {
                    future.complete(document.content());
                }
            }

            public void onError(Throwable error) {
                future.completeExceptionally((Throwable)new SamzaException(errorMessage, error));
            }
        });
        return future;
    }
}

