package com.twitter.hbc.httpclient;

import com.twitter.hbc.RateTracker;
import com.twitter.hbc.ReconnectionManager;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpConstants;
import com.twitter.hbc.core.StatsReporter;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.event.ConnectionEvent;
import com.twitter.hbc.core.event.Event;
import com.twitter.hbc.core.event.EventType;
import com.twitter.hbc.core.event.HttpResponseEvent;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/hbc/httpclient/ClientBase.class */
class ClientBase implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ClientBase.class);

    /** Status code treated as a successfully established connection. */
    private static final int HTTP_OK = 200;
    /** Lower bound (inclusive) of the status codes counted as "400s". */
    private static final int MIN_CLIENT_ERROR = 400;
    /** Lower bound (inclusive) of the status codes counted as "500s". */
    private static final int MIN_SERVER_ERROR = 500;

    /** Client name; used as the prefix of most log lines and in {@link #toString()}. */
    private final String name;
    // NOTE(review): client, hosts, auth and processor are not read by any method whose body is
    // visible in this decompiled file; presumably they are used by run(), whose body is missing.
    private final HttpClient client;
    private final StreamingEndpoint endpoint;
    private final Hosts hosts;
    private final Authentication auth;
    private final HosebirdMessageProcessor processor;
    private final ReconnectionManager reconnectionManager;
    /** Holds the exit event once one has been recorded; {@code null} content means "not done". */
    private final AtomicReference<Event> exitEvent;
    /**
     * Latch awaited by the {@code waitForFinish} methods. Nothing in the visible code counts it
     * down; presumably run() does so when it terminates (body not available, confirm).
     */
    private final CountDownLatch isRunning;
    private final RateTracker rateTracker;
    /** Optional sink for client events; may be {@code null}, in which case events are discarded. */
    private final BlockingQueue<Event> eventsQueue;
    private final StatsReporter statsReporter;
    /** Set to {@code true} the first time a connection attempt returns status 200. */
    private final AtomicBoolean connectionEstablished;
    /** One-shot flag asking the processing loop to stop so that a new connection can be made. */
    private final AtomicBoolean reconnect;

    /**
     * Creates a client without an events queue; delegates to the full constructor with a
     * {@code null} queue.
     */
    ClientBase(String str, HttpClient httpClient, Hosts hosts, StreamingEndpoint streamingEndpoint, Authentication authentication, HosebirdMessageProcessor hosebirdMessageProcessor, ReconnectionManager reconnectionManager, RateTracker rateTracker) {
        this(str, httpClient, hosts, streamingEndpoint, authentication, hosebirdMessageProcessor, reconnectionManager, rateTracker, null);
    }

    /**
     * Creates a client.
     *
     * @param str the client name; must not be null
     * @param httpClient the HTTP client; must not be null
     * @param hosts the hosts provider; must not be null
     * @param streamingEndpoint the endpoint; must not be null
     * @param authentication the authentication; must not be null
     * @param hosebirdMessageProcessor the message processor; must not be null
     * @param reconnectionManager the reconnection/backoff policy; must not be null
     * @param rateTracker the rate tracker; must not be null
     * @param blockingQueue queue that receives client events, or {@code null} to discard them
     * @throws NullPointerException if any argument other than {@code blockingQueue} is null
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientBase(String str, HttpClient httpClient, Hosts hosts, StreamingEndpoint streamingEndpoint, Authentication authentication, HosebirdMessageProcessor hosebirdMessageProcessor, ReconnectionManager reconnectionManager, RateTracker rateTracker, @Nullable BlockingQueue<Event> blockingQueue) {
        // Validation order is kept as in the original so the first failing argument is the same.
        this.client = Preconditions.checkNotNull(httpClient);
        this.name = Preconditions.checkNotNull(str);
        this.endpoint = Preconditions.checkNotNull(streamingEndpoint);
        this.hosts = Preconditions.checkNotNull(hosts);
        this.auth = Preconditions.checkNotNull(authentication);
        this.processor = Preconditions.checkNotNull(hosebirdMessageProcessor);
        this.reconnectionManager = Preconditions.checkNotNull(reconnectionManager);
        this.rateTracker = Preconditions.checkNotNull(rateTracker);
        this.eventsQueue = blockingQueue;
        this.exitEvent = new AtomicReference<>();
        this.isRunning = new CountDownLatch(1);
        this.statsReporter = new StatsReporter();
        this.connectionEstablished = new AtomicBoolean(false);
        this.reconnect = new AtomicBoolean(false);
    }

    /* JADX WARN: Code restructure failed: missing block: B:27:0x0032, code lost:
    
        setExitStatus(new com.twitter.hbc.core.event.Event(com.twitter.hbc.core.event.EventType.STOPPED_BY_ERROR, "No hosts available"));
     */
    /**
     * Thread entry point of the client.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method, so the body below is a
     * placeholder that always throws {@link UnsupportedOperationException}. The real logic must
     * be recovered from the original class file or sources before this class can be used. The
     * only fragment the decompiler preserved is the "No hosts available" exit shown in the
     * comment above.
     */
    @Override // java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 521
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.twitter.hbc.httpclient.ClientBase.run():void");
    }

    /**
     * Attempts to connect using the given request and reports the attempt on the events queue.
     *
     * <p>Failure handling, per exception type:
     * <ul>
     *   <li>{@link UnknownHostException}: logged, a {@code CONNECTION_ERROR} event is added.</li>
     *   <li>other {@link IOException}: logged, a {@code CONNECTION_ERROR} event is added and
     *       {@code reconnectionManager.handleLinearBackoff()} is invoked.</li>
     *   <li>any other {@link Exception}: logged and recorded as the exit event
     *       ({@code STOPPED_BY_ERROR}).</li>
     * </ul>
     *
     * @param connection the connection to open
     * @param httpUriRequest the request to send
     * @return the status line returned by {@code connection.connect}, or {@code null} if an
     *     exception was caught
     */
    @Nullable
    @VisibleForTesting
    StatusLine establishConnection(Connection connection, HttpUriRequest httpUriRequest) {
        logger.info("{} Establishing a connection", this.name);
        StatusLine statusLine = null;
        try {
            addEvent(new ConnectionEvent(EventType.CONNECTION_ATTEMPT, httpUriRequest));
            statusLine = connection.connect(httpUriRequest);
        } catch (UnknownHostException e) {
            // NOTE(review): unlike the IOException branch below, no backoff is requested here.
            // Whether that can lead to rapid retries depends on run(), which is not visible.
            logger.warn("{} Unknown host - {}", this.name, httpUriRequest.getURI().getHost());
            addEvent(new Event(EventType.CONNECTION_ERROR, e));
        } catch (IOException e) {
            logger.warn("{} IOException caught when establishing connection to {}", this.name, httpUriRequest.getURI());
            addEvent(new Event(EventType.CONNECTION_ERROR, e));
            this.reconnectionManager.handleLinearBackoff();
        } catch (Exception e) {
            logger.error(String.format("%s Unknown exception while establishing connection to %s", this.name, httpUriRequest.getURI()), e);
            setExitStatus(new Event(EventType.STOPPED_BY_ERROR, e));
        }
        return statusLine;
    }

    /**
     * Interprets the outcome of a connection attempt, updates statistics, emits events and
     * decides whether the client should back off or exit.
     *
     * <ul>
     *   <li>{@code null} status line: a {@code CONNECTION_ERROR} event is added.</li>
     *   <li>200: the connection is marked established, a {@code CONNECTED} event is added and
     *       the reconnection counters are reset.</li>
     *   <li>code contained in {@code HttpConstants.FATAL_CODES}: the client exits.</li>
     *   <li>400-499: exponential backoff while {@code shouldReconnectOn400s()} returns true,
     *       otherwise the client exits with "Retries exhausted".</li>
     *   <li>500 and above: exponential backoff.</li>
     *   <li>anything else (below 400 and not 200): the client exits with the reason phrase.</li>
     * </ul>
     *
     * @param statusLine the result of {@link #establishConnection}, possibly {@code null}
     * @return {@code true} only when the status code is 200
     */
    @VisibleForTesting
    boolean handleConnectionResult(@Nullable StatusLine statusLine) {
        this.statsReporter.incrNumConnects();
        if (statusLine == null) {
            logger.warn("{} failed to establish connection properly", this.name);
            addEvent(new Event(EventType.CONNECTION_ERROR, "Failed to establish connection properly"));
            return false;
        }

        int statusCode = statusLine.getStatusCode();
        if (statusCode == HTTP_OK) {
            logger.debug("{} Connection successfully established", this.name);
            this.statsReporter.incrNum200s();
            this.connectionEstablished.set(true);
            addEvent(new HttpResponseEvent(EventType.CONNECTED, statusLine));
            this.reconnectionManager.resetCounts();
            return true;
        }

        // The name is passed as a parameter rather than concatenated into the pattern, so a
        // name containing "{}" cannot corrupt the placeholder substitution.
        logger.warn("{} Error connecting w/ status code - {}, reason - {}", this.name, statusCode, statusLine.getReasonPhrase());
        this.statsReporter.incrNumConnectionFailures();
        addEvent(new HttpResponseEvent(EventType.HTTP_ERROR, statusLine));

        if (HttpConstants.FATAL_CODES.contains(statusCode)) {
            setExitStatus(new Event(EventType.STOPPED_BY_ERROR, "Fatal error code: " + statusCode));
        } else if (statusCode >= MIN_CLIENT_ERROR && statusCode < MIN_SERVER_ERROR) {
            this.statsReporter.incrNum400s();
            if (this.reconnectionManager.shouldReconnectOn400s()) {
                logger.debug("{} Reconnecting on {}", this.name, statusCode);
                this.reconnectionManager.handleExponentialBackoff();
            } else {
                logger.debug("{} Reconnecting retries exhausted for {}", this.name, statusCode);
                setExitStatus(new Event(EventType.STOPPED_BY_ERROR, "Retries exhausted"));
            }
        } else if (statusCode >= MIN_SERVER_ERROR) {
            this.statsReporter.incrNum500s();
            this.reconnectionManager.handleExponentialBackoff();
        } else {
            // Non-200 code below 400: not retried.
            setExitStatus(new Event(EventType.STOPPED_BY_ERROR, statusLine.getReasonPhrase()));
        }
        return false;
    }

    /**
     * Repeatedly calls {@code connection.processResponse()} until the client is done or a
     * reconnect has been requested, counting processed and dropped messages and notifying the
     * rate tracker after every call.
     *
     * <p>{@link IOException} and {@link RuntimeException} are reported as {@code DISCONNECTED}
     * events and do not set an exit event. {@link InterruptedException} and any other checked
     * exception are recorded as the exit event ({@code STOPPED_BY_ERROR}).
     */
    private void processConnectionData(Connection connection) {
        logger.info("{} Processing connection data", this.name);
        try {
            addEvent(new Event(EventType.PROCESSING, "Processing messages"));
            // getAndSet(false) consumes the reconnect request so it only breaks the loop once.
            while (!isDone() && !this.reconnect.getAndSet(false)) {
                if (connection.processResponse()) {
                    this.statsReporter.incrNumMessages();
                } else {
                    this.statsReporter.incrNumMessagesDropped();
                }
                this.rateTracker.eventObserved();
            }
        } catch (IOException e) {
            logger.info("{} Disconnected during processing - will reconnect", this.name);
            this.statsReporter.incrNumDisconnects();
            addEvent(new Event(EventType.DISCONNECTED, e));
        } catch (InterruptedException e) {
            logger.info("{} Thread interrupted during processing, exiting", this.name);
            this.statsReporter.incrNumDisconnects();
            setExitStatus(new Event(EventType.STOPPED_BY_ERROR, e));
            // Restore the interrupt status so the code running this Runnable can observe it.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            logger.warn(this.name + " Unknown error processing connection: ", e);
            this.statsReporter.incrNumDisconnects();
            addEvent(new Event(EventType.DISCONNECTED, e));
        } catch (Exception e) {
            logger.warn(this.name + " Unexpected exception during processing", e);
            this.statsReporter.incrNumDisconnects();
            setExitStatus(new Event(EventType.STOPPED_BY_ERROR, e));
        }
    }

    /**
     * Logs the event, publishes it on the events queue and stores it as the exit event, which
     * makes {@link #isDone()} return {@code true}. A later call replaces an earlier exit event.
     */
    private void setExitStatus(Event event) {
        logger.info("{} exit event - {}", this.name, event.getMessage());
        addEvent(event);
        this.exitEvent.set(event);
    }

    /**
     * Offers the event to the events queue without blocking. Does nothing when no queue was
     * configured; counts the event as dropped when the queue rejects it.
     */
    private void addEvent(Event event) {
        if (this.eventsQueue != null && !this.eventsQueue.offer(event)) {
            this.statsReporter.incrNumClientEventsDropped();
        }
    }

    /**
     * Requests that the processing loop stop so a new connection can be made. The request is
     * ignored unless a connection has been successfully established at least once.
     */
    public void reconnect() {
        if (this.connectionEstablished.get()) {
            this.reconnect.set(true);
        }
    }

    /**
     * Marks the client as stopped by the user (unless an exit event is already set) and waits
     * up to {@code i} milliseconds for it to finish. The rate tracker is shut down in all cases.
     *
     * @param i maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop(int i) throws InterruptedException {
        try {
            if (!isDone()) {
                setExitStatus(new Event(EventType.STOPPED_BY_USER, String.format("Stopped by user: waiting for %d ms", i)));
            }
            if (!waitForFinish(i)) {
                logger.warn("{} Client thread failed to finish in {} millis", this.name, i);
            }
        } finally {
            this.rateTracker.shutdown();
        }
    }

    /**
     * Same as {@link #stop(int)} but does not propagate {@link InterruptedException}: the
     * interruption is logged and the calling thread's interrupt status is restored.
     *
     * @param i maximum time to wait, in milliseconds
     */
    public void shutdown(int i) {
        try {
            stop(i);
        } catch (InterruptedException e) {
            logger.warn("Client failed to shutdown due to interruption", e);
            // Do not swallow the interruption: let the caller see that it was interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {@code true} once an exit event has been recorded. */
    public boolean isDone() {
        return this.exitEvent.get() != null;
    }

    /**
     * Returns the recorded exit event.
     *
     * @throws IllegalStateException if no exit event has been recorded yet
     */
    public Event getExitEvent() {
        if (isDone()) {
            return this.exitEvent.get();
        }
        throw new IllegalStateException(this.name + " Still running");
    }

    /**
     * Waits for the running latch to be released.
     *
     * @param i maximum time to wait, in milliseconds
     * @return {@code true} if the latch was released within the timeout, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitForFinish(int i) throws InterruptedException {
        return this.isRunning.await(i, TimeUnit.MILLISECONDS);
    }

    /**
     * Waits, without timeout, for the running latch to be released.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForFinish() throws InterruptedException {
        this.isRunning.await();
    }

    /** Returns the client name followed by the endpoint URI. */
    @Override
    public String toString() {
        return String.format("%s, endpoint: %s", getName(), this.endpoint.getURI());
    }

    /** Returns the client name. */
    public String getName() {
        return this.name;
    }

    /** Returns the endpoint this client was created with. */
    public StreamingEndpoint getEndpoint() {
        return this.endpoint;
    }

    /** Returns the stats tracker exposed by this client's stats reporter. */
    public StatsReporter.StatsTracker getStatsTracker() {
        return this.statsReporter.getStatsTracker();
    }
}
