package gobblin.couchbase.writer;

import com.couchbase.client.core.lang.Tuple;
import com.couchbase.client.core.lang.Tuple2;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.AbstractDocument;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import gobblin.couchbase.common.TupleDocument;
import gobblin.util.ConfigUtils;
import gobblin.writer.AsyncDataWriter;
import gobblin.writer.GenericWriteResponse;
import gobblin.writer.GenericWriteResponseWrapper;
import gobblin.writer.SyncDataWriter;
import gobblin.writer.WriteCallback;
import gobblin.writer.WriteResponse;
import gobblin.writer.WriteResponseFuture;
import gobblin.writer.WriteResponseMapper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:gobblin/couchbase/writer/CouchbaseWriter.class */
/**
 * Writes documents to a Couchbase bucket, either synchronously ({@link #write(AbstractDocument)})
 * or asynchronously ({@link #write(AbstractDocument, WriteCallback)}).
 *
 * <p>The asynchronous path only accepts {@link TupleDocument} and {@link RawJsonDocument} records.
 * Both paths bound each upsert by the operation timeout read from the writer configuration.
 *
 * @param <D> the document type handled by this writer
 */
public class CouchbaseWriter<D extends AbstractDocument> implements AsyncDataWriter<D>, SyncDataWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(CouchbaseWriter.class);

    private final Cluster _cluster;
    private final Bucket _bucket;
    private final long _operationTimeout;
    private final TimeUnit _operationTimeunit;
    private final WriteResponseMapper<D> _defaultWriteResponseMapper;

    /**
     * Transcoder registered with the bucket so that {@link TupleDocument}s can be stored and read
     * back as a raw (payload buffer, flags) tuple.
     */
    private final Transcoder<TupleDocument, Tuple2<ByteBuf, Integer>> _tupleDocumentTranscoder =
            new Transcoder<TupleDocument, Tuple2<ByteBuf, Integer>>() {
        @Override
        public TupleDocument decode(String id, ByteBuf content, long cas, int expiry, int flags,
                ResponseStatus status) {
            return newDocument(id, expiry, Tuple.create(content, Integer.valueOf(flags)), cas);
        }

        @Override
        @SuppressWarnings("unchecked")
        public Tuple2<ByteBuf, Integer> encode(TupleDocument document) {
            return (Tuple2<ByteBuf, Integer>) document.content();
        }

        @Override
        public TupleDocument newDocument(String id, int expiry, Tuple2<ByteBuf, Integer> content, long cas) {
            return new TupleDocument(id, expiry, content, cas);
        }

        @Override
        public TupleDocument newDocument(String id, int expiry, Tuple2<ByteBuf, Integer> content, long cas,
                MutationToken mutationToken) {
            // The mutation token is not carried by TupleDocument and is intentionally dropped.
            return new TupleDocument(id, expiry, content, cas);
        }

        @Override
        public Class<TupleDocument> documentType() {
            return TupleDocument.class;
        }
    };

    /**
     * Connects to the cluster and opens the configured bucket.
     *
     * @param couchbaseEnvironment the Couchbase environment used to create the cluster connection
     * @param config writer configuration; supplies the bootstrap servers, bucket name, bucket
     *               password and operation timeout (in milliseconds)
     */
    public CouchbaseWriter(CouchbaseEnvironment couchbaseEnvironment, Config config) {
        List<String> hosts = ConfigUtils.getStringList(config, CouchbaseWriterConfigurationKeys.BOOTSTRAP_SERVERS);
        this._cluster = CouchbaseCluster.create(couchbaseEnvironment, hosts);
        String bucketName = ConfigUtils.getString(config, CouchbaseWriterConfigurationKeys.BUCKET,
                CouchbaseWriterConfigurationKeys.BUCKET_DEFAULT);
        String password = ConfigUtils.getString(config, CouchbaseWriterConfigurationKeys.PASSWORD, "");
        this._bucket = this._cluster.openBucket(bucketName, password,
                Collections.singletonList(this._tupleDocumentTranscoder));
        this._operationTimeout = ConfigUtils.getLong(config, CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_MILLIS,
                Long.valueOf(CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_DEFAULT)).longValue();
        this._operationTimeunit = TimeUnit.MILLISECONDS;
        this._defaultWriteResponseMapper = new GenericWriteResponseWrapper();
        log.info("Couchbase writer configured with: hosts: {}, bucketName: {}, operationTimeoutInMillis: {}",
                new Object[]{hosts, bucketName, Long.valueOf(this._operationTimeout)});
    }

    @VisibleForTesting
    Bucket getBucket() {
        return this._bucket;
    }

    /**
     * Rejects records the asynchronous path cannot write.
     *
     * @throws IllegalArgumentException if the record is null or is neither a {@link TupleDocument}
     *         nor a {@link RawJsonDocument}
     */
    private void assertRecordWritable(D d) {
        // Template form avoids building the message on every successful write, and the null guard
        // makes a null record fail with IllegalArgumentException instead of a NullPointerException.
        Preconditions.checkArgument((d instanceof TupleDocument) || (d instanceof RawJsonDocument),
                "This writer only supports TupleDocument or RawJsonDocument. Found %s",
                d == null ? "null" : d.getClass().getName());
    }

    /** Adds one reference to the payload buffer of a {@link TupleDocument}; no-op for other types. */
    private static void retainPayload(Object document) {
        if (document instanceof TupleDocument) {
            Tuple2<?, ?> content = (Tuple2<?, ?>) ((TupleDocument) document).content();
            ((ByteBuf) content.value1()).retain();
        }
    }

    /** Drops one reference to the payload buffer of a {@link TupleDocument}; no-op for other types. */
    private static void releasePayload(Object document) {
        if (document instanceof TupleDocument) {
            Tuple2<?, ?> content = (Tuple2<?, ?>) ((TupleDocument) document).content();
            ((ByteBuf) content.value1()).release();
        }
    }

    /**
     * Asynchronously upserts a record.
     *
     * <p>For a {@link TupleDocument} the payload buffer is retained before the upsert is issued.
     * When a callback is supplied, that extra reference is released once the callback has been
     * notified of success or failure. When no callback is supplied the reference is left in place
     * and is not released by this method.
     *
     * @param d the record to write; must be a {@link TupleDocument} or {@link RawJsonDocument}
     * @param writeCallback notified of the outcome; may be null
     * @return a future that completes with the write response, or fails with an
     *         {@link ExecutionException} wrapping the write error (including an operation timeout)
     * @throws IllegalArgumentException if the record type is not supported
     */
    public Future<WriteResponse> write(final D d, final WriteCallback writeCallback) {
        assertRecordWritable(d);
        retainPayload(d);
        Observable<D> timedUpsert = this._bucket.async().upsert(d)
                .timeout(this._operationTimeout, this._operationTimeunit);
        if (writeCallback == null) {
            return new WriteResponseFuture(timedUpsert.toBlocking().toFuture(), this._defaultWriteResponseMapper);
        }

        // The outcome is stored (not queued) so that get() can be called any number of times, and
        // the latch is only opened after the outcome is visible so isDone() never runs ahead of get().
        final AtomicReference<Pair<WriteResponse, Throwable>> outcome =
                new AtomicReference<Pair<WriteResponse, Throwable>>();
        final CountDownLatch completed = new CountDownLatch(1);

        Future<WriteResponse> future = new Future<WriteResponse>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // Cancellation is not supported.
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return completed.getCount() == 0;
            }

            @Override
            public WriteResponse get() throws InterruptedException, ExecutionException {
                completed.await();
                return CouchbaseWriter.this.getWriteResponseorThrow(outcome.get());
            }

            @Override
            public WriteResponse get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                if (!completed.await(timeout, unit)) {
                    throw new TimeoutException("Timeout exceeded while waiting for future to be done");
                }
                return CouchbaseWriter.this.getWriteResponseorThrow(outcome.get());
            }
        };

        timedUpsert.subscribe(new Subscriber<D>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable error) {
                if (!outcome.compareAndSet(null, new Pair<WriteResponse, Throwable>(null, error))) {
                    // A result was already published (e.g. the success callback itself threw).
                    // The payload reference was released on that path, so only report the error.
                    log.warn("Ignoring error raised after the write had already completed", error);
                    return;
                }
                try {
                    completed.countDown();
                    writeCallback.onFailure(error);
                } finally {
                    // Balance the retain() taken before the upsert; onNext never ran for this write.
                    releasePayload(d);
                }
            }

            @Override
            public void onNext(D written) {
                try {
                    WriteResponse response = new GenericWriteResponse(written);
                    if (outcome.compareAndSet(null, new Pair<WriteResponse, Throwable>(response, null))) {
                        completed.countDown();
                        writeCallback.onSuccess(response);
                    }
                } finally {
                    releasePayload(written);
                }
            }
        });
        return future;
    }

    /** No-op: each write is handed to the Couchbase client as soon as it is requested. */
    public void flush() throws IOException {
    }

    /**
     * Unwraps the outcome of an asynchronous write.
     *
     * @param pair the (response, error) outcome; exactly one side is expected to be non-null
     * @return the write response when present
     * @throws ExecutionException wrapping the recorded error, or a {@link RuntimeException} when
     *         both sides of the pair are null
     */
    public WriteResponse getWriteResponseorThrow(Pair<WriteResponse, Throwable> pair) throws ExecutionException {
        WriteResponse response = pair.getFirst();
        if (response != null) {
            return response;
        }
        Throwable failure = pair.getSecond();
        if (failure != null) {
            throw new ExecutionException(failure);
        }
        throw new ExecutionException(new RuntimeException("Could not find non-null WriteResponse pair"));
    }

    /** No-op. */
    public void cleanup() throws IOException {
    }

    /**
     * Synchronously upserts a record, waiting at most the configured operation timeout.
     *
     * @param d the record to write
     * @return the response wrapping the document returned by the bucket
     * @throws IOException if the upsert fails or times out; the original failure is the cause
     */
    public WriteResponse write(D d) throws IOException {
        try {
            // Use the same configured timeout as the asynchronous path rather than the client default.
            return new GenericWriteResponse(this._bucket.upsert(d, this._operationTimeout, this._operationTimeunit));
        } catch (Exception e) {
            throw new IOException("Failed to write to Couchbase cluster", e);
        }
    }

    /**
     * Closes the bucket (if still open) and disconnects from the cluster. Failures are logged and
     * not rethrown, so both steps are always attempted.
     */
    public void close() {
        if (!this._bucket.isClosed()) {
            try {
                this._bucket.close();
            } catch (Exception e) {
                log.warn("Failed to close bucket", e);
            }
        }
        try {
            this._cluster.disconnect();
        } catch (Exception e) {
            log.warn("Failed to disconnect from cluster", e);
        }
    }
}
