package org.apache.streams.local.tasks;

import java.math.BigInteger;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.core.util.DatumUtils;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/tasks/StreamsProviderTask.class */
public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
    private static final int START = 0;
    private static final int END = 1;
    private StreamsProvider provider;
    private final AtomicBoolean keepRunning;
    private final AtomicBoolean flushing;
    private final AtomicBoolean started;
    private Type type;
    private BigInteger sequence;
    private DateTime[] dateRange;
    private Map<String, Object> config;
    private int timeout;
    private long sleepTime;
    private int zeros;
    private DatumStatusCounter statusCounter;
    private StreamsTaskCounter counter;

    /* renamed from: org.apache.streams.local.tasks.StreamsProviderTask$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/streams/local/tasks/StreamsProviderTask$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type = new int[Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type[Type.PERPETUAL.ordinal()] = StreamsProviderTask.END;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type[Type.READ_CURRENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type[Type.READ_NEW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type[Type.READ_RANGE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/streams/local/tasks/StreamsProviderTask$Type.class */
    private enum Type {
        PERPETUAL,
        READ_CURRENT,
        READ_NEW,
        READ_RANGE
    }

    public DatumStatusCounter getDatumStatusCounter() {
        return this.statusCounter;
    }

    public StreamsProviderTask(StreamsProvider streamsProvider, boolean z, Map<String, Object> map) {
        super(map);
        this.keepRunning = new AtomicBoolean(true);
        this.flushing = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.zeros = START;
        this.statusCounter = new DatumStatusCounter();
        this.provider = streamsProvider;
        if (z) {
            this.type = Type.PERPETUAL;
        } else {
            this.type = Type.READ_CURRENT;
        }
        this.timeout = StreamsTask.DEFAULT_TIMEOUT_MS;
        this.sleepTime = StreamsTask.DEFAULT_SLEEP_TIME_MS;
    }

    public StreamsProviderTask(StreamsProvider streamsProvider, BigInteger bigInteger, Map<String, Object> map) {
        super(map);
        this.keepRunning = new AtomicBoolean(true);
        this.flushing = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.zeros = START;
        this.statusCounter = new DatumStatusCounter();
        this.provider = streamsProvider;
        this.type = Type.READ_NEW;
        this.sequence = bigInteger;
        this.timeout = StreamsTask.DEFAULT_TIMEOUT_MS;
        this.sleepTime = StreamsTask.DEFAULT_SLEEP_TIME_MS;
    }

    public StreamsProviderTask(StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2, Map<String, Object> map) {
        super(map);
        this.keepRunning = new AtomicBoolean(true);
        this.flushing = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.zeros = START;
        this.statusCounter = new DatumStatusCounter();
        this.provider = streamsProvider;
        this.type = Type.READ_RANGE;
        this.dateRange = new DateTime[2];
        this.dateRange[START] = dateTime;
        this.dateRange[END] = dateTime2;
        this.timeout = StreamsTask.DEFAULT_TIMEOUT_MS;
        this.sleepTime = StreamsTask.DEFAULT_SLEEP_TIME_MS;
    }

    public void setTimeout(int i) {
        this.timeout = i;
    }

    public void setSleepTime(long j) {
        this.sleepTime = j;
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public boolean isWaiting() {
        return false;
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void stopTask() {
        LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
        this.keepRunning.set(false);
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public void addInputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        throw new UnsupportedOperationException(getClass().getName() + " does not support method - setInputQueue()");
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamConfig(Map<String, Object> map) {
        this.config = map;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                this.provider.prepare(this.config);
                StreamsResultSet streamsResultSet = START;
                long j = this.timeout < 0 ? Long.MAX_VALUE : this.timeout / this.sleepTime;
                if (this.counter == null) {
                    this.counter = new StreamsTaskCounter(this.provider.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
                }
                switch (AnonymousClass1.$SwitchMap$org$apache$streams$local$tasks$StreamsProviderTask$Type[this.type.ordinal()]) {
                    case END /* 1 */:
                        this.provider.startStream();
                        this.started.set(true);
                        while (isRunning()) {
                            try {
                                long currentTimeMillis = System.currentTimeMillis();
                                streamsResultSet = this.provider.readCurrent();
                                this.counter.addTime(System.currentTimeMillis() - currentTimeMillis);
                                if (streamsResultSet.size() == 0) {
                                    this.zeros += END;
                                } else {
                                    this.zeros = START;
                                }
                                flushResults(streamsResultSet);
                                if (this.zeros > j) {
                                    this.keepRunning.set(false);
                                }
                                if (this.zeros > 0) {
                                    Thread.sleep(this.sleepTime);
                                }
                            } catch (InterruptedException e) {
                                this.counter.incrementErrorCount();
                                LOGGER.warn("Thread interrupted");
                                this.keepRunning.set(false);
                            }
                        }
                        break;
                    case 2:
                        streamsResultSet = this.provider.readCurrent();
                        this.started.set(true);
                        break;
                    case 3:
                        streamsResultSet = this.provider.readNew(this.sequence);
                        this.started.set(true);
                        break;
                    case 4:
                        streamsResultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
                        this.started.set(true);
                        break;
                    default:
                        throw new RuntimeException("Type has not been added to StreamsProviderTask.");
                }
                if (streamsResultSet != null) {
                    flushResults(streamsResultSet);
                }
                LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
                this.provider.cleanUp();
                this.started.set(true);
                this.keepRunning.set(false);
            } catch (Exception e2) {
                LOGGER.error("Error in processing provider stream", e2);
                LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
                this.provider.cleanUp();
                this.started.set(true);
                this.keepRunning.set(false);
            }
        } catch (Throwable th) {
            LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
            this.provider.cleanUp();
            this.started.set(true);
            this.keepRunning.set(false);
            throw th;
        }
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public boolean isRunning() {
        return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
    }

    public void flushResults(StreamsResultSet streamsResultSet) {
        Queue queue = streamsResultSet.getQueue();
        this.flushing.set(true);
        while (!queue.isEmpty()) {
            StreamsDatum streamsDatum = (StreamsDatum) queue.poll();
            if (!this.keepRunning.get()) {
                break;
            }
            if (streamsDatum != null) {
                try {
                    super.addToOutgoingQueue(streamsDatum);
                    this.counter.incrementEmittedCount();
                    this.statusCounter.incrementStatus(DatumStatus.SUCCESS);
                } catch (Exception e) {
                    this.counter.incrementErrorCount();
                    this.statusCounter.incrementStatus(DatumStatus.FAIL);
                    DatumUtils.addErrorToMetadata(streamsDatum, e, this.provider.getClass());
                }
            }
        }
        this.flushing.set(false);
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamsTaskCounter(StreamsTaskCounter streamsTaskCounter) {
        this.counter = streamsTaskCounter;
    }
}
