package org.apache.streams.riak.binary;

import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.ListKeys;
import com.basho.riak.client.api.commands.kv.MultiFetch;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.google.common.collect.Queues;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.riak.pojo.RiakConfiguration;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/streams/riak/binary/RiakBinaryPersistReader.class */
public class RiakBinaryPersistReader implements StreamsPersistReader {
    private RiakConfiguration configuration;
    public RiakBinaryClient client;

    public RiakBinaryPersistReader(RiakConfiguration riakConfiguration) {
        this.configuration = riakConfiguration;
    }

    public String getId() {
        return "RiakBinaryPersistReader";
    }

    public void prepare(Object obj) {
        this.client = RiakBinaryClient.getInstance(this.configuration);
    }

    public void cleanUp() {
        this.client = null;
    }

    public synchronized StreamsResultSet readAll() {
        Queue<StreamsDatum> constructQueue = constructQueue();
        try {
            try {
                Iterator it = ((MultiFetch.Response) this.client.client().execute(new MultiFetch.Builder().addLocations((ListKeys.Response) this.client.client().execute(new ListKeys.Builder(new Namespace(this.configuration.getDefaultBucketType(), this.configuration.getDefaultBucket())).build())).build())).iterator();
                while (it.hasNext()) {
                    RiakFuture riakFuture = (RiakFuture) it.next();
                    try {
                        constructQueue.add(new StreamsDatum(((FetchValue.Response) riakFuture.get()).getValue(String.class), ((Location) riakFuture.getQueryInfo()).getKeyAsString()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e2) {
                        e2.printStackTrace();
                    }
                }
                return new StreamsResultSet(constructQueue);
            } catch (InterruptedException e3) {
                e3.printStackTrace();
                return null;
            } catch (ExecutionException e4) {
                e4.printStackTrace();
                return null;
            }
        } catch (Exception e5) {
            e5.printStackTrace();
            return null;
        }
    }

    public void startStream() {
        throw new NotImplementedException();
    }

    public StreamsResultSet readCurrent() {
        throw new NotImplementedException();
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        throw new NotImplementedException();
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        throw new NotImplementedException();
    }

    public boolean isRunning() {
        return Objects.nonNull(this.client);
    }

    private Queue<StreamsDatum> constructQueue() {
        return Queues.synchronizedQueue(new LinkedBlockingQueue(10000));
    }
}
