package org.apache.gobblin.example.wikipedia;

import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.http.HttpClientConfigurator;
import org.apache.gobblin.http.HttpClientConfiguratorLoader;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicNameValuePair;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/example/wikipedia/WikipediaExtractor.class */
/**
 * An {@link Extractor} that pulls revision records of a single page from the Wikipedia query API.
 *
 * <p>The page title is read from the {@code dataset.urn} property, the API endpoint from
 * {@link #WIKIPEDIA_API_ROOTURL} and the schema string from {@link #WIKIPEDIA_AVRO_SCHEMA}.
 * Revisions are fetched in small batches, starting after the work unit's low watermark and
 * ending at the latest revision that existed when the extractor was constructed. When the low
 * watermark is negative, the starting revision is looked up from {@link #BOOTSTRAP_PERIOD}.
 *
 * <p>Each emitted record is the revision's JSON object, augmented with the page's
 * {@code pageid} and {@code title} members when the response contains them.
 */
public class WikipediaExtractor implements Extractor<String, JsonElement> {
    public static final String CONFIG_PREFIX = "gobblin.wikipediaSource.";
    public static final String MAX_REVISION_PER_PAGE = CONFIG_PREFIX + "maxRevisionsPerPage";
    /** Value of {@link #MAX_REVISION_PER_PAGE} meaning "no limit". */
    public static final int DEFAULT_MAX_REVISIONS_PER_PAGE = -1;
    public static final String HTTP_CLIENT_CONFIG_PREFIX = CONFIG_PREFIX + "httpClient.";
    public static final String SOURCE_PAGE_TITLES = "source.page.titles";
    public static final String BOOTSTRAP_PERIOD = "wikipedia.source.bootstrap.lookback";
    /** ISO-8601 period used when {@link #BOOTSTRAP_PERIOD} is not configured. */
    public static final String DEFAULT_BOOTSTRAP_PERIOD = "P2D";
    public static final String WIKIPEDIA_API_ROOTURL = "wikipedia.api.rooturl";
    public static final String WIKIPEDIA_AVRO_SCHEMA = "wikipedia.avro.schema";

    private static final String JSON_MEMBER_QUERY = "query";
    private static final String JSON_MEMBER_PAGES = "pages";
    private static final String JSON_MEMBER_REVISIONS = "revisions";
    private static final String JSON_MEMBER_PAGEID = "pageid";
    private static final String JSON_MEMBER_TITLE = "title";

    private static final Logger LOG = LoggerFactory.getLogger(WikipediaExtractor.class);

    // The API interprets compact timestamps as UTC, so the formatter is pinned to UTC instead of
    // printing in the JVM's default time zone.
    private static final DateTimeFormatter WIKIPEDIA_TIMESTAMP_FORMAT =
            DateTimeFormat.forPattern("YYYYMMddHHmmss").withZoneUTC();

    private static final Gson GSON = new Gson();

    /** Number of new revisions requested per API call (one extra is requested for the overlap). */
    private static final int BATCH_SIZE = 5;

    /** Query parameters shared by every request issued by this extractor. */
    private static final ImmutableMap<String, String> BASE_QUERY = ImmutableMap.<String, String>builder()
            .put("format", "json")
            .put("action", JSON_MEMBER_QUERY)
            .put("prop", JSON_MEMBER_REVISIONS)
            .build();

    private final WikiResponseReader reader;
    private final String rootUrl;
    private final String schema;
    private final String requestedTitle;
    private final long lastRevisionId;
    private Queue<JsonElement> currentBatch;
    private final WorkUnitState workUnitState;
    private final int maxRevisionsPulled;
    private final HttpClientConfigurator httpClientConfigurator;
    private HttpClient httpClient;

    /**
     * Iterator over the revisions of the requested page, fetching a new batch from the API
     * whenever the current one is exhausted.
     */
    public class WikiResponseReader implements Iterator<JsonElement> {
        private long lastPulledRevision;
        private long revisionsPulled = 0;

        /**
         * @param j revision id to start after; the revision with this id itself is not emitted
         */
        public WikiResponseReader(long j) {
            this.lastPulledRevision = j;
        }

