/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.sparkreceiver.HasOffset;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkConsumer;
import org.apache.beam.sdk.io.sparkreceiver.WrappedSupervisor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.receiver.Receiver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;

@DoFn.UnboundedPerElement
class ReadFromSparkReceiverWithOffsetDoFn<@UnknownKeyFor V>
extends DoFn<byte[], V> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_START_POLL_TIMEOUT_SEC = 2L;
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_PULL_FREQUENCY_SEC = 0L;
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_START_OFFSET = 0L;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<V, @UnknownKeyFor @NonNull @Initialized Long> getOffsetFn;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<V, @UnknownKeyFor @NonNull @Initialized Instant> getTimestampFn;
    private final @UnknownKeyFor @NonNull @Initialized ReceiverBuilder<V, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Receiver<V>> sparkReceiverBuilder;
    private final @UnknownKeyFor @NonNull @Initialized Long pullFrequencySec;
    private final @UnknownKeyFor @NonNull @Initialized Long startPollTimeoutSec;
    private final @UnknownKeyFor @NonNull @Initialized Long startOffset;

    ReadFromSparkReceiverWithOffsetDoFn( @UnknownKeyFor @NonNull @Initialized SparkReceiverIO.Read<V> transform) {
        ReceiverBuilder<V, Receiver<V>> sparkReceiverBuilder = transform.getSparkReceiverBuilder();
        Preconditions.checkStateNotNull(sparkReceiverBuilder, (Object)"Spark Receiver Builder can't be null!");
        this.sparkReceiverBuilder = sparkReceiverBuilder;
        SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
        Preconditions.checkStateNotNull(getOffsetFn, (Object)"Get offset fn can't be null!");
        this.getOffsetFn = getOffsetFn;
        SerializableFunction & Serializable getTimestampFn = transform.getTimestampFn();
        if (getTimestampFn == null) {
            getTimestampFn = (SerializableFunction & Serializable)input -> Instant.now();
        }
        this.getTimestampFn = getTimestampFn;
        Long pullFrequencySec = transform.getPullFrequencySec();
        if (pullFrequencySec == null) {
            pullFrequencySec = 0L;
        }
        this.pullFrequencySec = pullFrequencySec;
        Long startPollTimeoutSec = transform.getStartPollTimeoutSec();
        if (startPollTimeoutSec == null) {
            startPollTimeoutSec = 2L;
        }
        this.startPollTimeoutSec = startPollTimeoutSec;
        Long startOffset = transform.getStartOffset();
        if (startOffset == null) {
            startOffset = 0L;
        }
        this.startOffset = startOffset;
    }

    @DoFn.GetInitialRestriction
    public @UnknownKeyFor @NonNull @Initialized OffsetRange initialRestriction(@DoFn.Element @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] element) {
        return new OffsetRange(this.startOffset.longValue(), Long.MAX_VALUE);
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant currentElementTimestamp) {
        return currentElementTimestamp;
    }

    @DoFn.NewWatermarkEstimator
    public @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant watermarkEstimatorState) {
        return (WatermarkEstimator)this.createWatermarkEstimatorFn.apply((Object)ReadFromSparkReceiverWithOffsetDoFn.ensureTimestampWithinBounds(watermarkEstimatorState));
    }

    @DoFn.GetSize
    public @UnknownKeyFor @NonNull @Initialized double getSize(@DoFn.Element @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] element, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange offsetRange) {
        return this.restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
    }

    @DoFn.NewTracker
    public @UnknownKeyFor @NonNull @Initialized OffsetRangeTracker restrictionTracker(@DoFn.Element @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] element, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange restriction) {
        return new OffsetRangeTracker(restriction){
            private final @UnknownKeyFor @NonNull @Initialized AtomicBoolean isCheckDoneCalled;
            {
                this.isCheckDoneCalled = new AtomicBoolean(false);
            }

            public @UnknownKeyFor @NonNull @Initialized SplitResult<@UnknownKeyFor @NonNull @Initialized OffsetRange> trySplit(@UnknownKeyFor @NonNull @Initialized double fractionOfRemainder) {
                OffsetRange primary;
                LOG.debug("Try split");
                OffsetRange curRange = this.range;
                SplitResult split = super.trySplit(fractionOfRemainder);
                if (split != null && (primary = (OffsetRange)split.getPrimary()) != null && !this.isCheckDoneCalled.get()) {
                    LOG.debug("Split is not needed");
                    this.isCheckDoneCalled.set(false);
                    this.range = curRange;
                    return null;
                }
                return split;
            }

            public void checkDone() throws @UnknownKeyFor @NonNull @Initialized IllegalStateException {
                this.isCheckDoneCalled.set(true);
            }
        };
    }

    @DoFn.GetRestrictionCoder
    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.ProcessElement
    public // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] element, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<V> receiver) {
        Receiver<V> sparkReceiver;
        if (tracker.currentRestriction() != null) {
            LOG.info("Start processing element. Restriction = {}", (Object)((OffsetRange)tracker.currentRestriction()).toString());
        }
        try {
            sparkReceiver = this.sparkReceiverBuilder.build();
        }
        catch (Exception e) {
            LOG.error("Can not build Spark Receiver", (Throwable)e);
            throw new IllegalStateException("Spark Receiver was not built!");
        }
        LOG.debug("Restriction {}", (Object)((OffsetRange)tracker.currentRestriction()).toString());
        SparkConsumerWithOffset<V> sparkConsumer = new SparkConsumerWithOffset<V>(((OffsetRange)tracker.currentRestriction()).getFrom());
        sparkConsumer.start(sparkReceiver);
        Long recordsProcessed = 0L;
        block6: while (true) {
            LOG.debug("Start polling records");
            try {
                TimeUnit.SECONDS.sleep(this.startPollTimeoutSec);
            }
            catch (InterruptedException e) {
                LOG.error("SparkReceiver was interrupted before polling started", (Throwable)e);
                throw new IllegalStateException("Spark Receiver was interrupted before polling started");
            }
            if (!sparkConsumer.hasRecords()) {
                OffsetRange currentRestriction;
                LOG.debug("No records left");
                ((HasOffset)sparkReceiver).setCheckpoint(recordsProcessed);
                sparkConsumer.stop();
                tracker.checkDone();
                if (this.pullFrequencySec != 0L) {
                    LOG.debug("Waiting to poll for new records...");
                    try {
                        TimeUnit.SECONDS.sleep(this.pullFrequencySec);
                    }
                    catch (InterruptedException e) {
                        LOG.error("SparkReceiver was interrupted while waiting to poll new records", (Throwable)e);
                        throw new IllegalStateException("Spark Receiver was interrupted while waiting to poll new records");
                    }
                }
                if ((currentRestriction = (OffsetRange)tracker.currentRestriction()) != null && currentRestriction.getFrom() == currentRestriction.getTo()) {
                    LOG.info("Stop for empty restriction: {}", (Object)currentRestriction);
                    return DoFn.ProcessContinuation.stop();
                }
                LOG.info("Resume for restriction: {}", (Object)currentRestriction);
                return DoFn.ProcessContinuation.resume();
            }
            while (true) {
                if (!sparkConsumer.hasRecords()) continue block6;
                Object record = sparkConsumer.poll();
                if (record == null) continue;
                Long offset = (Long)this.getOffsetFn.apply(record);
                if (!tracker.tryClaim((Object)offset)) {
                    ((HasOffset)sparkReceiver).setCheckpoint(recordsProcessed);
                    sparkConsumer.stop();
                    LOG.info("Stop for restriction: {}", tracker.currentRestriction());
                    return DoFn.ProcessContinuation.stop();
                }
                Instant currentTimeStamp = (Instant)this.getTimestampFn.apply(record);
                Long l = recordsProcessed;
                Long l2 = recordsProcessed = Long.valueOf(recordsProcessed + 1L);
                ((ManualWatermarkEstimator)watermarkEstimator).setWatermark(currentTimeStamp);
                receiver.outputWithTimestamp(record, currentTimeStamp);
            }
            break;
        }
    }

    private static @UnknownKeyFor @NonNull @Initialized Instant ensureTimestampWithinBounds(@UnknownKeyFor @NonNull @Initialized Instant timestamp) {
        if (timestamp.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
            LOG.debug("Timestamp was before MIN_VALUE({})", (Object)BoundedWindow.TIMESTAMP_MIN_VALUE);
        } else if (timestamp.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
            LOG.debug("Timestamp was after MAX_VALUE({})", (Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
        }
        return timestamp;
    }

    private static class SparkConsumerWithOffset<@UnknownKeyFor V>
    implements SparkConsumer<V> {
        private final @UnknownKeyFor @NonNull @Initialized Queue<V> recordsQueue;
        private @Nullable @UnknownKeyFor @Initialized Receiver<V> sparkReceiver;
        private final @UnknownKeyFor @NonNull @Initialized Long startOffset;

        SparkConsumerWithOffset(@UnknownKeyFor @NonNull @Initialized Long startOffset) {
            this.startOffset = startOffset;
            this.recordsQueue = new ConcurrentLinkedQueue<V>();
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean hasRecords() {
            return !this.recordsQueue.isEmpty();
        }

        @Override
        public @Nullable V poll() {
            return this.recordsQueue.poll();
        }

        @Override
        public void start(@UnknownKeyFor @NonNull @Initialized Receiver<V> sparkReceiver) {
            this.sparkReceiver = sparkReceiver;
            SerializableFunction & Serializable storeFn = (SerializableFunction & Serializable)input -> {
                if (input == null) {
                    return null;
                }
                Object data = input[0];
                if (data instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = ((ByteBuffer)data).asReadOnlyBuffer();
                    byte[] bytes = new byte[byteBuffer.limit()];
                    byteBuffer.get(bytes);
                    Object record = SerializationUtils.deserialize((byte[])bytes);
                    this.recordsQueue.offer(record);
                } else if (data instanceof Iterator) {
                    Iterator iterator = (Iterator)data;
                    while (iterator.hasNext()) {
                        Object record = iterator.next();
                        this.recordsQueue.offer(record);
                    }
                } else if (data instanceof ArrayBuffer) {
                    ArrayBuffer arrayBuffer = (ArrayBuffer)data;
                    for (Object record : arrayBuffer) {
                        this.recordsQueue.offer(record);
                    }
                } else {
                    Object record = data;
                    this.recordsQueue.offer(record);
                }
                return null;
            };
            try {
                new WrappedSupervisor(sparkReceiver, new SparkConf(), (SerializableFunction<Object[], Void>)storeFn);
            }
            catch (Exception e) {
                LOG.error("Can not init Spark Receiver!", (Throwable)e);
                throw new IllegalStateException("Spark Receiver was not initialized");
            }
            LOG.debug("Starting receiver");
            ((HasOffset)sparkReceiver).setStartOffset(this.startOffset);
            sparkReceiver.supervisor().startReceiver();
            LOG.debug("Receiver started");
        }

        @Override
        public void stop() {
            if (this.sparkReceiver != null) {
                this.sparkReceiver.stop("SparkReceiver is stopped.");
            }
            LOG.info("Clear records queue: {} records", (Object)this.recordsQueue.size());
            this.recordsQueue.clear();
        }
    }
}

