package org.apache.streams.local.tasks;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.util.DatumUtils;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/tasks/StreamsPersistWriterTask.class */
public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumStatusCountable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterTask.class);
    private StreamsPersistWriter writer;
    private AtomicBoolean keepRunning;
    private StreamsConfiguration streamConfig;
    private BlockingQueue<StreamsDatum> inQueue;
    private AtomicBoolean isRunning;
    private AtomicBoolean blocked;
    private StreamsTaskCounter counter;
    private DatumStatusCounter statusCounter;

    public DatumStatusCounter getDatumStatusCounter() {
        return this.statusCounter;
    }

    public StreamsPersistWriterTask(StreamsPersistWriter streamsPersistWriter) {
        this(streamsPersistWriter, null);
    }

    public StreamsPersistWriterTask(StreamsPersistWriter streamsPersistWriter, StreamsConfiguration streamsConfiguration) {
        super(streamsConfiguration);
        this.statusCounter = new DatumStatusCounter();
        this.streamConfig = super.streamConfig;
        this.writer = streamsPersistWriter;
        this.keepRunning = new AtomicBoolean(true);
        this.isRunning = new AtomicBoolean(true);
        this.blocked = new AtomicBoolean(false);
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public boolean isWaiting() {
        return this.inQueue.isEmpty() && this.blocked.get();
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamConfig(StreamsConfiguration streamsConfiguration) {
        this.streamConfig = streamsConfiguration;
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public void addInputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        this.inQueue = blockingQueue;
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public boolean isRunning() {
        return this.isRunning.get();
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                this.writer.prepare(this.streamConfig);
                if (this.counter == null) {
                    this.counter = new StreamsTaskCounter(this.writer.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
                }
                while (this.keepRunning.get()) {
                    StreamsDatum streamsDatum = null;
                    try {
                        try {
                            this.blocked.set(true);
                            streamsDatum = this.inQueue.poll(5L, TimeUnit.SECONDS);
                            this.blocked.set(false);
                        } catch (InterruptedException e) {
                            LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status.");
                            this.keepRunning.set(false);
                            if (!this.inQueue.isEmpty()) {
                                LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}", Integer.valueOf(this.inQueue.size()), this.writer.getClass().getName());
                            }
                            Thread.currentThread().interrupt();
                            this.blocked.set(false);
                        }
                        if (streamsDatum != null) {
                            this.counter.incrementReceivedCount();
                            try {
                                long currentTimeMillis = System.currentTimeMillis();
                                this.writer.write(streamsDatum);
                                this.counter.addTime(System.currentTimeMillis() - currentTimeMillis);
                                this.statusCounter.incrementStatus(DatumStatus.SUCCESS);
                            } catch (Exception e2) {
                                LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e2);
                                this.keepRunning.set(false);
                                this.statusCounter.incrementStatus(DatumStatus.FAIL);
                                DatumUtils.addErrorToMetadata(streamsDatum, e2, this.writer.getClass());
                                this.counter.incrementErrorCount();
                            }
                        } else {
                            LOGGER.trace("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
                        }
                    } catch (Throwable th) {
                        this.blocked.set(false);
                        throw th;
                    }
                }
                Uninterruptibles.sleepUninterruptibly(this.streamConfig.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                Uninterruptibles.sleepUninterruptibly(this.streamConfig.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                this.writer.cleanUp();
                this.isRunning.set(false);
            } catch (Throwable th2) {
                LOGGER.error("Caught Throwable in Persist Writer {} : {}", this.writer.getClass().getSimpleName(), th2);
                Uninterruptibles.sleepUninterruptibly(this.streamConfig.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                this.writer.cleanUp();
                this.isRunning.set(false);
            }
        } catch (Throwable th3) {
            Uninterruptibles.sleepUninterruptibly(this.streamConfig.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
            this.writer.cleanUp();
            this.isRunning.set(false);
            throw th3;
        }
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void stopTask() {
        this.keepRunning.set(false);
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public void addOutputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        throw new UnsupportedOperationException(getClass().getName() + " does not support method - setOutputQueue()");
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
        LinkedList linkedList = new LinkedList();
        linkedList.add(this.inQueue);
        return linkedList;
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamsTaskCounter(StreamsTaskCounter streamsTaskCounter) {
        this.counter = streamsTaskCounter;
    }
}
