package org.apache.streams.local.tasks;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.util.SerializationUtil;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/tasks/BaseStreamsTask.class */
/**
 * Common plumbing shared by local stream tasks: bookkeeping of the input and
 * output queues, round-robin polling of the inputs, fan-out of datums to the
 * outputs, and deep-copying of datums so that each output receives its own
 * instance.
 */
public abstract class BaseStreamsTask implements StreamsTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class);

    /** How long a single fan-out attempt waits on one full output queue before trying the next one. */
    private static final long OFFER_TIMEOUT_MS = 500L;

    private final List<BlockingQueue<StreamsDatum>> inQueues = new ArrayList<BlockingQueue<StreamsDatum>>();
    private final List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>();
    /** Index of the input queue that {@link #getNextDatum()} polls first on its next call. */
    private int inIndex = 0;
    private final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    protected StreamsConfiguration streamConfig;

    /**
     * Creates the task, registers {@link Activity} as a subtype on the mapper
     * and stamps the configuration with the current time.
     *
     * @param streamsConfiguration configuration to use; when {@code null} the
     *                             result of {@link StreamsConfigurator#detectConfiguration()} is used
     */
    public BaseStreamsTask(StreamsConfiguration streamsConfiguration) {
        this.mapper.registerSubtypes(Activity.class);
        if (streamsConfiguration != null) {
            this.streamConfig = streamsConfiguration;
        } else {
            this.streamConfig = StreamsConfigurator.detectConfiguration();
        }
        // NOTE(review): setStartedAt() is public and overridable, so a subclass
        // override would run before the subclass constructor body. Kept as-is
        // to preserve existing behaviour.
        setStartedAt();
    }

    /**
     * Registers an additional queue this task reads from.
     *
     * @param blockingQueue the queue to append to the list of inputs
     */
    @Override // org.apache.streams.local.tasks.StreamsTask
    public void addInputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        this.inQueues.add(blockingQueue);
    }

    /**
     * Registers an additional queue this task writes to.
     *
     * @param blockingQueue the queue to append to the list of outputs
     */
    @Override // org.apache.streams.local.tasks.StreamsTask
    public void addOutputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        this.outQueues.add(blockingQueue);
    }

    /**
     * @return the live (not copied) list of input queues
     */
    @Override // org.apache.streams.local.tasks.StreamsTask
    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
        return this.inQueues;
    }

    /**
     * @return the live (not copied) list of output queues
     */
    @Override // org.apache.streams.local.tasks.StreamsTask
    public List<BlockingQueue<StreamsDatum>> getOutputQueues() {
        return this.outQueues;
    }

    /**
     * Polls the input queues round-robin, starting at the current index, and
     * returns the first datum found. The index is advanced after every poll,
     * so the next call starts at the queue after the last one polled.
     *
     * @return the next available datum, or {@code null} when every input queue
     *         was empty or no input queue is registered
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Deprecated
    public StreamsDatum getNextDatum() {
        // Without this guard get(0) below would throw IndexOutOfBoundsException.
        if (this.inQueues.isEmpty()) {
            return null;
        }
        final int startIndex = this.inIndex;
        int currentIndex = startIndex;
        StreamsDatum datum;
        do {
            datum = this.inQueues.get(currentIndex).poll();
            currentIndex = getNextInputQueueIndex();
            // Stop on the first hit, or once we have wrapped around to where we started.
        } while (datum == null && currentIndex != startIndex);
        return datum;
    }

    /**
     * Delivers a datum to every output queue.
     *
     * <p>With exactly one output queue the datum itself is enqueued with a
     * blocking {@code put}. Otherwise each queue is offered its own clone, with
     * a 500 ms timeout per attempt; queues that were full are retried until
     * every queue has accepted a copy.
     *
     * <p>If the datum cannot be cloned the fan-out is abandoned and the datum
     * is dropped (an error is logged), instead of retrying forever.
     *
     * @param streamsDatum the datum to deliver
     * @throws InterruptedException if interrupted while waiting on a queue
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void addToOutgoingQueue(StreamsDatum streamsDatum) throws InterruptedException {
        if (this.outQueues.size() == 1) {
            this.outQueues.get(0).put(streamsDatum);
            return;
        }
        List<BlockingQueue<StreamsDatum>> pending =
                new LinkedList<BlockingQueue<StreamsDatum>>(this.outQueues);
        while (!pending.isEmpty()) {
            // Explicit iterator: removing from the list inside a for-each loop
            // throws ConcurrentModificationException.
            Iterator<BlockingQueue<StreamsDatum>> iterator = pending.iterator();
            while (iterator.hasNext()) {
                BlockingQueue<StreamsDatum> queue = iterator.next();
                StreamsDatum copy = cloneStreamsDatum(streamsDatum);
                if (copy == null) {
                    // Retrying would spin forever with nothing ever leaving
                    // 'pending', so give up on this datum.
                    LOGGER.error("Unable to clone StreamsDatum; dropping it for {} pending output queue(s)",
                            pending.size());
                    return;
                }
                if (queue.offer(copy, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * @return {@code true} when there are no input queues or all of them are
     *         currently empty; {@code false} as soon as one holds a datum
     */
    @Override // org.apache.streams.local.tasks.StreamsTask
    public boolean isWaiting() {
        for (BlockingQueue<StreamsDatum> queue : this.inQueues) {
            if (!queue.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Produces an independent copy of a datum, choosing the strategy by the
     * runtime type of its document: {@code deepCopy()} for {@link ObjectNode},
     * a JSON round-trip through the mapper for {@link Activity}, and
     * {@link SerializationUtil#cloneBySerialization} of the whole datum for any
     * other {@link Serializable} document.
     *
     * @param streamsDatum the datum to copy
     * @return the copy, or {@code null} when the document type is unsupported
     *         or copying threw an exception (both cases are logged)
     */
    protected StreamsDatum cloneStreamsDatum(StreamsDatum streamsDatum) {
        try {
            Object document = streamsDatum.document;
            if (document instanceof ObjectNode) {
                StreamsDatum copy = new StreamsDatum(((ObjectNode) document).deepCopy(),
                        streamsDatum.timestamp, streamsDatum.sequenceid);
                return copyMetaData(streamsDatum, copy);
            }
            if (document instanceof Activity) {
                Activity activityCopy =
                        this.mapper.readValue(this.mapper.writeValueAsString(document), Activity.class);
                StreamsDatum copy = new StreamsDatum(activityCopy,
                        streamsDatum.timestamp, streamsDatum.sequenceid);
                return copyMetaData(streamsDatum, copy);
            }
            if (document instanceof Serializable) {
                return (StreamsDatum) SerializationUtil.cloneBySerialization(streamsDatum);
            }
            // A null document also lands here; the getClass() call then throws
            // and is reported by the catch block below.
            LOGGER.error("Failed to clone/copy StreamsDatum with document of class : {}", document.getClass().getName());
        } catch (Exception e) {
            LOGGER.error("Exception while trying to clone/copy StreamsDatum : {}", e);
        }
        return null;
    }

    /**
     * Advances the round-robin input index by one, wrapping to zero past the
     * last input queue.
     *
     * @return the new index
     */
    private int getNextInputQueueIndex() {
        this.inIndex++;
        if (this.inIndex >= this.inQueues.size()) {
            this.inIndex = 0;
        }
        return this.inIndex;
    }

    /**
     * Copies every metadata entry from one datum to another. Values that are
     * {@link Serializable} are cloned by serialization; all other values are
     * shared by reference.
     *
     * @param streamsDatum  the datum to read metadata from
     * @param streamsDatum2 the datum to write metadata to
     * @return {@code streamsDatum2}, for chaining
     */
    private StreamsDatum copyMetaData(StreamsDatum streamsDatum, StreamsDatum streamsDatum2) {
        Map<String, Object> sourceMetadata = streamsDatum.getMetadata();
        Map<String, Object> targetMetadata = streamsDatum2.getMetadata();
        for (Map.Entry<String, Object> entry : sourceMetadata.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Serializable) {
                targetMetadata.put(entry.getKey(), SerializationUtil.cloneBySerialization(value));
            } else {
                targetMetadata.put(entry.getKey(), value);
            }
        }
        return streamsDatum2;
    }

    /**
     * @return the start timestamp stored in the stream configuration, in
     *         milliseconds
     */
    public long getStartedAt() {
        return this.streamConfig.getStartedAt().longValue();
    }

    /**
     * Stores the current time, in milliseconds, as the start timestamp in the
     * stream configuration.
     */
    public void setStartedAt() {
        this.streamConfig.setStartedAt(Long.valueOf(DateTime.now().getMillis()));
    }

    /**
     * @return the identifier stored in the stream configuration
     */
    public String getStreamIdentifier() {
        return this.streamConfig.getIdentifier();
    }
}
