package org.apache.streams.datasift.provider;

import com.datasift.client.DataSiftClient;
import com.datasift.client.DataSiftConfig;
import com.datasift.client.core.Stream;
import com.datasift.client.stream.DeletedInteraction;
import com.datasift.client.stream.Interaction;
import com.datasift.client.stream.StreamEventListener;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.DatasiftConfiguration;
import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/datasift/provider/DatasiftStreamProvider.class */
/**
 * {@link StreamsProvider} that subscribes to one or more DataSift live streams and buffers the
 * received {@link Interaction}s until they are drained by {@link #readCurrent()}.
 *
 * <p>Lifecycle as implemented here: {@link #prepare(Object)} creates the interaction queue, the
 * client registry and the JSON mapper; {@link #startStream()} then opens one
 * {@link DataSiftClient} per configured stream hash; {@link #stop()} / {@link #cleanUp()} shut
 * every client down. {@link #startStream()} fails fast if {@link #prepare(Object)} has not been
 * called, because the client registry is only created there.
 *
 * <p>All mutations of the client registry are performed while holding the registry's monitor.
 */
public class DatasiftStreamProvider implements StreamsProvider {
    private static final String STREAMS_ID = "DatasiftStreamProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);

    /** Connection settings (user name, API key, stream hashes). */
    private DatasiftConfiguration config;
    /** Buffer handed to each subscription; drained by {@link #readCurrent()}. */
    private ConcurrentLinkedQueue<Interaction> interactions;
    /** Live clients keyed by stream hash; created in {@link #prepare(Object)}. */
    private Map<String, DataSiftClient> clients;
    /** Listener registered on every client's live stream for stream events. */
    private final StreamEventListener eventListener;
    /** JSON mapper used to serialize interaction payloads; set in {@link #prepare(Object)}. */
    private ObjectMapper mapper;

    /**
     * Stream event listener that only logs delete notifications at INFO level.
     */
    public static class DeleteHandler extends StreamEventListener {
        /**
         * Logs the deleted interaction.
         *
         * @param deletedInteraction the delete notification received from the stream
         */
        public void onDelete(DeletedInteraction deletedInteraction) {
            DatasiftStreamProvider.LOGGER.info("DELETED:\n {}", deletedInteraction);
        }
    }

    /**
     * Creates a provider whose configuration is detected from the {@code datasift} section of the
     * global streams configuration.
     *
     * @param streamEventListener listener registered on every live stream
     */
    public DatasiftStreamProvider(StreamEventListener streamEventListener) {
        this(streamEventListener, null);
    }

    /**
     * Returns the queue that subscriptions write received interactions into.
     *
     * @return the live internal queue (not a copy)
     */
    public Queue<Interaction> getInteractions() {
        return this.interactions;
    }