        /**
         * Reports whether another revision is available, fetching the next batch if needed.
         *
         * <p>Returns {@code false} once the configured maximum number of revisions has been
         * pulled, once the last known revision has been reached, or when fetching the next
         * batch fails. In the first and last case the work unit's actual high watermark is set
         * to the last revision actually pulled.
         */
        @Override
        public boolean hasNext() {
            if (WikipediaExtractor.this.maxRevisionsPulled > DEFAULT_MAX_REVISIONS_PER_PAGE
                    && this.revisionsPulled >= WikipediaExtractor.this.maxRevisionsPulled) {
                WikipediaExtractor.this.workUnitState.setActualHighWatermark(new LongWatermark(this.lastPulledRevision));
                LOG.info("Pulled max number of records {}, final revision pulled {}.",
                        this.revisionsPulled, this.lastPulledRevision);
                return false;
            }
            if (!WikipediaExtractor.this.currentBatch.isEmpty()) {
                return true;
            }
            if (this.lastPulledRevision >= WikipediaExtractor.this.lastRevisionId) {
                return false;
            }
            try {
                Map<String, String> query = ImmutableMap.<String, String>builder()
                        .putAll(BASE_QUERY)
                        .put("rvprop", "ids|timestamp|user|userid|size")
                        .put("titles", WikipediaExtractor.this.requestedTitle)
                        .put("rvlimit", Integer.toString(BATCH_SIZE + 1))
                        .put("rvstartid", Long.toString(this.lastPulledRevision))
                        .put("rvendid", Long.toString(WikipediaExtractor.this.lastRevisionId))
                        .put("rvdir", "newer")
                        .build();
                WikipediaExtractor.this.currentBatch = WikipediaExtractor.this.retrievePageRevisions(query);
                // The batch starts at lastPulledRevision itself; drop that first element so it
                // is not emitted again.
                WikipediaExtractor.this.currentBatch.poll();
                return !WikipediaExtractor.this.currentBatch.isEmpty();
            } catch (IOException | URISyntaxException e) {
                LOG.error("Could not retrieve more revisions.", e);
                // The constructor optimistically set the high watermark to lastRevisionId. Move
                // it back to what was actually pulled so the remaining revisions are not
                // skipped by a later run.
                WikipediaExtractor.this.workUnitState.setActualHighWatermark(new LongWatermark(this.lastPulledRevision));
                return false;
            }
        }

        /**
         * Returns the next revision and records its id as the last one pulled.
         *
         * @return the next revision, or {@code null} when {@link #hasNext()} is {@code false}
         */
        @Override
        public JsonElement next() {
            if (!hasNext()) {
                return null;
            }
            JsonElement revision = WikipediaExtractor.this.currentBatch.poll();
            this.lastPulledRevision = WikipediaExtractor.this.parseRevision(revision);
            this.revisionsPulled++;
            return revision;
        }

        /** Not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Creates the extractor, queries the latest revision id of the requested page and sets it
     * as the work unit's actual high watermark.
     *
     * @param workUnitState state providing configuration and the low watermark
     * @throws IOException if the latest revision cannot be retrieved or the request URI is invalid
     */
    public WikipediaExtractor(WorkUnitState workUnitState) throws IOException {
        this.workUnitState = workUnitState;
        this.rootUrl = readProp(WIKIPEDIA_API_ROOTURL, workUnitState);
        this.schema = readProp(WIKIPEDIA_AVRO_SCHEMA, workUnitState);
        this.requestedTitle = workUnitState.getProp("dataset.urn");
        this.httpClientConfigurator = new HttpClientConfiguratorLoader(workUnitState).getConfigurator();
        this.httpClientConfigurator.setStatePropertiesPrefix(HTTP_CLIENT_CONFIG_PREFIX).configure(workUnitState);
        try {
            Map<String, String> latestRevisionQuery = ImmutableMap.<String, String>builder()
                    .putAll(BASE_QUERY)
                    .put("rvprop", "ids")
                    .put("titles", this.requestedTitle)
                    .put("rvlimit", "1")
                    .build();
            Queue<JsonElement> latest = retrievePageRevisions(latestRevisionQuery);
            this.lastRevisionId = latest.isEmpty() ? -1L : parseRevision(latest.poll());

            long baseRevision = workUnitState.getWorkunit().getLowWatermark(LongWatermark.class, GSON).getValue();
            if (baseRevision < 0) {
                try {
                    baseRevision = createLowWatermarkForBootstrap(workUnitState);
                } catch (IOException e) {
                    // Best effort: without a bootstrap revision, start at the latest revision,
                    // which means nothing is pulled in this run.
                    LOG.warn("Could not determine bootstrap revision for page " + this.requestedTitle
                            + "; starting from latest revision " + this.lastRevisionId + ".", e);
                    baseRevision = this.lastRevisionId;
                }
            }
            this.reader = new WikiResponseReader(baseRevision);
            workUnitState.setActualHighWatermark(new LongWatermark(this.lastRevisionId));
            this.currentBatch = new LinkedList<>();
            LOG.info("Will pull revisions {} to {} for page {}.",
                    this.reader.lastPulledRevision, this.lastRevisionId, this.requestedTitle);
            this.maxRevisionsPulled = workUnitState.getPropAsInt(MAX_REVISION_PER_PAGE, DEFAULT_MAX_REVISIONS_PER_PAGE);
        } catch (URISyntaxException e) {
            closeHttpClientQuietly();
            throw new IOException(e);
        } catch (IOException | RuntimeException e) {
            // The caller never receives an instance to close(), so release the client here.
            closeHttpClientQuietly();
            throw e;
        }
    }

