package org.apache.beam.examples.twitterstreamgenerator;

import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Splittable {@link DoFn} that drains the tweet queue of the {@link TwitterConnection} obtained
 * for a {@link TwitterConfig} and emits the text of every tweet, timestamped with the tweet's
 * creation time.
 *
 * <p>Progress is tracked through an {@link OffsetHolder} restriction handled by an {@link
 * OffsetTracker}. Reading stops once a claim is refused, i.e. when either the claim counter
 * exceeds {@code TwitterConfig#getTweetsCount()} or the elapsed time since this DoFn instance was
 * created exceeds {@code TwitterConfig#getMinutesToRun()}.
 */
@DoFn.UnboundedPerElement
public final class ReadFromTwitterDoFn extends DoFn<TwitterConfig, String> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class);

    /** Captured when this DoFn instance is constructed; handed to every tracker it creates. */
    private final DateTime startTime = new DateTime();

    /**
     * Restriction value: the configuration that was last claimed together with the running
     * claim counter. Both fields may be {@code null}/zero for a restriction that has not been
     * claimed yet.
     */
    public static class OffsetHolder implements Serializable {
        /** Configuration recorded by the last successful claim; {@code null} before any claim. */
        public final TwitterConfig twitterConfig;

        /** Value of the claim counter after the last successful claim. */
        public final Long fetchedRecords;

        OffsetHolder(TwitterConfig twitterConfig, Long l) {
            this.twitterConfig = twitterConfig;
            this.fetchedRecords = l;
        }

        /** Two holders are equal when both the configuration and the counter are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            OffsetHolder other = (OffsetHolder) obj;
            return Objects.equals(this.twitterConfig, other.twitterConfig)
                    && Objects.equals(this.fetchedRecords, other.fetchedRecords);
        }

        /** Hash over the same two fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.twitterConfig, this.fetchedRecords);
        }
    }

    /**
     * Restriction tracker that counts claims and refuses further ones once the configured tweet
     * count or the configured running time has been exceeded.
     */
    static class OffsetTracker extends RestrictionTracker<OffsetHolder, TwitterConfig> implements Serializable {
        /** A {@code long} factor so that minutes-to-milliseconds is computed in 64-bit arithmetic. */
        private static final long MILLIS_PER_MINUTE = 60_000L;

        private OffsetHolder restriction;
        private final DateTime startTime;

        OffsetTracker(OffsetHolder offsetHolder, DateTime dateTime) {
            this.restriction = offsetHolder;
            this.startTime = dateTime;
        }

        /**
         * Attempts to claim one more unit of work for {@code twitterConfig}.
         *
         * @param twitterConfig configuration supplying the tweet-count and running-time limits
         * @return {@code true} and advances the restriction when both limits still hold;
         *     {@code false} (restriction untouched) once either limit is exceeded
         */
        public boolean tryClaim(TwitterConfig twitterConfig) {
            // Read the previous counter null-safely *before* logging: the tracker may hold a
            // null restriction (or a null counter), which the claim logic below tolerates.
            Long previouslyFetched = this.restriction == null ? null : this.restriction.fetchedRecords;
            ReadFromTwitterDoFn.LOG.debug(
                    "-------------- Claiming {} used to have: {}",
                    Integer.valueOf(twitterConfig.hashCode()),
                    previouslyFetched);

            long claimedCount = previouslyFetched == null ? 0L : previouslyFetched.longValue() + 1;
            long elapsedMillis = System.currentTimeMillis() - this.startTime.getMillis();
            // long multiplication: an int product would overflow for large minute values.
            long runLimitMillis = twitterConfig.getMinutesToRun().intValue() * MILLIS_PER_MINUTE;
            ReadFromTwitterDoFn.LOG.debug(
                    "-------------- Time running: {} / {}",
                    Long.valueOf(elapsedMillis),
                    Long.valueOf(runLimitMillis));

            boolean countExceeded = claimedCount > twitterConfig.getTweetsCount().longValue();
            boolean timeExceeded = elapsedMillis > runLimitMillis;
            if (countExceeded || timeExceeded) {
                return false;
            }
            this.restriction = new OffsetHolder(twitterConfig, Long.valueOf(claimedCount));
            return true;
        }

        /**
         * Returns the restriction as of the last successful claim (or the initial one).
         *
         * <p>This is the name under which the tracker contract exposes the current restriction;
         * the decompiler had renamed it to {@link #m1currentRestriction()}.
         */
        public OffsetHolder currentRestriction() {
            return this.restriction;
        }

        /**
         * Alias of {@link #currentRestriction()} kept under the decompiler-generated name so that
         * any existing caller of that name keeps working.
         */
        public OffsetHolder m1currentRestriction() {
            return currentRestriction();
        }

        /**
         * Produces a split whose first part is a fresh, empty holder and whose second part is the
         * tracker's current restriction. The tracker's own restriction is not modified.
         *
         * @param d fraction of the remainder requested by the caller; only logged
         */
        public SplitResult<OffsetHolder> trySplit(double d) {
            ReadFromTwitterDoFn.LOG.debug("-------------- Trying to split: fractionOfRemainder={}", Double.valueOf(d));
            return SplitResult.of(new OffsetHolder(null, 0L), this.restriction);
        }

        /** Intentionally a no-op: this tracker performs no completeness validation. */
        public void checkDone() throws IllegalStateException {
        }

        /** Always reports the restriction as unbounded. */
        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.UNBOUNDED;
        }
    }

    /** Uses the input element's timestamp as the initial watermark estimator state. */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    /**
     * Clamps {@code instant} into the closed range
     * [{@link BoundedWindow#TIMESTAMP_MIN_VALUE}, {@link BoundedWindow#TIMESTAMP_MAX_VALUE}].
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    /** Creates a manual watermark estimator seeded with the (clamped) estimator state. */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(instant));
    }

    /**
     * Returns the starting restriction: no configuration recorded and a counter of zero. The
     * element itself is not inspected.
     */
    @DoFn.GetInitialRestriction
    public OffsetHolder getInitialRestriction(@DoFn.Element TwitterConfig twitterConfig) throws IOException {
        return new OffsetHolder(null, 0L);
    }

    /** Creates an {@link OffsetTracker} over {@code offsetHolder} that measures time from this DoFn's start time. */
    @DoFn.NewTracker
    public RestrictionTracker<OffsetHolder, TwitterConfig> newTracker(@DoFn.Element TwitterConfig twitterConfig, @DoFn.Restriction OffsetHolder offsetHolder) {
        return new OffsetTracker(offsetHolder, this.startTime);
    }

    /** Restrictions are encoded with Java serialization. */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetHolder> getRestrictionCoder() {
        return SerializableCoder.of(OffsetHolder.class);
    }

    /**
     * Drains whatever tweets are currently queued for {@code twitterConfig}, emitting each tweet's
     * text with the tweet's creation time as both element timestamp and watermark.
     *
     * <p>NOTE(review): the tracker parameter is declared with {@code OffsetRange} as its
     * restriction type although {@link #newTracker} produces a tracker over {@link OffsetHolder};
     * the signature is kept as-is here — confirm whether it should be aligned.
     *
     * @return {@code stop()} once a claim has been refused (the stream is closed at that point),
     *     otherwise {@code resume()} with a one second delay
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element TwitterConfig twitterConfig, DoFn.OutputReceiver<String> outputReceiver, RestrictionTracker<OffsetRange, TwitterConfig> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) {
        LOG.debug("In Read From Twitter Do Fn");
        TwitterConnection twitterConnection = TwitterConnection.getInstance(twitterConfig);
        BlockingQueue<Status> queue = twitterConnection.getQueue();

        // Even with nothing queued, a claim is attempted so that the limits are still enforced.
        if (queue.isEmpty() && checkIfDone(twitterConnection, twitterConfig, restrictionTracker)) {
            return DoFn.ProcessContinuation.stop();
        }

        while (!queue.isEmpty()) {
            Status status = queue.poll();
            // The claim happens after the poll: if it is refused, the polled status is not emitted.
            if (checkIfDone(twitterConnection, twitterConfig, restrictionTracker)) {
                return DoFn.ProcessContinuation.stop();
            }
            if (status == null) {
                // poll() may return null if the queue was emptied between isEmpty() and poll().
                continue;
            }
            Instant createdAt = Instant.ofEpochMilli(status.getCreatedAt().getTime());
            manualWatermarkEstimator.setWatermark(createdAt);
            outputReceiver.outputWithTimestamp(status.getText(), createdAt);
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1L));
    }

    /**
     * Attempts a claim on {@code restrictionTracker}; when the claim is refused, closes the
     * connection's stream.
     *
     * @return {@code true} when the claim was refused and reading must stop, {@code false} otherwise
     */
    boolean checkIfDone(TwitterConnection twitterConnection, TwitterConfig twitterConfig, RestrictionTracker<OffsetRange, TwitterConfig> restrictionTracker) {
        if (restrictionTracker.tryClaim(twitterConfig)) {
            return false;
        }
        twitterConnection.closeStream();
        return true;
    }
}
