package com.twitter.distributedlog.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecordSetBuffer;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.client.ownership.OwnershipCache;
import com.twitter.distributedlog.client.proxy.HostProvider;
import com.twitter.distributedlog.client.proxy.ProxyClient;
import com.twitter.distributedlog.client.proxy.ProxyClientManager;
import com.twitter.distributedlog.client.proxy.ProxyListener;
import com.twitter.distributedlog.client.resolver.RegionResolver;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.client.stats.ClientStats;
import com.twitter.distributedlog.client.stats.OpStats;
import com.twitter.distributedlog.exceptions.DLClientClosedException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.ServerInfo;
import com.twitter.distributedlog.thrift.service.ServerStatus;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteContext;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.finagle.CancelledRequestException;
import com.twitter.finagle.ConnectionFailedException;
import com.twitter.finagle.Failure;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.RequestTimeoutException;
import com.twitter.finagle.ServiceException;
import com.twitter.finagle.ServiceTimeoutException;
import com.twitter.finagle.WriteException;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId;
import com.twitter.util.Duration;
import com.twitter.util.Function;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.apache.thrift.TApplicationException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl.class */
public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient, RoutingService.RoutingListener, ProxyListener, HostProvider {
    // Shared SLF4J logger; also used by the nested operation classes.
    static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);
    // Client name: embedded in the timer thread name and handed to the proxy client builder.
    private final String clientName;
    // Client id handed to the proxy client builder and reported in the startup log line.
    private final ClientId clientId;
    // Source of redirect, timeout, backoff, checksum and stream-name-regex settings.
    private final ClientConfig clientConfig;
    // Supplies candidate hosts; this client registers itself on it as a routing listener.
    private final RoutingService routingService;
    // Builder for per-host proxy clients; consumed by the proxy client manager.
    private final ProxyClient.Builder clientBuilder;
    // Copied from clientConfig.getStreamFailfast() at construction; its use is outside this view.
    private final boolean streamFailfast;
    // Compiled from clientConfig.getStreamNameRegex(); filters ownerships reported during handshake.
    private final Pattern streamNameRegexPattern;
    // Timer used to schedule redirect backoff; also shared with the ownership cache and client manager.
    private final HashedWheelTimer dlTimer;
    // Used to build each operation's routing context and passed to the client stats.
    private final RegionResolver regionResolver;
    // Cache of stream ownership, updated as operations complete and as servers leave.
    private final OwnershipCache ownershipCache;
    // Creates, removes and handshakes the proxy clients for individual hosts.
    private final ProxyClientManager clientManager;
    // Set to true by close() while the write lock of closeLock is held.
    private boolean closed = false;
    // Lock taken (write side) by close() around the closed flag.
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    // Provides the per-operation OpStats ("write", "bulk_write", "create", ...).
    private final ClientStats clientStats;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.twitter.distributedlog.client.DistributedLogClientImpl$5, reason: invalid class name */
    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$5.class */
    /**
     * Compiler-generated lookup table that maps {@link StatusCode} ordinals to dense
     * indices 1..25. The {@code $SwitchMap$} naming and the synthetic markers indicate it
     * backs a {@code switch} over {@code StatusCode}; that switch is not in this part of
     * the file.
     *
     * <p>Do not edit by hand: the index assigned to each constant has to match the case
     * labels emitted for the corresponding switch.
     */
    public static /* synthetic */ class AnonymousClass5 {
        static final /* synthetic */ int[] $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode = new int[StatusCode.values().length];

        // Every assignment is guarded on its own: if a constant is missing at runtime the
        // NoSuchFieldError is ignored and that constant simply gets no entry, while the
        // remaining constants are still registered.
        static {
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.FOUND.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.OVER_CAPACITY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.NOT_IMPLEMENTED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.METADATA_EXCEPTION.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.LOG_EMPTY.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.LOG_NOT_FOUND.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TRUNCATED_TRANSACTION.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.END_OF_STREAM.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TRANSACTION_OUT_OF_ORDER.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.INVALID_STREAM_NAME.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.REQUEST_DENIED.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TOO_LARGE_RECORD.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.CHECKSUM_FAILED.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.STREAM_NOT_READY.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.SERVICE_UNAVAILABLE.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.REGION_UNAVAILABLE.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TOO_MANY_STREAMS.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.STREAM_UNAVAILABLE.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.ZOOKEEPER_ERROR.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.LOCKING_EXCEPTION.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.UNEXPECTED.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.INTERRUPTED.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.BK_TRANSMIT_ERROR.ordinal()] = 24;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.FLUSH_TIMEOUT.ordinal()] = 25;
            } catch (NoSuchFieldError e25) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$AbstractWriteOp.class */
    /**
     * Base class for stream operations whose server reply is a {@link WriteResponse}.
     *
     * <p>The outcome is exposed through {@link #result}: it is fulfilled with the response
     * when the server answers {@code SUCCESS}, and failed through
     * {@link #fail(SocketAddress, Throwable)}.
     */
    public abstract class AbstractWriteOp extends StreamOp {
        /** Promise fulfilled with the successful response or failed with the error. */
        final Promise<WriteResponse> result = new Promise<>();
        /** Lazily computed request checksum; {@code null} until first requested. */
        Long crc32 = null;

        AbstractWriteOp(String str, OpStats opStats) {
            super(str, opStats);
        }

        /** Records completion stats, then fulfils {@link #result} with the response. */
        void complete(SocketAddress socketAddress, WriteResponse writeResponse) {
            super.complete(socketAddress);
            result.setValue(writeResponse);
        }

        /** Records failure stats, then fails {@link #result} with {@code th}. */
        @Override
        void fail(SocketAddress socketAddress, Throwable th) {
            super.fail(socketAddress, th);
            result.setException(th);
        }

        /** Returns the cached checksum, computing it from the stream name on first use. */
        @Override
        Long computeChecksum() {
            if (crc32 != null) {
                return crc32;
            }
            crc32 = ProtocolUtils.streamOpCRC32(stream);
            return crc32;
        }

        /**
         * Issues the write-style request and completes this operation on a {@code SUCCESS}
         * reply. Any other status code, and transport failures, are deliberately left
         * untouched here; only the response header is handed back to the caller.
         */
        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient proxyClient) {
            final Future<WriteResponse> pending = sendWriteRequest(proxyClient);
            final Future<WriteResponse> observed = pending.addEventListener(new FutureEventListener<WriteResponse>() {
                @Override
                public void onSuccess(WriteResponse reply) {
                    if (StatusCode.SUCCESS != reply.getHeader().getCode()) {
                        return;
                    }
                    AbstractWriteOp.this.beforeComplete(proxyClient, reply.getHeader());
                    AbstractWriteOp.this.complete(proxyClient.getAddress(), reply);
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Intentionally empty: failures are not handled by this listener.
                }
            });
            return observed.map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(WriteResponse reply) {
                    return reply.getHeader();
                }
            });
        }

        /** Sends the concrete request for this operation through {@code proxyClient}. */
        abstract Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$BulkWriteOp.class */
    /**
     * Stream operation that writes several records in one request and exposes one
     * promise per record, in the same order as the submitted buffers.
     */
    public class BulkWriteOp extends StreamOp {
        /** Record payloads, in submission order. */
        final List<ByteBuffer> data;
        /** One promise per entry of {@link #data}, at the same index. */
        final ArrayList<Promise<DLSN>> results;

        BulkWriteOp(String str, List<ByteBuffer> list) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("bulk_write"));
            this.data = list;
            this.results = new ArrayList<>(list.size());
            for (int i = 0; i < list.size(); i++) {
                this.results.add(new Promise<DLSN>());
            }
        }

        /**
         * Sends the bulk write and, when the overall status is {@code SUCCESS}, resolves
         * the per-record promises. Other status codes and transport failures are not
         * handled by the listener; only the response header is returned to the caller.
         */
        @Override
        Future<ResponseHeader> sendRequest(final ProxyClient proxyClient) {
            return proxyClient.getService().writeBulkWithContext(this.stream, this.data, this.ctx).addEventListener(new FutureEventListener<BulkWriteResponse>() {
                @Override
                public void onSuccess(BulkWriteResponse bulkWriteResponse) {
                    if (bulkWriteResponse.getHeader().getCode() != StatusCode.SUCCESS) {
                        return;
                    }
                    BulkWriteOp.this.beforeComplete(proxyClient, bulkWriteResponse.getHeader());
                    BulkWriteOp.this.complete(proxyClient.getAddress(), bulkWriteResponse);
                    if (bulkWriteResponse.getWriteResponses().isEmpty() && !BulkWriteOp.this.data.isEmpty()) {
                        DistributedLogClientImpl.logger.error("non-empty bulk write got back empty response without failure for stream {}", BulkWriteOp.this.stream);
                    }
                }

                @Override
                public void onFailure(Throwable th) {
                    // Intentionally empty: failures are not handled by this listener.
                }
            }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
                @Override
                public ResponseHeader apply(BulkWriteResponse bulkWriteResponse) {
                    return bulkWriteResponse.getHeader();
                }
            });
        }

        /**
         * Records completion stats and resolves the per-record promises pairwise with the
         * per-record responses: a {@code SUCCESS} entry yields its DLSN, anything else
         * fails that record's promise with the matching {@link DLException}.
         */
        void complete(SocketAddress socketAddress, BulkWriteResponse bulkWriteResponse) {
            super.complete(socketAddress);
            Iterator<WriteResponse> responses = bulkWriteResponse.getWriteResponses().iterator();
            Iterator<Promise<DLSN>> promises = this.results.iterator();
            while (promises.hasNext() && responses.hasNext()) {
                Promise<DLSN> promise = promises.next();
                WriteResponse writeResponse = responses.next();
                if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
                    promise.setValue(DLSN.deserialize(writeResponse.getDlsn()));
                } else {
                    promise.setException(DLException.of(writeResponse.getHeader()));
                }
            }
            // NOTE(review): when fewer responses than records come back, the remaining
            // promises are left unresolved; the mismatch is only logged below.
            if (bulkWriteResponse.getWriteResponses().size() != this.data.size()) {
                DistributedLogClientImpl.logger.error("wrong number of results, response = {} records = {}", Integer.valueOf(bulkWriteResponse.getWriteResponses().size()), Integer.valueOf(this.data.size()));
            }
        }

        /**
         * Records failure stats and fails every per-record promise: the first one with
         * {@code th}, all following ones with a {@link CancelledRequestException}.
         */
        @Override
        void fail(SocketAddress socketAddress, Throwable th) {
            super.fail(socketAddress, th);
            Iterator<Promise<DLSN>> it = this.results.iterator();
            if (it.hasNext()) {
                it.next().setException(th);
            }
            while (it.hasNext()) {
                it.next().setException(new CancelledRequestException());
            }
        }

        /**
         * Returns the per-record futures in submission order. A typed copy is returned
         * because a {@code List<Promise<DLSN>>} cannot be viewed as a
         * {@code List<Future<DLSN>>} without an unchecked cast.
         */
        List<Future<DLSN>> result() {
            return new ArrayList<Future<DLSN>>(this.results);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$CreateOp.class */
    /** Operation that asks a proxy to create the stream. */
    class CreateOp extends AbstractWriteOp {
        CreateOp(String str) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("create"));
        }

        /** Issues the create request for this stream. */
        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            return proxyClient.getService().create(stream, ctx);
        }

        /** Records the answering proxy as the stream's owner before completion. */
        @Override
        void beforeComplete(ProxyClient proxyClient, ResponseHeader responseHeader) {
            final OwnershipCache cache = DistributedLogClientImpl.this.ownershipCache;
            cache.updateOwner(stream, proxyClient.getAddress());
        }

        /** Returns a future that carries no value and completes when the create does. */
        Future<Void> result() {
            final Future<Void> discarded = result.map(new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse ignoredResponse) {
                    return null;
                }
            });
            return discarded.voided();
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$DeleteOp.class */
    /** Operation that asks a proxy to delete the stream. */
    class DeleteOp extends AbstractWriteOp {
        DeleteOp(String str) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("delete"));
        }

        /** Issues the delete request for this stream. */
        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            return proxyClient.getService().delete(stream, ctx);
        }

        /** Drops the answering proxy as owner of the stream before completion. */
        @Override
        void beforeComplete(ProxyClient proxyClient, ResponseHeader responseHeader) {
            final OwnershipCache cache = DistributedLogClientImpl.this.ownershipCache;
            cache.removeOwnerFromStream(stream, proxyClient.getAddress(), "Stream Deleted");
        }

        /** Returns a future that carries no value and completes when the delete does. */
        Future<Void> result() {
            final AbstractFunction1<WriteResponse, Void> discardResponse = new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse ignoredResponse) {
                    return null;
                }
            };
            return result.map(discardResponse);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$HeartbeatOp.class */
    /** Operation that sends a heartbeat for the stream with configurable options. */
    class HeartbeatOp extends AbstractWriteOp {
        /** Options sent along with the heartbeat request. */
        HeartbeatOptions options;

        HeartbeatOp(String str, boolean z) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("heartbeat"));
            final HeartbeatOptions heartbeatOptions = new HeartbeatOptions();
            heartbeatOptions.setSendHeartBeatToReader(z);
            this.options = heartbeatOptions;
        }

        /** Issues the heartbeat request, including {@link #options}. */
        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            return proxyClient.getService().heartbeatWithOptions(stream, ctx, options);
        }

        /** Returns a future that carries no value and completes when the heartbeat does. */
        Future<Void> result() {
            final AbstractFunction1<WriteResponse, Void> discardResponse = new AbstractFunction1<WriteResponse, Void>() {
                @Override
                public Void apply(WriteResponse ignoredResponse) {
                    return null;
                }
            };
            return result.map(discardResponse);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$ReleaseOp.class */
    /** Operation that asks a proxy to release its ownership of the stream. */
    class ReleaseOp extends AbstractWriteOp {
        ReleaseOp(String str) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("release"));
        }

        /** Issues the release request for this stream. */
        @Override // com.twitter.distributedlog.client.DistributedLogClientImpl.AbstractWriteOp
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            return proxyClient.getService().release(this.stream, this.ctx);
        }

        /**
         * Drops the answering proxy as owner of the stream before completion. The reason
         * names the release (it previously repeated the delete operation's reason text).
         */
        @Override // com.twitter.distributedlog.client.DistributedLogClientImpl.StreamOp
        void beforeComplete(ProxyClient proxyClient, ResponseHeader responseHeader) {
            DistributedLogClientImpl.this.ownershipCache.removeOwnerFromStream(this.stream, proxyClient.getAddress(), "Stream Released");
        }

        /** Returns a future that carries no value and completes when the release does. */
        Future<Void> result() {
            return this.result.map(new AbstractFunction1<WriteResponse, Void>() { // from class: com.twitter.distributedlog.client.DistributedLogClientImpl.ReleaseOp.1
                @Override
                public Void apply(WriteResponse writeResponse) {
                    return null;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$StreamOp.class */
    public abstract class StreamOp implements TimerTask {
        final String stream;
        final RoutingService.RoutingContext routingContext;
        final OpStats opStats;
        SocketAddress nextAddressToSend;
        final AtomicInteger tries = new AtomicInteger(0);
        final WriteContext ctx = new WriteContext();
        final Stopwatch stopwatch = Stopwatch.createStarted();

        StreamOp(String str, OpStats opStats) {
            this.routingContext = RoutingService.RoutingContext.of(DistributedLogClientImpl.this.regionResolver);
            this.stream = str;
            this.opStats = opStats;
        }

        void send(SocketAddress socketAddress) {
            long elapsed = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (DistributedLogClientImpl.this.clientConfig.getMaxRedirects() > 0 && this.tries.get() >= DistributedLogClientImpl.this.clientConfig.getMaxRedirects()) {
                fail(socketAddress, new RequestTimeoutException(Duration.fromMilliseconds(elapsed), "Exhausted max redirects in " + elapsed + " ms"));
                return;
            }
            if (DistributedLogClientImpl.this.clientConfig.getRequestTimeoutMs() > 0 && elapsed >= DistributedLogClientImpl.this.clientConfig.getRequestTimeoutMs()) {
                fail(socketAddress, new RequestTimeoutException(Duration.fromMilliseconds(elapsed), "Exhausted max request timeout " + DistributedLogClientImpl.this.clientConfig.getRequestTimeoutMs() + " in " + elapsed + " ms"));
                return;
            }
            synchronized (this) {
                String obj = socketAddress.toString();
                if (this.ctx.isSetTriedHosts() && this.ctx.getTriedHosts().contains(obj)) {
                    this.nextAddressToSend = socketAddress;
                    DistributedLogClientImpl.this.dlTimer.newTimeout(this, Math.min(DistributedLogClientImpl.this.clientConfig.getRedirectBackoffMaxMs(), this.tries.get() * DistributedLogClientImpl.this.clientConfig.getRedirectBackoffStartMs()), TimeUnit.MILLISECONDS);
                } else {
                    doSend(socketAddress);
                }
            }
        }

        abstract Future<ResponseHeader> sendRequest(ProxyClient proxyClient);

        void doSend(SocketAddress socketAddress) {
            Long computeChecksum;
            this.ctx.addToTriedHosts(socketAddress.toString());
            if (DistributedLogClientImpl.this.clientConfig.isChecksumEnabled() && null != (computeChecksum = computeChecksum())) {
                this.ctx.setCrc32(computeChecksum.longValue());
            }
            this.tries.incrementAndGet();
            DistributedLogClientImpl.this.sendWriteRequest(socketAddress, this);
        }

        void beforeComplete(ProxyClient proxyClient, ResponseHeader responseHeader) {
            DistributedLogClientImpl.this.ownershipCache.updateOwner(this.stream, proxyClient.getAddress());
        }

        void complete(SocketAddress socketAddress) {
            this.stopwatch.stop();
            this.opStats.completeRequest(socketAddress, this.stopwatch.elapsed(TimeUnit.MICROSECONDS), this.tries.get());
        }

        void fail(SocketAddress socketAddress, Throwable th) {
            this.stopwatch.stop();
            this.opStats.failRequest(socketAddress, this.stopwatch.elapsed(TimeUnit.MICROSECONDS), this.tries.get());
        }

        Long computeChecksum() {
            return null;
        }

        public synchronized void run(Timeout timeout) throws Exception {
            if (timeout.isCancelled() || null == this.nextAddressToSend) {
                fail(null, new CancelledRequestException());
            } else {
                doSend(this.nextAddressToSend);
            }
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$TruncateOp.class */
    /** Operation that asks a proxy to truncate the stream at a given DLSN. */
    class TruncateOp extends AbstractWriteOp {
        /** Position passed to the truncate request. */
        final DLSN dlsn;

        TruncateOp(String str, DLSN dlsn) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("truncate"));
            this.dlsn = dlsn;
        }

        /** Returns the cached checksum, computing it from stream name and DLSN on first use. */
        @Override
        Long computeChecksum() {
            if (crc32 != null) {
                return crc32;
            }
            crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
            return crc32;
        }

        /** Issues the truncate request with the serialized DLSN. */
        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            final String serializedDlsn = dlsn.serialize();
            return proxyClient.getService().truncate(stream, serializedDlsn, ctx);
        }

        /** Returns a future that yields {@code true} once the truncate has completed. */
        Future<Boolean> result() {
            final AbstractFunction1<WriteResponse, Boolean> alwaysTrue = new AbstractFunction1<WriteResponse, Boolean>() {
                @Override
                public Boolean apply(WriteResponse ignoredResponse) {
                    return Boolean.TRUE;
                }
            };
            return result.map(alwaysTrue);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$WriteOp.class */
    /** Operation that writes a single payload to the stream. */
    class WriteOp extends AbstractWriteOp {
        /** Payload to write; never consumed directly, only through duplicates. */
        final ByteBuffer data;

        WriteOp(String str, ByteBuffer byteBuffer) {
            super(str, DistributedLogClientImpl.this.clientStats.getOpStats("write"));
            this.data = byteBuffer;
        }

        /** Issues the write request carrying the payload and context. */
        @Override
        Future<WriteResponse> sendWriteRequest(ProxyClient proxyClient) {
            return proxyClient.getService().writeWithContext(stream, data, ctx);
        }

        /**
         * Returns the cached checksum, computing it on first use from the stream name and
         * the payload's remaining bytes. The bytes are read through a duplicate so the
         * payload buffer's own position is left untouched.
         */
        @Override
        Long computeChecksum() {
            if (crc32 != null) {
                return crc32;
            }
            final byte[] payload = new byte[data.remaining()];
            final ByteBuffer view = data.duplicate();
            view.get(payload);
            crc32 = ProtocolUtils.writeOpCRC32(stream, payload);
            return crc32;
        }

        /** Returns a future yielding the DLSN deserialized from the write response. */
        Future<DLSN> result() {
            final AbstractFunction1<WriteResponse, DLSN> toDlsn = new AbstractFunction1<WriteResponse, DLSN>() {
                @Override
                public DLSN apply(WriteResponse response) {
                    return DLSN.deserialize(response.getDlsn());
                }
            };
            return result.map(toDlsn);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/client/DistributedLogClientImpl$WriteRecordSetOp.class */
    /**
     * Write operation whose payload is the buffer of a {@link LogRecordSetBuffer}; the
     * request context is flagged as carrying a record set.
     */
    class WriteRecordSetOp extends WriteOp {
        WriteRecordSetOp(String str, LogRecordSetBuffer logRecordSetBuffer) {
            super(str, logRecordSetBuffer.getBuffer());
            // Mark the context so the request is identified as a record set.
            ctx.setIsRecordSet(true);
        }
    }

    /**
     * Builds the client: stores the configuration, starts the shared timer, registers
     * this instance as routing listener, creates the ownership cache, stats and proxy
     * client manager, and publishes the "cache" gauges.
     *
     * @param str client name; used for the timer thread name and the proxy client builder
     * @param clientId client id handed to the proxy client builder
     * @param routingService routing service this client listens to and takes hosts from
     * @param clientBuilder finagle client builder used for the proxy clients
     * @param clientConfig client settings
     * @param statsReceiver receiver used for the ownership cache, client stats and "cache" gauges
     * @param statsReceiver2 second receiver passed to the ownership cache
     *        (presumably the per-stream receiver; confirm against OwnershipCache)
     * @param regionResolver resolver used for routing contexts and client stats
     * @param z flag forwarded to the client stats (meaning not visible here)
     */
    public DistributedLogClientImpl(String str, ClientId clientId, RoutingService routingService, ClientBuilder clientBuilder, ClientConfig clientConfig, StatsReceiver statsReceiver, StatsReceiver statsReceiver2, RegionResolver regionResolver, boolean z) {
        // Plain configuration state.
        this.clientName = str;
        this.clientId = clientId;
        this.routingService = routingService;
        this.clientConfig = clientConfig;
        this.streamFailfast = clientConfig.getStreamFailfast();
        this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
        this.regionResolver = regionResolver;

        // Timer shared by redirect backoff, the ownership cache and the client manager.
        final String timerThreadNameFormat = "DLClient-" + str + "-timer-%d";
        this.dlTimer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat(timerThreadNameFormat).build(),
                clientConfig.getRedirectBackoffStartMs(),
                TimeUnit.MILLISECONDS);

        // The order of the following steps is kept exactly as in the original code.
        routingService.registerListener(this);
        this.ownershipCache = new OwnershipCache(clientConfig, this.dlTimer, statsReceiver, statsReceiver2);
        this.clientStats = new ClientStats(statsReceiver, z, regionResolver);
        this.clientBuilder = ProxyClient.newBuilder(str, clientId, clientBuilder, clientConfig, this.clientStats);
        this.clientManager = new ProxyClientManager(clientConfig, this.clientBuilder, this.dlTimer, this, this.clientStats);
        this.clientManager.registerProxyListener(this);

        // Gauges under the "cache" scope: cached stream count and proxy count.
        final StatsReceiver cacheScope = statsReceiver.scope("cache");
        cacheScope.provideGauge(JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList(), new Function0<Object>() {
            @Override
            public Object apply() {
                return Float.valueOf(ownershipCache.getNumCachedStreams());
            }
        });
        cacheScope.provideGauge(JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList(), new Function0<Object>() {
            @Override
            public Object apply() {
                return Float.valueOf(clientManager.getNumProxies());
            }
        });

        final Object[] logArgs = {str, clientId, routingService.getClass(), statsReceiver.getClass(), Boolean.valueOf(clientConfig.getThriftMux())};
        logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}, stats_receiver = {}, thriftmux = {}", logArgs);
    }

    /**
     * Returns a fresh set holding every host known to the routing service plus every
     * host that currently appears as an owner in the ownership cache.
     */
    @Override
    public Set<SocketAddress> getHosts() {
        final Set<SocketAddress> hosts = new HashSet<>(routingService.getHosts());
        hosts.addAll(ownershipCache.getStreamOwnershipDistribution().keySet());
        return hosts;
    }

    /**
     * Handles a successful handshake with {@code socketAddress}.
     *
     * <p>A server reporting status {@code DOWN} is treated as unavailable. Otherwise the
     * ownerships it reports are applied, restricted to stream names that match the
     * configured stream name pattern. A missing server info, or one without ownerships,
     * is only logged.
     *
     * @param socketAddress address of the proxy that answered the handshake
     * @param proxyClient client used for the handshake
     * @param serverInfo handshake payload; may be {@code null}
     */
    @Override // com.twitter.distributedlog.client.proxy.ProxyListener
    public void onHandshakeSuccess(SocketAddress socketAddress, ProxyClient proxyClient, ServerInfo serverInfo) {
        if (null == serverInfo) {
            logger.debug("Handshaked with {} : no ownerships returned", socketAddress);
            return;
        }
        if (serverInfo.isSetServerStatus() && ServerStatus.DOWN == serverInfo.getServerStatus()) {
            logger.info("{} is detected as DOWN during handshaking", socketAddress);
            handleServiceUnavailable(socketAddress, proxyClient, Optional.absent());
            return;
        }
        if (!serverInfo.isSetOwnerships()) {
            logger.debug("Handshaked with {} : no ownerships returned", socketAddress);
            return;
        }
        // Typed map and entries: no raw types or per-entry casts needed.
        Map<String, String> ownerships = serverInfo.getOwnerships();
        logger.debug("Handshaked with {} : {} ownerships returned.", socketAddress, Integer.valueOf(ownerships.size()));
        for (Map.Entry<String, String> entry : ownerships.entrySet()) {
            if (this.streamNameRegexPattern.matcher(entry.getKey()).matches()) {
                updateOwnership(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * Handles a failed handshake by unwrapping the failure and passing it to the
     * general request-exception handling, with no stream operation attached.
     *
     * @param socketAddress address of the proxy the handshake was sent to
     * @param proxyClient   client used for the handshake
     * @param th            the handshake failure
     */
    @Override // com.twitter.distributedlog.client.proxy.ProxyListener
    public void onHandshakeFailure(SocketAddress socketAddress, ProxyClient proxyClient, Throwable th) {
        Throwable rootCause = showRootCause(Optional.<StreamOp>absent(), th);
        handleRequestException(socketAddress, proxyClient, Optional.<StreamOp>absent(), rootCause);
    }

    /**
     * Triggers a handshake through the client manager, then logs the number of
     * proxies and the number of cached streams.
     */
    @VisibleForTesting
    public void handshake() {
        this.clientManager.handshake();
        Integer numHosts = Integer.valueOf(this.clientManager.getNumProxies());
        Integer numStreams = Integer.valueOf(this.ownershipCache.getNumCachedStreams());
        logger.info("Handshaked with {} hosts, cached {} streams", numHosts, numStreams);
    }

    /**
     * Routing callback for a host leaving; delegates to the two-argument overload
     * with no specific proxy client.
     *
     * @param socketAddress address of the host that left
     */
    @Override // com.twitter.distributedlog.client.routing.RoutingService.RoutingListener
    public void onServerLeft(SocketAddress socketAddress) {
        final ProxyClient anyClient = null;
        onServerLeft(socketAddress, anyClient);
    }

    /**
     * Drops all cached stream ownerships for a host and removes its proxy client.
     *
     * @param socketAddress address of the host that left
     * @param proxyClient   the client to remove, or {@code null} to call the
     *                      address-only removal on the client manager
     */
    private void onServerLeft(SocketAddress socketAddress, ProxyClient proxyClient) {
        this.ownershipCache.removeAllStreamsFromOwner(socketAddress);
        if (proxyClient != null) {
            this.clientManager.removeClient(socketAddress, proxyClient);
            return;
        }
        this.clientManager.removeClient(socketAddress);
    }

    /**
     * Routing callback for a host joining; asks the client manager to create a
     * client for it.
     *
     * @param socketAddress address of the host that joined
     */
    @Override // com.twitter.distributedlog.client.routing.RoutingService.RoutingListener
    public void onServerJoin(SocketAddress socketAddress) {
        final SocketAddress joinedHost = socketAddress;
        this.clientManager.createClient(joinedHost);
    }

    /**
     * Closes this client. Calls after the first one return without doing anything.
     *
     * <p>The {@code closed} flag is flipped under the write lock, and the lock is
     * released exactly once in the {@code finally} block. The previous code unlocked
     * both inline and in {@code finally}, so the second unlock threw
     * {@link IllegalMonitorStateException} on every first close and masked any
     * exception raised during teardown.
     *
     * <p>Teardown (client manager, routing service, timer) runs after the lock is
     * released, so the write lock is held only while the flag is updated.
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient, com.twitter.distributedlog.client.monitor.MonitorServiceClient
    public void close() {
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        } finally {
            this.closeLock.writeLock().unlock();
        }
        // Only the first caller to flip the flag reaches this point.
        this.clientManager.close();
        this.routingService.unregisterListener(this);
        this.routingService.stopService();
        this.dlTimer.stop();
    }

    /**
     * Submits a {@code HeartbeatOp} for the stream, built with its boolean flag set
     * to {@code false}, and returns the op's result future.
     *
     * @param str name of the stream
     * @return future completed with the outcome of the operation
     */
    @Override // com.twitter.distributedlog.client.monitor.MonitorServiceClient
    public Future<Void> check(String str) {
        final HeartbeatOp op = new HeartbeatOp(str, false);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code HeartbeatOp} for the stream, built with its boolean flag set
     * to {@code true}, and returns the op's result future.
     *
     * @param str name of the stream
     * @return future completed with the outcome of the operation
     */
    @Override // com.twitter.distributedlog.client.monitor.MonitorServiceClient
    public Future<Void> heartbeat(String str) {
        final HeartbeatOp op = new HeartbeatOp(str, true);
        sendRequest(op);
        return op.result();
    }

    /**
     * Returns the host-to-streams mapping exactly as reported by the ownership cache.
     *
     * @return map from host address to the stream names associated with it
     */
    @Override // com.twitter.distributedlog.client.monitor.MonitorServiceClient
    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
        Map<SocketAddress, Set<String>> distribution = this.ownershipCache.getStreamOwnershipDistribution();
        return distribution;
    }

    /**
     * Sends {@code setAcceptNewStream(z)} to every proxy client currently held by
     * the client manager and combines the results into one future.
     *
     * @param z the value forwarded to each proxy
     * @return future that completes with {@code null} once the collected per-proxy futures complete
     */
    @Override // com.twitter.distributedlog.client.monitor.MonitorServiceClient
    public Future<Void> setAcceptNewStream(boolean z) {
        Map<SocketAddress, ProxyClient> clients = this.clientManager.getAllClients();
        List<Future<Void>> pending = new ArrayList<Future<Void>>(clients.size());
        for (ProxyClient proxy : clients.values()) {
            pending.add(proxy.getService().setAcceptNewStream(z));
        }
        Future<List<Void>> collected = Future.collect(pending);
        return collected.map(new Function<List<Void>, Void>() { // from class: com.twitter.distributedlog.client.DistributedLogClientImpl.3
            public Void apply(List<Void> ignored) {
                // The individual results carry no value; collapse them to a single Void.
                return null;
            }
        });
    }

    /**
     * Submits a {@code WriteOp} built from the stream name and buffer, and returns
     * the op's result future.
     *
     * @param str        name of the stream
     * @param byteBuffer data passed to the write op
     * @return future holding the {@code DLSN} produced by the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<DLSN> write(String str, ByteBuffer byteBuffer) {
        final WriteOp op = new WriteOp(str, byteBuffer);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code WriteRecordSetOp} built from the stream name and record-set
     * buffer, and returns the op's result future.
     *
     * @param str                name of the stream
     * @param logRecordSetBuffer record set passed to the op
     * @return future holding the {@code DLSN} produced by the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<DLSN> writeRecordSet(String str, LogRecordSetBuffer logRecordSetBuffer) {
        final WriteRecordSetOp op = new WriteRecordSetOp(str, logRecordSetBuffer);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code BulkWriteOp} for the given buffers and returns the op's result.
     *
     * @param str  name of the stream
     * @param list buffers passed to the bulk op
     * @return the list of futures produced by the bulk op; an empty list when
     *         {@code list} is empty, in which case no request is sent
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public List<Future<DLSN>> writeBulk(String str, List<ByteBuffer> list) {
        if (list.isEmpty()) {
            // Nothing to write: skip building and sending an op.
            return Collections.emptyList();
        }
        final BulkWriteOp op = new BulkWriteOp(str, list);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code TruncateOp} built from the stream name and {@code DLSN}, and
     * returns the op's result future.
     *
     * @param str  name of the stream
     * @param dlsn position passed to the truncate op
     * @return future holding the boolean outcome of the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<Boolean> truncate(String str, DLSN dlsn) {
        final TruncateOp op = new TruncateOp(str, dlsn);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code DeleteOp} for the stream and returns the op's result future.
     *
     * @param str name of the stream
     * @return future completed with the outcome of the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<Void> delete(String str) {
        final DeleteOp op = new DeleteOp(str);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code ReleaseOp} for the stream and returns the op's result future.
     *
     * @param str name of the stream
     * @return future completed with the outcome of the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<Void> release(String str) {
        final ReleaseOp op = new ReleaseOp(str);
        sendRequest(op);
        return op.result();
    }

    /**
     * Submits a {@code CreateOp} for the stream and returns the op's result future.
     *
     * @param str name of the stream
     * @return future completed with the outcome of the operation
     */
    @Override // com.twitter.distributedlog.service.DistributedLogClient
    public Future<Void> create(String str) {
        final CreateOp op = new CreateOp(str);
        sendRequest(op);
        return op.result();
    }

    /**
     * Entry point for every stream operation. While holding the close read lock, it
     * either dispatches the op or, if the client is already closed, fails it with a
     * {@code DLClientClosedException}.
     *
     * @param streamOp the operation to dispatch
     */
    private void sendRequest(StreamOp streamOp) {
        this.closeLock.readLock().lock();
        try {
            if (!this.closed) {
                doSend(streamOp, null);
                return;
            }
            streamOp.fail(null, new DLClientClosedException("Client " + this.clientName + " is closed."));
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Chooses a target host for the operation and sends it there.
     *
     * <p>The cached owner of the stream is used unless there is none or it has
     * already been tried; in that case the routing service is asked for a host. If
     * the routing service reports that no brokers are available, the op is failed.
     *
     * @param streamOp      the operation to send
     * @param socketAddress a previously attempted host to record as tried with
     *                      {@code WRITE_EXCEPTION}, or {@code null} if there is none
     */
    private void doSend(StreamOp streamOp, SocketAddress socketAddress) {
        if (socketAddress != null) {
            streamOp.routingContext.addTriedHost(socketAddress, StatusCode.WRITE_EXCEPTION);
        }
        SocketAddress target = this.ownershipCache.getOwner(streamOp.stream);
        boolean needsRouting = target == null || streamOp.routingContext.isTriedHost(target);
        if (needsRouting) {
            try {
                target = this.routingService.getHost(streamOp.stream, streamOp.routingContext);
            } catch (NoBrokersAvailableException noBrokers) {
                streamOp.fail(null, noBrokers);
                return;
            }
        }
        streamOp.send(target);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a stream operation to one specific proxy and reacts to the outcome.
     *
     * <p>The proxy client for {@code socketAddress} is looked up from the client
     * manager, the op is sent through it, and a listener is attached to the
     * returned future. On a response the listener records stats, marks the host
     * as tried with the returned status code, and then dispatches on that code.
     * On a transport-level failure it unwraps the cause, records stats, and
     * defers to {@code handleRequestException}.
     *
     * <p>NOTE(review): the switch below goes through a compiler-generated
     * ordinal table ({@code AnonymousClass5.$SwitchMap...}); the numeric case
     * labels are positions in that table, not {@code StatusCode} values. Which
     * status each label stands for cannot be determined from this method alone.
     *
     * @param socketAddress address of the proxy to send to
     * @param streamOp      the operation to send
     */
    public void sendWriteRequest(final SocketAddress socketAddress, final StreamOp streamOp) {
        final ProxyClient client = this.clientManager.getClient(socketAddress);
        // Start timestamp handed to the stats calls below.
        final long nanoTime = System.nanoTime();
        streamOp.sendRequest(client).addEventListener(new FutureEventListener<ResponseHeader>() { // from class: com.twitter.distributedlog.client.DistributedLogClientImpl.4
            public void onSuccess(ResponseHeader responseHeader) {
                if (DistributedLogClientImpl.logger.isDebugEnabled()) {
                    DistributedLogClientImpl.logger.debug("Received response; header: {}", responseHeader);
                }
                DistributedLogClientImpl.this.clientStats.completeProxyRequest(socketAddress, responseHeader.getCode(), nanoTime);
                // Remember this host and the code it returned so later routing can see it was tried.
                streamOp.routingContext.addTriedHost(socketAddress, responseHeader.getCode());
                switch (AnonymousClass5.$SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[responseHeader.getCode().ordinal()]) {
                    case 1:
                        // Nothing further to do here for this code (presumably the success status — TODO confirm).
                        return;
                    case 2:
                        // Response may carry a new location; follow it.
                        DistributedLogClientImpl.this.handleRedirectResponse(responseHeader, streamOp, socketAddress);
                        return;
                    case 3:
                        // Fail the op with the exception built from the header, logging at debug level.
                        DistributedLogClientImpl.logger.debug("Failed to write request to {} : {}", streamOp.stream, responseHeader);
                        streamOp.fail(socketAddress, DLException.of(responseHeader));
                        return;
                    case 4:
                    case 5:
                    case 6:
                    case 7:
                    case 8:
                    case 9:
                    case 10:
                    case 11:
                    case 12:
                    case 13:
                    case 14:
                    case 15:
                        // These codes fail the op immediately, with no retry on another host.
                        streamOp.fail(socketAddress, DLException.of(responseHeader));
                        return;
                    case 16:
                        // Treat the proxy as unavailable and redirect the op elsewhere.
                        DistributedLogClientImpl.this.handleServiceUnavailable(socketAddress, client, Optional.of(streamOp));
                        return;
                    case 17:
                        // Resend without a specific target; doSend picks the host.
                        DistributedLogClientImpl.this.redirect(streamOp, null);
                        return;
                    case 18:
                        // Fail or redirect depending on the streamFailfast setting.
                        DistributedLogClientImpl.this.handleRedirectableError(socketAddress, streamOp, responseHeader);
                        return;
                    case 19:
                    case 20:
                    case 21:
                    case 22:
                    case 23:
                    case 24:
                    case 25:
                    default:
                        // Every remaining code: drop this host as cached owner of the stream,
                        // then fail or redirect depending on the streamFailfast setting.
                        DistributedLogClientImpl.this.ownershipCache.removeOwnerFromStream(streamOp.stream, socketAddress, responseHeader.getCode().name());
                        DistributedLogClientImpl.this.handleRedirectableError(socketAddress, streamOp, responseHeader);
                        return;
                }
            }

            public void onFailure(Throwable th) {
                Optional<StreamOp> of = Optional.of(streamOp);
                // Unwrap a wrapped finagle Failure (if any) before recording stats and handling it.
                Throwable showRootCause = DistributedLogClientImpl.this.showRootCause(of, th);
                DistributedLogClientImpl.this.clientStats.failProxyRequest(socketAddress, showRootCause, nanoTime);
                DistributedLogClientImpl.this.handleRequestException(socketAddress, client, of, showRootCause);
            }
        });
    }

    /**
     * Unwraps a finagle {@code Failure} flagged as {@code Wrapped} by calling
     * {@code show()} on it.
     *
     * @param optional the stream operation the failure belongs to, used only for logging
     * @param th       the throwable to inspect
     * @return the result of {@code Failure.show()} when {@code th} is a wrapped
     *         failure and unwrapping succeeds; otherwise {@code th} itself
     */
    Throwable showRootCause(Optional<StreamOp> optional, Throwable th) {
        if (!(th instanceof Failure)) {
            return th;
        }
        Failure wrapped = (Failure) th;
        if (!wrapped.isFlagged(Failure.Wrapped())) {
            return th;
        }
        try {
            return wrapped.show();
        } catch (IllegalArgumentException unwrapError) {
            if (optional.isPresent()) {
                logger.warn("Failed to unwrap finagle failure of stream {} : ", optional.get().stream, unwrapError);
            } else {
                logger.warn("Failed to unwrap finagle failure : ", unwrapError);
            }
            // Unwrapping failed: keep the original throwable.
            return th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles an error response that may be retried on another host. When
     * {@code streamFailfast} is set the op is failed with the exception built from
     * the header; otherwise it is redirected with no explicit target.
     *
     * @param socketAddress  host that returned the error
     * @param streamOp       the affected operation
     * @param responseHeader the error response
     */
    public void handleRedirectableError(SocketAddress socketAddress, StreamOp streamOp, ResponseHeader responseHeader) {
        if (!this.streamFailfast) {
            redirect(streamOp, null);
            return;
        }
        streamOp.fail(socketAddress, DLException.of(responseHeader));
    }

    /**
     * Handles a proxy reported as unavailable: removes it from the routing service,
     * runs the server-left cleanup for its address, and, if an operation is
     * attached, drops the host as owner of that op's stream and redirects the op.
     *
     * @param socketAddress address of the unavailable proxy
     * @param proxyClient   client for that proxy (not used by this method)
     * @param optional      the operation that hit the unavailable proxy, if any
     */
    void handleServiceUnavailable(SocketAddress socketAddress, ProxyClient proxyClient, Optional<StreamOp> optional) {
        final String reason = socketAddress + " is unavailable now.";
        this.routingService.removeHost(socketAddress, new ServiceUnavailableException(reason));
        onServerLeft(socketAddress);
        if (!optional.isPresent()) {
            return;
        }
        StreamOp pendingOp = optional.get();
        this.ownershipCache.removeOwnerFromStream(pendingOp.stream, socketAddress, reason);
        redirect(pendingOp, null);
    }

    /**
     * Classifies a request failure and decides whether to drop the host, clear the
     * stream's cached owner, and resend the operation.
     *
     * <ul>
     *   <li>Connection failures: remove the host, run server-left cleanup, clear
     *       ownership, resend.</li>
     *   <li>{@code ChannelException}: remove the host (with server-left cleanup only
     *       when the cause is a {@code ConnectException}), clear ownership, resend.</li>
     *   <li>{@code ServiceTimeoutException}: resend without marking the host as tried.</li>
     *   <li>{@code WriteException}: resend.</li>
     *   <li>{@code ServiceException}: remove the proxy client, resend.</li>
     *   <li>Anything else is delegated to a dedicated handler and not resent here.</li>
     * </ul>
     *
     * @param socketAddress host the request was sent to
     * @param proxyClient   client used for the request
     * @param optional      the failed operation, if any
     * @param th            the failure
     */
    void handleRequestException(SocketAddress socketAddress, ProxyClient proxyClient, Optional<StreamOp> optional, Throwable th) {
        boolean resend = false;
        boolean clearOwnership = false;
        // Host passed to doSend so it gets marked as tried; null means "do not mark".
        SocketAddress triedHost = socketAddress;
        String reason = th.getMessage();

        if (th instanceof ConnectionFailedException || th instanceof ConnectException) {
            this.routingService.removeHost(socketAddress, th);
            onServerLeft(socketAddress, proxyClient);
            clearOwnership = true;
            resend = true;
        } else if (th instanceof ChannelException) {
            Throwable cause = th.getCause();
            if (cause instanceof ConnectException) {
                this.routingService.removeHost(socketAddress, cause);
                onServerLeft(socketAddress);
                reason = cause.getMessage();
            } else {
                this.routingService.removeHost(socketAddress, th);
            }
            clearOwnership = true;
            resend = true;
        } else if (th instanceof ServiceTimeoutException) {
            resend = true;
            triedHost = null;
        } else if (th instanceof WriteException) {
            resend = true;
        } else if (th instanceof ServiceException) {
            this.clientManager.removeClient(socketAddress, proxyClient);
            resend = true;
        } else if (th instanceof TApplicationException) {
            handleTApplicationException(th, optional, socketAddress, proxyClient);
        } else if (th instanceof Failure) {
            handleFinagleFailure((Failure) th, optional, socketAddress);
        } else {
            handleException(th, optional, socketAddress);
        }

        if (!optional.isPresent()) {
            return;
        }
        StreamOp failedOp = optional.get();
        if (clearOwnership) {
            this.ownershipCache.removeOwnerFromStream(failedOp.stream, socketAddress, reason);
        }
        if (resend) {
            doSend(failedOp, triedHost);
        }
    }

    /**
     * Records a redirect for the op's stream and resends the op.
     *
     * @param streamOp      the operation to resend
     * @param socketAddress explicit new target, or {@code null} to let
     *                      {@code doSend} choose the host
     */
    void redirect(StreamOp streamOp, SocketAddress socketAddress) {
        this.ownershipCache.getOwnershipStatsLogger().onRedirect(streamOp.stream);
        if (socketAddress != null) {
            logger.debug("Redirect request {} to new owner {}.", streamOp, socketAddress);
            streamOp.send(socketAddress);
            return;
        }
        doSend(streamOp, null);
    }

    /**
     * Handles a finagle {@code Failure}. Failures flagged {@code Restartable} cause
     * the op, if present, to be resent; all other failures go to
     * {@code handleException}.
     *
     * @param failure       the finagle failure
     * @param optional      the failed operation, if any
     * @param socketAddress host the request was sent to
     */
    void handleFinagleFailure(Failure failure, Optional<StreamOp> optional, SocketAddress socketAddress) {
        if (failure.isFlagged(Failure.Restartable())) {
            if (optional.isPresent()) {
                doSend(optional.get(), socketAddress);
            }
            return;
        }
        handleException(failure, optional, socketAddress);
    }

    /**
     * Logs the failure and fails the operation with it. Does nothing when no
     * operation is attached.
     *
     * @param th            the failure
     * @param optional      the failed operation, if any
     * @param socketAddress host the request was sent to
     */
    void handleException(Throwable th, Optional<StreamOp> optional, SocketAddress socketAddress) {
        if (!optional.isPresent()) {
            return;
        }
        StreamOp failedOp = optional.get();
        logger.error("Failed to write request to {} @ {} : {}", new Object[]{failedOp.stream, socketAddress, th.toString()});
        failedOp.fail(socketAddress, th);
    }

    /**
     * Handles a {@code TApplicationException}. An exception of type {@code 1}
     * removes the host, runs server-left cleanup, and resends the op (if any) after
     * clearing the stream's cached owner. Every other type is delegated to
     * {@code handleException}.
     *
     * @param th            the failure; must be a {@code TApplicationException}
     * @param optional      the failed operation, if any
     * @param socketAddress host the request was sent to
     * @param proxyClient   client used for the request
     */
    void handleTApplicationException(Throwable th, Optional<StreamOp> optional, SocketAddress socketAddress, ProxyClient proxyClient) {
        // NOTE(review): 1 is presumably Thrift's UNKNOWN_METHOD type code — confirm against libthrift.
        int exceptionType = ((TApplicationException) th).getType();
        if (exceptionType == 1) {
            this.routingService.removeHost(socketAddress, th);
            onServerLeft(socketAddress, proxyClient);
            if (optional.isPresent()) {
                StreamOp failedOp = optional.get();
                this.ownershipCache.removeOwnerFromStream(failedOp.stream, socketAddress, th.getMessage());
                doSend(failedOp, socketAddress);
            }
            return;
        }
        handleException(th, optional, socketAddress);
    }

    /**
     * Follows a redirect response.
     *
     * <p>If the header carries a location that deserializes to a host different
     * from the one that answered, that host is cached as the stream's owner and the
     * op is sent straight to it. In every other case (no location, a redirect back
     * to the same host, or an unparseable location) the op is redirected with no
     * explicit target, so the normal host selection runs.
     *
     * <p>An unparseable location is logged as a warning rather than silently
     * ignored, matching how {@code updateOwnership} reports invalid ownership
     * strings.
     *
     * @param responseHeader the redirect response
     * @param streamOp       the operation being redirected
     * @param socketAddress  host that returned the redirect
     */
    void handleRedirectResponse(ResponseHeader responseHeader, StreamOp streamOp, SocketAddress socketAddress) {
        SocketAddress newOwner = null;
        if (responseHeader.isSetLocation()) {
            try {
                InetSocketAddress candidate = DLSocketAddress.deserialize(responseHeader.getLocation()).getSocketAddress();
                if (socketAddress.equals(candidate)) {
                    logger.warn("Request to stream {} is redirected to same server {}!", streamOp.stream, socketAddress);
                } else {
                    this.ownershipCache.updateOwner(streamOp.stream, candidate);
                    newOwner = candidate;
                }
            } catch (IOException e) {
                // Fall back to normal host selection, but leave a trace of the bad location.
                logger.warn("Invalid redirect location {} found for stream {} : ", new Object[]{responseHeader.getLocation(), streamOp.stream, e});
            }
        }
        redirect(streamOp, newOwner);
    }

    /**
     * Deserializes an owner string and records it in the ownership cache as the
     * owner of the given stream. A string that fails to deserialize is logged and
     * otherwise ignored.
     *
     * @param str  name of the stream
     * @param str2 serialized socket address of the owner
     */
    void updateOwnership(String str, String str2) {
        try {
            InetSocketAddress ownerAddress = DLSocketAddress.deserialize(str2).getSocketAddress();
            this.ownershipCache.updateOwner(str, ownerAddress);
        } catch (IOException invalidOwner) {
            logger.warn("Invalid ownership {} found for stream {} : ", new Object[]{str2, str, invalidOwner});
        }
    }
}