    /** Closes the HTTP client, if one was created and is closeable, logging any failure. */
    private void closeHttpClientQuietly() {
        if (this.httpClient instanceof Closeable) {
            try {
                ((Closeable) this.httpClient).close();
            } catch (IOException e) {
                LOG.warn("Failed to close HTTP client after extractor initialization error.", e);
            }
        }
    }

    /**
     * Extracts the {@code revid} member of a revision object.
     *
     * @param jsonElement a JSON object with a numeric {@code revid} member
     * @return the revision id
     */
    public long parseRevision(JsonElement jsonElement) {
        return jsonElement.getAsJsonObject().get("revid").getAsLong();
    }

    /**
     * Finds the first revision at or after "now minus the bootstrap period" and returns its id.
     *
     * @throws IOException if the query fails, the URI is invalid, or no revision is returned
     */
    private long createLowWatermarkForBootstrap(WorkUnitState workUnitState) throws IOException {
        Period lookback = Period.parse(workUnitState.getProp(BOOTSTRAP_PERIOD, DEFAULT_BOOTSTRAP_PERIOD));
        String startTimestamp = WIKIPEDIA_TIMESTAMP_FORMAT.print(DateTime.now().minus(lookback));
        Map<String, String> query = ImmutableMap.<String, String>builder()
                .putAll(BASE_QUERY)
                .put("rvprop", "ids")
                .put("titles", this.requestedTitle)
                .put("rvlimit", "1")
                .put("rvstart", startTimestamp)
                .put("rvdir", "newer")
                .build();
        try {
            Queue<JsonElement> revisions = retrievePageRevisions(query);
            if (revisions.isEmpty()) {
                throw new IOException("Could not retrieve oldest revision, returned empty revisions list.");
            }
            return parseRevision(revisions.poll());
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reads a property, trying the work unit first, then the work unit state, then the job
     * state, and returning the first non-blank value (or the last value looked up).
     */
    private String readProp(String key, WorkUnitState workUnitState) {
        String value = workUnitState.getWorkunit().getProp(key);
        if (StringUtils.isBlank(value)) {
            value = workUnitState.getProp(key);
        }
        if (StringUtils.isBlank(value)) {
            value = workUnitState.getJobState().getProp(key);
        }
        return value;
    }

    /**
     * Issues a GET request against {@code url} with the given query parameters and parses the
     * response body as JSON. The HTTP client is created lazily on first use.
     *
     * @return the parsed response, or an empty {@link JsonObject} when the body is empty
     */
    private JsonElement performHttpQuery(String url, Map<String, String> query) throws URISyntaxException, IOException {
        if (null == this.httpClient) {
            this.httpClient = createHttpClient();
        }
        HttpUriRequest request = createHttpRequest(url, query);
        StringBuilder body = new StringBuilder();
        Closer closer = Closer.create();
        try {
            HttpResponse response = sendHttpRequest(request, this.httpClient);
            if (response instanceof CloseableHttpResponse) {
                closer.register((CloseableHttpResponse) response);
            }
            BufferedReader responseReader = closer.register(new BufferedReader(
                    new InputStreamReader(response.getEntity().getContent(), ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
            String line;
            while ((line = responseReader.readLine()) != null) {
                body.append(line).append('\n');
            }
        } catch (Throwable t) {
            // Record the primary failure first so that close() attaches any close failure to it
            // as a suppressed exception instead of replacing it.
            throw closer.rethrow(t);
        } finally {
            try {
                closer.close();
            } catch (IOException e) {
                LOG.error("IOException in Closer.close() while performing query " + request + ": " + e, e);
            }
        }

        String json = body.toString();
        if (Strings.isNullOrEmpty(json)) {
            LOG.warn("Received empty response for query: " + request);
            return new JsonObject();
        }
        return GSON.fromJson(json, JsonElement.class);
    }

    /**
     * Builds the request URI from a root URL and query parameters, URL-encoding the parameters
     * as UTF-8.
     *
     * @param rootUrl base URL of the API endpoint
     * @param query query parameter names and values
     * @return {@code rootUrl} with its query set to the encoded parameters
     */
    public static URI createRequestURI(String rootUrl, Map<String, String> query)
            throws MalformedURLException, URISyntaxException {
        ArrayList<BasicNameValuePair> parameters = new ArrayList<>(query.size());
        for (Map.Entry<String, String> entry : query.entrySet()) {
            parameters.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
        }
        return new URIBuilder(rootUrl).setQuery(URLEncodedUtils.format(parameters, Charsets.UTF_8)).build();
    }

    /** Creates the GET request for the given root URL and query parameters. */
    HttpUriRequest createHttpRequest(String rootUrl, Map<String, String> query)
            throws MalformedURLException, URISyntaxException {
        return new HttpGet(createRequestURI(rootUrl, query));
    }

    /**
     * Executes the request and returns the response if it has status 200 and an entity.
     *
     * @throws IOException if the status is not 200 or the response has no entity; the response
     *     is closed first when it is closeable
     */
    HttpResponse sendHttpRequest(HttpUriRequest httpUriRequest, HttpClient httpClient)
            throws ClientProtocolException, IOException {
        LOG.debug("Sending request {}", httpUriRequest);
        HttpResponse response = httpClient.execute(httpUriRequest);
        if (response.getStatusLine().getStatusCode() == 200 && null != response.getEntity()) {
            return response;
        }
        IOException failure =
                new IOException("HTTP Request " + httpUriRequest + " returned unexpected response " + response);
        if (response instanceof CloseableHttpResponse) {
            try {
                ((CloseableHttpResponse) response).close();
            } catch (IOException closeFailure) {
                // Keep the informative error as the primary exception.
                failure.addSuppressed(closeFailure);
            }
        }
        throw failure;
    }

    /**
     * Runs the given query and returns the revisions of the first page in the response.
     *
     * <p>Each revision object is augmented with the page's {@code pageid} and {@code title}
     * when present. An empty queue is returned when the response lacks the expected
     * {@code query}/{@code pages}/{@code revisions} structure.
     *
     * @param query query parameters to send
     * @return the revisions in response order; never {@code null}
     */
    public Queue<JsonElement> retrievePageRevisions(Map<String, String> query) throws IOException, URISyntaxException {
        Queue<JsonElement> revisions = new LinkedList<>();

        JsonElement response = performHttpQuery(this.rootUrl, query);
        if (response == null || !response.isJsonObject()) {
            return revisions;
        }
        JsonObject root = response.getAsJsonObject();
        if (root == null || !root.has(JSON_MEMBER_QUERY)) {
            return revisions;
        }
        JsonObject queryResult = root.getAsJsonObject(JSON_MEMBER_QUERY);
        if (!queryResult.has(JSON_MEMBER_PAGES)) {
            return revisions;
        }
        JsonObject pages = queryResult.getAsJsonObject(JSON_MEMBER_PAGES);
        if (pages.entrySet().isEmpty()) {
            return revisions;
        }
        // Only one title is requested, so only the first page entry is read.
        String firstPageKey = pages.entrySet().iterator().next().getKey();
        JsonObject page = pages.getAsJsonObject(firstPageKey);
        if (!page.has(JSON_MEMBER_REVISIONS)) {
            return revisions;
        }

        for (JsonElement revisionElement : page.getAsJsonArray(JSON_MEMBER_REVISIONS)) {
            JsonObject revision = revisionElement.getAsJsonObject();
            if (page.has(JSON_MEMBER_PAGEID)) {
                revision.add(JSON_MEMBER_PAGEID, page.get(JSON_MEMBER_PAGEID));
            }
            if (page.has(JSON_MEMBER_TITLE)) {
                revision.add(JSON_MEMBER_TITLE, page.get(JSON_MEMBER_TITLE));
            }
            revisions.add(revision);
        }
        LOG.info("{} record(s) retrieved for title {}", revisions.size(), this.requestedTitle);
        return revisions;
    }

    /** Creates the HTTP client from the configured {@link HttpClientConfigurator}. */
    protected HttpClient createHttpClient() {
        return this.httpClientConfigurator.createClient();
    }

    /** Closes the HTTP client if one was created and it is {@link Closeable}. */
    @Override
    public void close() throws IOException {
        if (this.httpClient instanceof Closeable) {
            ((Closeable) this.httpClient).close();
        }
    }

    /** Returns the schema string read from {@link #WIKIPEDIA_AVRO_SCHEMA}. */
    @Override
    public String getSchema() {
        return this.schema;
    }

    /**
     * Alias of {@link #getSchema()} retained for callers compiled against the renamed method.
     *
     * @deprecated use {@link #getSchema()}
     */
    @Deprecated
    public String m10getSchema() {
        return getSchema();
    }

    /**
     * Returns the next revision record.
     *
     * @param jsonElement unused
     * @return the next revision, or {@code null} when no more revisions are available
     */
    @Override
    public JsonElement readRecord(@Deprecated JsonElement jsonElement) throws DataRecordException, IOException {
        if (this.reader != null && this.reader.hasNext()) {
            return this.reader.next();
        }
        return null;
    }

    /** Always returns 0. */
    @Override
    public long getExpectedRecordCount() {
        return 0L;
    }

    /** Returns the id of the latest revision found at construction time, or -1 if none. */
    @Override
    public long getHighWatermark() {
        return this.lastRevisionId;
    }
}
