package org.apache.flume.source.twitter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.MediaEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
import twitter4j.auth.AccessToken;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/flume/source/twitter/TwitterSource.class */
public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable, StatusListener, BatchSizeSupported {
    private TwitterStream twitterStream;
    private Schema avroSchema;
    private DataFileWriter<GenericRecord> dataFileWriter;
    private SourceCounter sourceCounter;
    private static int REPORT_INTERVAL = 100;
    private static int STATS_INTERVAL = REPORT_INTERVAL * 10;
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterSource.class);
    private long docCount = 0;
    private long startTime = 0;
    private long exceptionCount = 0;
    private long totalTextIndexed = 0;
    private long skippedDocs = 0;
    private long batchEndTime = 0;
    private final List<GenericData.Record> docs = new ArrayList();
    private final ByteArrayOutputStream serializationBuffer = new ByteArrayOutputStream();
    private int maxBatchSize = 1000;
    private int maxBatchDurationMillis = 1000;
    private SimpleDateFormat formatterTo = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    private DecimalFormat numFormatter = new DecimalFormat("###,###.###");

    public void configure(Context context) {
        String string = context.getString("consumerKey");
        String string2 = context.getString("consumerSecret");
        String string3 = context.getString("accessToken");
        String string4 = context.getString("accessTokenSecret");
        this.twitterStream = new TwitterStreamFactory().getInstance();
        this.twitterStream.setOAuthConsumer(string, string2);
        this.twitterStream.setOAuthAccessToken(new AccessToken(string3, string4));
        this.twitterStream.addListener(this);
        this.avroSchema = createAvroSchema();
        this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter(this.avroSchema));
        this.maxBatchSize = context.getInteger("maxBatchSize", Integer.valueOf(this.maxBatchSize)).intValue();
        this.maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis", Integer.valueOf(this.maxBatchDurationMillis)).intValue();
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    public synchronized void start() {
        LOGGER.info("Starting twitter source {} ...", this);
        this.docCount = 0L;
        this.startTime = System.currentTimeMillis();
        this.exceptionCount = 0L;
        this.totalTextIndexed = 0L;
        this.skippedDocs = 0L;
        this.batchEndTime = System.currentTimeMillis() + this.maxBatchDurationMillis;
        this.twitterStream.sample();
        LOGGER.info("Twitter source {} started.", getName());
        super.start();
    }

    public synchronized void stop() {
        LOGGER.info("Twitter source {} stopping...", getName());
        this.twitterStream.shutdown();
        super.stop();
        LOGGER.info("Twitter source {} stopped.", getName());
    }

    public void onStatus(Status status) {
        GenericData.Record extractRecord = extractRecord("", this.avroSchema, status);
        if (extractRecord == null) {
            return;
        }
        this.docs.add(extractRecord);
        if (this.docs.size() >= this.maxBatchSize || System.currentTimeMillis() >= this.batchEndTime) {
            this.sourceCounter.addToEventReceivedCount(this.docs.size());
            this.batchEndTime = System.currentTimeMillis() + this.maxBatchDurationMillis;
            try {
                getChannelProcessor().processEvent(EventBuilder.withBody(serializeToAvro(this.avroSchema, this.docs)));
                this.docs.clear();
                this.sourceCounter.addToEventAcceptedCount(this.docs.size());
            } catch (IOException e) {
                this.sourceCounter.incrementGenericProcessingFail();
                LOGGER.error("Exception while serializing tweet", e);
                return;
            }
        }
        this.docCount++;
        if (this.docCount % REPORT_INTERVAL == 0) {
            LOGGER.info(String.format("Processed %s docs", this.numFormatter.format(this.docCount)));
        }
        if (this.docCount % STATS_INTERVAL == 0) {
            logStats();
        }
    }

    private Schema createAvroSchema() {
        Schema createRecord = Schema.createRecord("Doc", "adoc", (String) null, false);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Schema.Field("id", Schema.create(Schema.Type.STRING), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_friends_count", createOptional(Schema.create(Schema.Type.INT)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_location", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_description", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_statuses_count", createOptional(Schema.create(Schema.Type.INT)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_followers_count", createOptional(Schema.create(Schema.Type.INT)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_name", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("user_screen_name", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("created_at", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("text", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("retweet_count", createOptional(Schema.create(Schema.Type.LONG)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("retweeted", createOptional(Schema.create(Schema.Type.BOOLEAN)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("in_reply_to_user_id", createOptional(Schema.create(Schema.Type.LONG)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("source", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("in_reply_to_status_id", createOptional(Schema.create(Schema.Type.LONG)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("media_url_https", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        arrayList.add(new Schema.Field("expanded_url", createOptional(Schema.create(Schema.Type.STRING)), (String) null, (Object) null));
        createRecord.setFields(arrayList);
        return createRecord;
    }

    private GenericData.Record extractRecord(String str, Schema schema, Status status) {
        User user = status.getUser();
        GenericData.Record record = new GenericData.Record(schema);
        record.put("id", str + status.getId());
        record.put("created_at", this.formatterTo.format(status.getCreatedAt()));
        record.put("retweet_count", Long.valueOf(status.getRetweetCount()));
        record.put("retweeted", Boolean.valueOf(status.isRetweet()));
        record.put("in_reply_to_user_id", Long.valueOf(status.getInReplyToUserId()));
        record.put("in_reply_to_status_id", Long.valueOf(status.getInReplyToStatusId()));
        addString(record, "source", status.getSource());
        addString(record, "text", status.getText());
        MediaEntity[] mediaEntities = status.getMediaEntities();
        if (mediaEntities.length > 0) {
            addString(record, "media_url_https", mediaEntities[0].getMediaURLHttps());
            addString(record, "expanded_url", mediaEntities[0].getExpandedURL());
        }
        record.put("user_friends_count", Integer.valueOf(user.getFriendsCount()));
        record.put("user_statuses_count", Integer.valueOf(user.getStatusesCount()));
        record.put("user_followers_count", Integer.valueOf(user.getFollowersCount()));
        addString(record, "user_location", user.getLocation());
        addString(record, "user_description", user.getDescription());
        addString(record, "user_screen_name", user.getScreenName());
        addString(record, "user_name", user.getName());
        return record;
    }

    private byte[] serializeToAvro(Schema schema, List<GenericData.Record> list) throws IOException {
        this.serializationBuffer.reset();
        this.dataFileWriter.create(schema, this.serializationBuffer);
        try {
            Iterator<GenericData.Record> it = list.iterator();
            while (it.hasNext()) {
                this.dataFileWriter.append(it.next());
            }
            return this.serializationBuffer.toByteArray();
        } finally {
            this.dataFileWriter.close();
        }
    }

    private Schema createOptional(Schema schema) {
        return Schema.createUnion(Arrays.asList(schema, Schema.create(Schema.Type.NULL)));
    }

    private void addString(GenericData.Record record, String str, String str2) {
        if (str2 == null) {
            return;
        }
        record.put(str, str2);
        this.totalTextIndexed += str2.length();
    }

    private void logStats() {
        long max = Math.max((System.currentTimeMillis() - this.startTime) / 1000, 1L);
        LOGGER.info(String.format("Total docs indexed: %s, total skipped docs: %s", this.numFormatter.format(this.docCount), this.numFormatter.format(this.skippedDocs)));
        LOGGER.info(String.format("    %s docs/second", this.numFormatter.format(this.docCount / max)));
        LOGGER.info(String.format("Run took %s seconds and processed:", this.numFormatter.format(max)));
        LOGGER.info(String.format("    %s MB/sec sent to index", this.numFormatter.format((((float) this.totalTextIndexed) / 1048576.0f) / ((float) max))));
        LOGGER.info(String.format("    %s MB text sent to index", this.numFormatter.format(this.totalTextIndexed / 1048576.0d)));
        LOGGER.info(String.format("There were %s exceptions ignored: ", this.numFormatter.format(this.exceptionCount)));
    }

    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
    }

    public void onScrubGeo(long j, long j2) {
    }

    public void onStallWarning(StallWarning stallWarning) {
    }

    public void onTrackLimitationNotice(int i) {
    }

    public void onException(Exception exc) {
        LOGGER.error("Exception while streaming tweets", exc);
    }

    public long getBatchSize() {
        return this.maxBatchSize;
    }
}
