/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.table.remote.couchbase;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.google.common.base.Preconditions;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.table.AsyncReadWriteUpdateTable;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.couchbase.BaseCouchbaseTableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Single;
import rx.SingleSubscriber;

/**
 * Table read function that looks up values by {@code String} key in a Couchbase bucket
 * through the bucket's asynchronous API.
 *
 * <p>The Couchbase document type requested from the bucket is chosen once, at construction
 * time, from the value class: {@link JsonDocument} when the value class is assignable to
 * {@link JsonObject}, otherwise {@link BinaryDocument}. Binary documents are decoded with
 * the {@code valueSerde} provided by the base class.
 *
 * @param <V> type of the values returned by this read function
 */
public class CouchbaseTableReadFunction<V>
        extends BaseCouchbaseTableFunction<V>
        implements TableReadFunction<String, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTableReadFunction.class);

    /** Couchbase document class requested on every get; fixed at construction time. */
    protected final Class<? extends Document<?>> documentType;

    /**
     * Creates a read function for the given bucket.
     *
     * @param bucketName name of the Couchbase bucket, forwarded to the base class
     * @param valueClass class of the values to read; decides whether documents are fetched
     *                   as {@link JsonDocument} or {@link BinaryDocument}
     * @param clusterNodes cluster nodes, forwarded to the base class
     */
    public CouchbaseTableReadFunction(String bucketName, Class<V> valueClass, String... clusterNodes) {
        super(bucketName, valueClass, clusterNodes);
        this.documentType = JsonObject.class.isAssignableFrom(valueClass) ? JsonDocument.class : BinaryDocument.class;
    }

    /**
     * Delegates initialization to the base class and logs that this read function is ready.
     *
     * @param context context forwarded to the base class
     * @param table table forwarded to the base class
     */
    @Override
    public void init(Context context, AsyncReadWriteUpdateTable table) {
        super.init(context, table);
        LOGGER.info("Read function for bucket {} initialized successfully", this.bucketName);
    }

    /**
     * Asynchronously fetches the value stored under {@code key}.
     *
     * <p>The returned future completes:
     * <ul>
     *   <li>with the document content (JSON) or the deserialized bytes (binary) on success;</li>
     *   <li>with {@code null} when the lookup fails with {@link NoSuchElementException};</li>
     *   <li>exceptionally with a {@link SamzaException} when the client hands back a
     *       {@code null} document, when the lookup fails with any other error, or when
     *       deserialization of a binary document fails.</li>
     * </ul>
     *
     * @param key key to look up; must not be null, empty or blank
     * @return future carrying the value, or {@code null} if no element was emitted for the key
     * @throws IllegalArgumentException if {@code key} is null, empty or blank
     */
    @Override
    public CompletableFuture<V> getAsync(final String key) {
        Preconditions.checkArgument(StringUtils.isNotBlank(key), "key must not be null, empty or blank");
        final CompletableFuture<V> future = new CompletableFuture<>();
        Single<? extends Document<?>> singleObservable = this.bucket.async()
                .get(key, this.documentType, this.timeout.toMillis(), TimeUnit.MILLISECONDS)
                .toSingle();
        singleObservable.subscribe(new SingleSubscriber<Document<?>>() {

            @Override
            public void onSuccess(Document<?> document) {
                if (document == null) {
                    future.completeExceptionally(
                            new SamzaException(String.format("Got unexpected null value from key %s", key)));
                    return;
                }
                if (document instanceof BinaryDocument) {
                    CouchbaseTableReadFunction.this.handleGetAsyncBinaryDocument((BinaryDocument) document, future, key);
                    return;
                }
                // Non-binary documents are only requested when the value class is assignable
                // to JsonObject (see constructor), so the content is expected to be a V.
                @SuppressWarnings("unchecked")
                V content = (V) document.content();
                future.complete(content);
            }

            @Override
            public void onError(Throwable throwable) {
                if (throwable instanceof NoSuchElementException) {
                    // No element was emitted for this key: report it as an absent value.
                    future.complete(null);
                } else {
                    future.completeExceptionally(
                            new SamzaException(String.format("Failed to get key %s", key), throwable));
                }
            }
        });
        return future;
    }

    /**
     * Deserializes the content of a binary document with {@code valueSerde} and completes
     * {@code future} with the result. Any exception raised while reading or deserializing
     * completes the future exceptionally with a {@link SamzaException}. The content buffer
     * is released in all cases.
     *
     * @param binaryDocument document whose content buffer is read and then released
     * @param future future to complete with the deserialized value or the failure
     * @param key key the document was fetched for; used only in the error message
     */
    protected void handleGetAsyncBinaryDocument(BinaryDocument binaryDocument, CompletableFuture<V> future, String key) {
        ByteBuf buffer = binaryDocument.content();
        try {
            byte[] bytes;
            if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.readableBytes() == buffer.array().length) {
                // The backing array holds exactly the readable content: use it without copying.
                bytes = buffer.array();
            } else {
                // Otherwise copy just the readable bytes out of the buffer.
                bytes = new byte[buffer.readableBytes()];
                buffer.readBytes(bytes);
            }
            future.complete(this.valueSerde.fromBytes(bytes));
        } catch (Exception e) {
            future.completeExceptionally(
                    new SamzaException(String.format("Failed to deserialize value of key %s with given serde", key), e));
        } finally {
            // Always drop our reference to the buffer, whether or not deserialization succeeded.
            ReferenceCountUtil.release(buffer);
        }
    }
}

