package com.github.brandtg.switchboard;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.eclipse.jetty.util.URIUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/brandtg/switchboard/LogPuller.class */
/**
 * Repeatedly issues HTTP GET requests against a source server for one log collection, passing the
 * sink address as the {@code target} query parameter.
 *
 * <p>The first request asks for {@code /log/metadata/header}; every later request asks for
 * {@code /log/<collection>/<index>}, where {@code index} starts at the value given to the
 * constructor and is advanced to the index of the last {@link LogRegion} of each successful
 * response. A non-200 status (or a failed request) makes the loop sleep before retrying.
 *
 * <p>After a 200 response the polling thread blocks in {@link Object#wait()} on this instance's
 * monitor before it parses the response body. The thread that calls {@code notify()} is not part
 * of this file.
 * NOTE(review): presumably the component receiving the data at the sink notifies this object once
 * the data has been consumed - confirm against the code that registers this puller.
 */
public class LogPuller implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LogPuller.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String ENCODING = "UTF-8";
    private static final int HTTP_OK = 200;
    private static final long RETRY_DELAY_MILLIS = 1000L;

    private final String collection;
    private final long lastIndex;
    private final InetSocketAddress sourceAddress;
    private final InetSocketAddress sinkAddress;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);

    /**
     * Creates a puller; no network activity happens until {@link #run()} is invoked.
     *
     * @param sourceAddress address of the server the GET requests are sent to
     * @param sinkAddress address whose host name and port are sent as the {@code target} parameter
     * @param collection name of the collection to pull; URL-encoded into the request path
     * @param lastIndex index used in the first {@code /log/<collection>/<index>} request
     */
    public LogPuller(InetSocketAddress sourceAddress, InetSocketAddress sinkAddress, String collection, long lastIndex) {
        this.sourceAddress = sourceAddress;
        this.sinkAddress = sinkAddress;
        this.collection = collection;
        this.lastIndex = lastIndex;
    }

    /**
     * Requests that the polling loop stop; only the first call logs, later calls are no-ops.
     *
     * <p>The flag is checked at the top of each loop iteration only.
     * NOTE(review): a puller currently blocked in {@code wait()} is not woken by this method, so it
     * keeps waiting until it is notified or interrupted - confirm whether that is intended.
     */
    public void shutdown() {
        if (this.isShutdown.getAndSet(true)) {
            return;
        }
        LOG.info("Shut down log puller");
    }

    /**
     * Runs the polling loop until {@link #shutdown()} is called or the thread is interrupted, then
     * closes the HTTP client that was created for the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            pull(httpClient);
        } finally {
            // The client owns a connection pool; release it once the loop ends for any reason.
            try {
                httpClient.close();
            } catch (IOException e) {
                LOG.warn("Error closing HTTP client", e);
            }
        }
    }

    /** Polling loop: one request per iteration, sleeping between attempts that return no data. */
    private void pull(CloseableHttpClient httpClient) {
        HttpHost sourceHost = new HttpHost(this.sourceAddress.getAddress(), this.sourceAddress.getPort());
        long currentIndex = this.lastIndex;
        boolean headerRequested = false;

        while (!this.isShutdown.get()) {
            // Only the very first request targets the metadata header endpoint.
            String requestPath = headerRequested ? regionPath(currentIndex) : headerPath();
            headerRequested = true;

            int status = -1;
            // The monitor is held across the request AND the wait(), so a notify() issued while the
            // request is still in flight cannot be missed: the notifier blocks until wait() releases
            // the monitor.
            synchronized (this) {
                HttpEntity entity = null;
                try {
                    HttpResponse response = httpClient.execute(sourceHost, new HttpGet(URI.create(requestPath)));
                    entity = response.getEntity();
                    status = response.getStatusLine().getStatusCode();
                    if (status == HTTP_OK) {
                        // NOTE(review): no condition is re-checked after wait(), so a spurious wakeup
                        // proceeds as if notified; the notifier is not visible in this file.
                        wait();
                        LogRegionResponse regionResponse =
                                OBJECT_MAPPER.readValue(entity.getContent(), LogRegionResponse.class);
                        List<LogRegion> regions = regionResponse.getLogRegions();
                        // An empty response carries no new index; keep the current one instead of
                        // failing on get(-1).
                        if (regions != null && !regions.isEmpty()) {
                            currentIndex = regions.get(regions.size() - 1).getIndex();
                            for (LogRegion region : regions) {
                                LOG.info("Received {}", region);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: continuing would make every later blocking call
                    // fail immediately and turn the loop into a busy spin.
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while waiting for data to be consumed, stopping log puller", e);
                    return;
                } catch (Exception e) {
                    LOG.error("Error", e);
                } finally {
                    consumeQuietly(entity);
                }
            }

            if (status != HTTP_OK) {
                try {
                    LOG.debug("No data available, sleeping {} ms", RETRY_DELAY_MILLIS);
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while sleeping for more data, stopping log puller", e);
                    return;
                }
            }
        }
    }

    /** Builds the path and query of the initial metadata header request. */
    private String headerPath() {
        return "/log/metadata/header" + targetQuery();
    }

    /** Builds the path and query of a request for the collection's regions at {@code index}. */
    private String regionPath(long index) {
        try {
            return "/log/" + URLEncoder.encode(this.collection, ENCODING) + URIUtil.SLASH + index + targetQuery();
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("Encoding " + ENCODING + " is not supported", e);
        }
    }

    /** Builds the {@code ?target=host:port} query string from the sink address. */
    private String targetQuery() {
        return "?target=" + this.sinkAddress.getHostName() + ":" + this.sinkAddress.getPort();
    }

    /** Drains the response entity, if any, logging rather than propagating failures. */
    private static void consumeQuietly(HttpEntity entity) {
        if (entity == null) {
            return;
        }
        try {
            EntityUtils.consume(entity);
        } catch (IOException e) {
            LOG.error("Error", e);
        }
    }
}