    /**
     * Creates a provider with an explicit configuration.
     *
     * @param streamEventListener   listener registered on every live stream
     * @param datasiftConfiguration configuration to use; when {@code null} it is detected from the
     *                              {@code datasift} section of the global streams configuration
     */
    public DatasiftStreamProvider(StreamEventListener streamEventListener, DatasiftConfiguration datasiftConfiguration) {
        this.interactions = new ConcurrentLinkedQueue<>();
        if (datasiftConfiguration == null) {
            this.config = DatasiftStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("datasift"));
        } else {
            this.config = datasiftConfiguration;
        }
        this.eventListener = streamEventListener;
    }

    /**
     * Returns the fixed identifier of this provider.
     *
     * @return {@code "DatasiftStreamProvider"}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Validates the configuration and opens a live stream for every configured stream hash.
     *
     * @throws NullPointerException if the configuration, its stream hashes, first hash, API key or
     *                              user name is {@code null}, or if {@link #prepare(Object)} has
     *                              not been called yet
     */
    public void startStream() {
        Preconditions.checkNotNull(this.config);
        Preconditions.checkNotNull(this.config.getStreamHash());
        Preconditions.checkNotNull(this.config.getStreamHash().get(0));
        Preconditions.checkNotNull(this.config.getApiKey());
        Preconditions.checkNotNull(this.config.getUserName());
        Preconditions.checkNotNull(this.clients);
        for (String hash : this.config.getStreamHash()) {
            startStreamForHash(hash);
        }
    }

    /**
     * (Re)starts the live stream for a single hash. Any client already registered for the hash is
     * shut down first, then a fresh client is created, wired with the event listener, an error
     * handler and a subscription feeding the interaction queue, and finally registered.
     *
     * @param str the DataSift stream hash to subscribe to
     */
    public void startStreamForHash(String str) {
        shutDownStream(str);
        DataSiftClient newClient = getNewClient(this.config.getUserName(), this.config.getApiKey());
        newClient.liveStream().onStreamEvent(this.eventListener);
        newClient.liveStream().onError(new ErrorHandler(this, str));
        newClient.liveStream().subscribe(new Subscription(Stream.fromString(str), this.interactions));
        synchronized (this.clients) {
            this.clients.put(str, newClient);
        }
    }

    /**
     * Factory for DataSift clients. Kept as a separate overridable method so subclasses can supply
     * a different client.
     *
     * @param str  the DataSift user name
     * @param str2 the DataSift API key
     * @return a new client built from the given credentials
     */
    public DataSiftClient getNewClient(String str, String str2) {
        return new DataSiftClient(new DataSiftConfig(str, str2));
    }

    /**
     * Shuts down and unregisters the client for the given hash, if one is registered.
     *
     * @param str the DataSift stream hash whose client should be stopped
     */
    public void shutDownStream(String str) {
        synchronized (this.clients) {
            // Remove by KEY (the hash). Passing the client itself to Map.remove would look it up
            // as a key, match nothing, and leave the dead client registered forever.
            DataSiftClient client = this.clients.remove(str);
            if (client != null) {
                LOGGER.debug("Shutting down stream for hash: {}", str);
                client.shutdown();
            }
        }
    }

    /**
     * Shuts down every registered client and empties the registry. Does nothing when
     * {@link #prepare(Object)} was never called.
     */
    public void stop() {
        if (this.clients == null) {
            // prepare() never ran, so there is nothing to stop (and nothing to lock on).
            return;
        }
        synchronized (this.clients) {
            for (DataSiftClient client : this.clients.values()) {
                client.shutdown();
            }
            // Drop the dead clients so isRunning() reports false and a later stop() or restart
            // does not shut the same client down twice.
            this.clients.clear();
        }
    }

    /**
     * Drains the interactions buffered so far and converts each into a {@link StreamsDatum} whose
     * document is the interaction payload serialized to JSON and whose id is the text value of
     * {@code interaction.id} in that payload. Interactions that fail to serialize are logged and
     * skipped.
     *
     * <p>Assumes every payload contains an {@code interaction.id} field; a payload without it
     * fails on the chained lookup.
     *
     * @return a result set holding one datum per successfully converted interaction
     */
    public StreamsResultSet readCurrent() {
        ConcurrentLinkedQueue<StreamsDatum> drained = Queues.newConcurrentLinkedQueue();
        Interaction interaction;
        // poll() until null instead of isEmpty()+poll(): a single atomic step per element.
        while ((interaction = this.interactions.poll()) != null) {
            try {
                String json = this.mapper.writeValueAsString(interaction.getData());
                String id = interaction.getData().get("interaction").get("id").textValue();
                // The datum is scoped to this iteration so a failed conversion can never
                // re-emit the datum produced by the previous interaction.
                drained.add(new StreamsDatum(json, id));
            } catch (JsonProcessingException e) {
                LOGGER.error("Exception while converting Interaction to String", e);
            }
        }
        return new StreamsResultSet(drained);
    }

    /**
     * Not supported by this provider.
     *
     * @param bigInteger ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    /**
     * Not supported by this provider.
     *
     * @param dateTime  ignored
     * @param dateTime2 ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    /**
     * Reports whether at least one client is currently registered.
     *
     * @return {@code true} if the client registry exists and is non-empty
     */
    public boolean isRunning() {
        return this.clients != null && !this.clients.isEmpty();
    }

    /**
     * Initializes runtime state: a fresh interaction queue, an empty client registry and the
     * shared DataSift JSON mapper.
     *
     * @param obj unused
     */
    public void prepare(Object obj) {
        this.interactions = new ConcurrentLinkedQueue<>();
        this.clients = Maps.newHashMap();
        this.mapper = StreamsDatasiftMapper.getInstance();
    }

    /**
     * Releases resources by delegating to {@link #stop()}.
     */
    public void cleanUp() {
        stop();
    }

    /**
     * Returns the configuration currently in use.
     *
     * @return the DataSift configuration
     */
    public DatasiftConfiguration getConfig() {
        return this.config;
    }

    /**
     * Replaces the configuration; it is read the next time a stream is started.
     *
     * @param datasiftConfiguration the new configuration
     */
    public void setConfig(DatasiftConfiguration datasiftConfiguration) {
        this.config = datasiftConfiguration;
    }
}
