package org.apache.gobblin.couchbase.writer;

import com.couchbase.client.core.lang.Tuple;
import com.couchbase.client.core.lang.Tuple2;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.auth.CertAuthenticator;
import com.couchbase.client.java.document.AbstractDocument;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.typesafe.config.Config;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.couchbase.common.TupleDocument;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.GenericWriteResponse;
import org.apache.gobblin.writer.GenericWriteResponseWrapper;
import org.apache.gobblin.writer.SyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.gobblin.writer.WriteResponseFuture;
import org.apache.gobblin.writer.WriteResponseMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:org/apache/gobblin/couchbase/writer/CouchbaseWriter.class */
/**
 * Writes documents to a Couchbase bucket, either asynchronously ({@link AsyncDataWriter}) or
 * synchronously ({@link SyncDataWriter}).
 *
 * <p>The asynchronous path accepts only {@link TupleDocument} and {@link RawJsonDocument} records.
 * When a non-zero document TTL is configured, every record is re-created with an absolute expiry
 * (in epoch seconds) before it is upserted; the TTL is counted either from "now" or from a numeric
 * field of the record's JSON payload, depending on configuration.
 *
 * @param <D> the document type handled by this writer
 */
public class CouchbaseWriter<D extends AbstractDocument> implements AsyncDataWriter<D>, SyncDataWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(CouchbaseWriter.class);

    private final Cluster _cluster;
    private final Bucket _bucket;
    /** Timeout applied to each asynchronous upsert, expressed in {@link #_operationTimeunit}. */
    private final long _operationTimeout;
    /** Configured TTL; {@code 0} disables expiry handling entirely. */
    private final int _documentTTL;
    private final TimeUnit _documentTTLTimeUnits;
    /** Optional name of the JSON field holding the timestamp the TTL is counted from. */
    private final String _documentTTLOriginField;
    private final TimeUnit _documentTTLOriginUnits;
    private final TimeUnit _operationTimeunit;
    private final WriteResponseMapper<D> _defaultWriteResponseMapper;

    /**
     * Transcoder registered with the bucket so that {@link TupleDocument}s are passed through as-is:
     * the document content already is the (buffer, flags) tuple the client expects.
     */
    private final Transcoder<TupleDocument, Tuple2<ByteBuf, Integer>> _tupleDocumentTranscoder =
            new Transcoder<TupleDocument, Tuple2<ByteBuf, Integer>>() {
                @Override
                public TupleDocument decode(String id, ByteBuf content, long cas, int expiry, int flags,
                        ResponseStatus status) {
                    return newDocument(id, expiry, Tuple.create(content, flags), cas);
                }

                @Override
                public Tuple2<ByteBuf, Integer> encode(TupleDocument document) {
                    return document.content();
                }

                @Override
                public TupleDocument newDocument(String id, int expiry, Tuple2<ByteBuf, Integer> content, long cas) {
                    return new TupleDocument(id, expiry, content, cas);
                }

                @Override
                public TupleDocument newDocument(String id, int expiry, Tuple2<ByteBuf, Integer> content, long cas,
                        MutationToken mutationToken) {
                    // The mutation token is not carried over to the created document.
                    return new TupleDocument(id, expiry, content, cas);
                }

                @Override
                public Class<TupleDocument> documentType() {
                    return TupleDocument.class;
                }
            };

    /**
     * Reads the writer settings from {@code config}, connects to the cluster and opens the bucket.
     *
     * <p>All configuration is read before any connection is made, and the cluster is disconnected
     * again if opening the bucket fails, so a failed construction does not leave a connection behind.
     *
     * @param couchbaseEnvironment environment used to create the cluster connection
     * @param config writer configuration (see {@link CouchbaseWriterConfigurationKeys})
     */
    public CouchbaseWriter(CouchbaseEnvironment couchbaseEnvironment, Config config) {
        List<String> hosts = ConfigUtils.getStringList(config, CouchbaseWriterConfigurationKeys.BOOTSTRAP_SERVERS);
        boolean certAuthEnabled = ConfigUtils.getBoolean(config, CouchbaseWriterConfigurationKeys.CERT_AUTH_ENABLED, false);
        String password = ConfigUtils.getString(config, CouchbaseWriterConfigurationKeys.PASSWORD, "");
        log.info("Using hosts hosts: {}", String.join(",", hosts));

        this._documentTTL = ConfigUtils.getInt(config, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL, 0).intValue();
        this._documentTTLTimeUnits = ConfigUtils.getTimeUnit(config, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_UNIT, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_UNIT_DEFAULT);
        this._documentTTLOriginField = ConfigUtils.getString(config, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD, (String) null);
        this._documentTTLOriginUnits = ConfigUtils.getTimeUnit(config, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD_UNITS, CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD_UNITS_DEFAULT);
        String bucketName = ConfigUtils.getString(config, CouchbaseWriterConfigurationKeys.BUCKET, CouchbaseWriterConfigurationKeys.BUCKET_DEFAULT);
        this._operationTimeout = ConfigUtils.getLong(config, CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_MILLIS, Long.valueOf(CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_DEFAULT)).longValue();
        this._operationTimeunit = TimeUnit.MILLISECONDS;
        this._defaultWriteResponseMapper = new GenericWriteResponseWrapper<>();

        this._cluster = CouchbaseCluster.create(couchbaseEnvironment, hosts);
        this._bucket = openBucket(this._cluster, bucketName, certAuthEnabled, password);
        log.info("Couchbase writer configured with: hosts: {}, bucketName: {}, operationTimeoutInMillis: {}",
                hosts, bucketName, this._operationTimeout);
    }

    /**
     * Opens {@code bucketName} on {@code cluster} with the tuple transcoder registered.
     *
     * <p>With certificate authentication the cluster is authenticated first and the bucket is opened
     * without a password; otherwise the password is used only when it is non-empty. If anything in
     * here throws, the cluster is disconnected before the exception is propagated.
     */
    private Bucket openBucket(Cluster cluster, String bucketName, boolean certAuthEnabled, String password) {
        try {
            if (certAuthEnabled) {
                cluster.authenticate(CertAuthenticator.INSTANCE);
            }
            if (certAuthEnabled || password.isEmpty()) {
                return cluster.openBucket(bucketName, Collections.singletonList(this._tupleDocumentTranscoder));
            }
            return cluster.openBucket(bucketName, password, Collections.singletonList(this._tupleDocumentTranscoder));
        } catch (RuntimeException e) {
            try {
                cluster.disconnect();
            } catch (RuntimeException disconnectFailure) {
                // Keep the original failure as the primary error.
                e.addSuppressed(disconnectFailure);
            }
            throw e;
        }
    }

    @VisibleForTesting
    Bucket getBucket() {
        return this._bucket;
    }

    /**
     * Rejects records the asynchronous path cannot handle.
     *
     * @throws IllegalArgumentException if {@code d} is neither a {@link TupleDocument} nor a
     *     {@link RawJsonDocument} (including {@code null})
     */
    private void assertRecordWritable(D d) {
        Preconditions.checkArgument((d instanceof TupleDocument) || (d instanceof RawJsonDocument),
                "This writer only supports TupleDocument or RawJsonDocument. Found %s",
                d == null ? null : d.getClass().getName());
    }

    /**
     * Asynchronously upserts {@code d}, applying the configured TTL and operation timeout.
     *
     * @param d the record to write; must be a {@link TupleDocument} or {@link RawJsonDocument}
     * @param writeCallback notified on success or failure; may be {@code null}
     * @return a future for the write result; it cannot be cancelled, and its result can be read any
     *     number of times once available
     * @throws IllegalArgumentException if the record type is not supported
     * @throws RuntimeException if the document expiry cannot be computed
     */
    @Override
    public Future<WriteResponse> write(D d, final WriteCallback writeCallback) {
        assertRecordWritable(d);

        // Compute the TTL'd document before touching reference counts, so that a failure here
        // cannot leave an extra, never-released reference on the payload buffer.
        final D documentToWrite;
        try {
            documentToWrite = setDocumentTTL(d);
        } catch (DataRecordException e) {
            throw new RuntimeException("Caught exception trying to set TTL of the document", e);
        }

        if (d instanceof TupleDocument) {
            // Extra reference on the payload buffer; the matching release() is in onNext below.
            // NOTE(review): nothing releases it on the error path or when no callback is supplied -
            // presumably the client's own buffer handling covers that; confirm against the SDK.
            ((TupleDocument) d).content().value1().retain();
        }

        Observable<D> upsert = this._bucket.async().upsert(documentToWrite);
        if (writeCallback == null) {
            return new WriteResponseFuture<>(
                    upsert.timeout(this._operationTimeout, this._operationTimeunit).toBlocking().toFuture(),
                    this._defaultWriteResponseMapper);
        }

        // The outcome is published once (first signal wins) and then the latch is opened; readers
        // never consume the outcome, so get() can be called repeatedly.
        final CountDownLatch completed = new CountDownLatch(1);
        final AtomicReference<Pair<WriteResponse, Throwable>> outcome = new AtomicReference<>();

        Future<WriteResponse> responseFuture = new Future<WriteResponse>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return completed.getCount() == 0;
            }

            @Override
            public WriteResponse get() throws InterruptedException, ExecutionException {
                completed.await();
                return getWriteResponseOrThrow(outcome.get());
            }

            @Override
            public WriteResponse get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                if (!completed.await(timeout, unit)) {
                    throw new TimeoutException("Timeout exceeded while waiting for future to be done");
                }
                return getWriteResponseOrThrow(outcome.get());
            }
        };

        upsert.timeout(this._operationTimeout, this._operationTimeunit).subscribe(new Subscriber<D>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable error) {
                if (outcome.compareAndSet(null, new Pair<WriteResponse, Throwable>(null, error))) {
                    completed.countDown();
                }
                writeCallback.onFailure(error);
            }

            @Override
            public void onNext(D written) {
                try {
                    WriteResponse response = new GenericWriteResponse<>(written);
                    if (outcome.compareAndSet(null, new Pair<WriteResponse, Throwable>(response, null))) {
                        completed.countDown();
                    }
                    writeCallback.onSuccess(response);
                } finally {
                    if (written instanceof TupleDocument) {
                        ((TupleDocument) written).content().value1().release();
                    }
                }
            }
        });
        return responseFuture;
    }

    /** No-op: this writer keeps no buffered state of its own to flush. */
    @Override
    public void flush() throws IOException {
    }

    /**
     * Unwraps a (response, error) pair produced by an asynchronous write.
     *
     * @param pair the outcome; exactly one side is expected to be non-null
     * @return the write response when present
     * @throws ExecutionException wrapping the recorded error, or a {@link RuntimeException} when
     *     both sides of the pair are {@code null}
     */
    public WriteResponse getWriteResponseOrThrow(Pair<WriteResponse, Throwable> pair) throws ExecutionException {
        if (pair.getFirst() != null) {
            return pair.getFirst();
        }
        if (pair.getSecond() != null) {
            throw new ExecutionException(pair.getSecond());
        }
        throw new ExecutionException(new RuntimeException("Could not find non-null WriteResponse pair"));
    }

    /** No-op. */
    @Override
    public void cleanup() throws IOException {
    }

    /**
     * Returns {@code d} re-created with an absolute expiry when a TTL is configured.
     *
     * <p>The expiry is {@code origin + TTL} in seconds, where the origin is the current time or, when
     * an origin field is configured, the value of that field in the record's JSON payload.
     *
     * @return {@code d} itself when the TTL is {@code 0}, otherwise a copy carrying the expiry
     * @throws DataRecordException if an origin field is configured but the payload is not a JSON
     *     object or does not contain a value for that field
     * @throws RuntimeException if the document type is unsupported or the expiry overflows an int
     */
    private D setDocumentTTL(D d) throws DataRecordException {
        if (this._documentTTL == 0) {
            return d;
        }
        boolean isTupleDocument = d instanceof TupleDocument;
        boolean isRawJsonDocument = d instanceof RawJsonDocument;
        // Checked up front so that the content casts below are known to be valid.
        if (!isTupleDocument && !isRawJsonDocument) {
            throw new RuntimeException(" Only TupleDocument and RawJsonDocument documents are supported");
        }

        long ttlSeconds = TimeUnit.SECONDS.convert(this._documentTTL, this._documentTTLTimeUnits);
        long originSeconds = resolveTtlOriginSeconds(d, isTupleDocument);

        int expiry;
        try {
            // addExact: a long overflow must not silently wrap into a value that fits an int.
            expiry = Math.toIntExact(Math.addExact(ttlSeconds, originSeconds));
        } catch (ArithmeticException e) {
            throw new RuntimeException("There was an overflow calculating the expiry timestamp. couchbase currently only supports expiry until January 19, 2038 03:14:07 GMT", e);
        }

        if (isTupleDocument) {
            @SuppressWarnings("unchecked") // d is a TupleDocument, so D admits TupleDocument here
            D tupleWithTtl = (D) this._tupleDocumentTranscoder.newDocument(
                    d.id(), expiry, ((TupleDocument) d).content(), d.cas(), d.mutationToken());
            return tupleWithTtl;
        }
        @SuppressWarnings("unchecked") // d is a RawJsonDocument, so D admits RawJsonDocument here
        D jsonWithTtl = (D) RawJsonDocument.create(
                d.id(), expiry, ((RawJsonDocument) d).content(), d.cas(), d.mutationToken());
        return jsonWithTtl;
    }

    /**
     * Resolves, in seconds, the point in time the TTL is counted from.
     *
     * @param d a {@link TupleDocument} or {@link RawJsonDocument}
     * @param isTupleDocument whether {@code d} is a {@link TupleDocument}
     */
    private long resolveTtlOriginSeconds(D d, boolean isTupleDocument) throws DataRecordException {
        if (this._documentTTLOriginField == null || this._documentTTLOriginField.isEmpty()) {
            return System.currentTimeMillis() / 1000;
        }
        // ByteBuf.toString(Charset) decodes only the readable bytes without moving the indices and
        // works for buffers that have no accessible backing array.
        String json = isTupleDocument
                ? ((TupleDocument) d).content().value1().toString(StandardCharsets.UTF_8)
                : ((RawJsonDocument) d).content();
        JsonElement parsed = new JsonParser().parse(json);
        if (!parsed.isJsonObject()) {
            throw new DataRecordException(String.format("Document TTL Field is set but the record's value is not a valid json object.: '%s'", parsed.toString()));
        }
        JsonElement origin = parsed.getAsJsonObject().get(this._documentTTLOriginField);
        if (origin == null || origin.isJsonNull()) {
            throw new DataRecordException(String.format("Document TTL origin field '%s' is missing or null in the record", this._documentTTLOriginField));
        }
        return TimeUnit.SECONDS.convert(origin.getAsLong(), this._documentTTLOriginUnits);
    }

    /**
     * Synchronously upserts {@code d}, applying the configured TTL.
     *
     * @return the response wrapping the document returned by the bucket
     * @throws IOException wrapping any failure raised while preparing or writing the document
     */
    @Override
    public WriteResponse write(D d) throws IOException {
        try {
            D written = this._bucket.upsert(setDocumentTTL(d));
            Preconditions.checkNotNull(written);
            return new GenericWriteResponse<>(written);
        } catch (Exception e) {
            throw new IOException("Failed to write to Couchbase cluster", e);
        }
    }

    /**
     * Closes the bucket (if still open) and disconnects from the cluster. Failures are logged and
     * do not prevent the remaining shutdown steps from running.
     */
    @Override
    public void close() {
        if (!this._bucket.isClosed()) {
            try {
                this._bucket.close();
            } catch (Exception e) {
                log.warn("Failed to close bucket", e);
            }
        }
        try {
            this._cluster.disconnect();
        } catch (Exception e) {
            log.warn("Failed to disconnect from cluster", e);
        }
    }
}
