package org.apache.streams.riak.test;

import com.basho.riak.client.api.commands.buckets.ListBuckets;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.riak.binary.RiakBinaryClient;
import org.apache.streams.riak.binary.RiakBinaryPersistReader;
import org.apache.streams.riak.binary.RiakBinaryPersistWriter;
import org.apache.streams.riak.pojo.RiakConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/streams/riak/test/RiakBinaryPersistIT.class */
/**
 * Integration test that writes every activity document found in the test-data
 * directory through {@link RiakBinaryPersistWriter} and then reads the stored
 * documents back through {@link RiakBinaryPersistReader}, asserting that the
 * written and read counts both equal {@link #EXPECTED_DOCUMENT_COUNT}.
 */
public class RiakBinaryPersistIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(RiakBinaryPersistIT.class);
    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

    /** Directory (relative to the working directory) holding the activity test documents. */
    private static final String TEST_DATA_DIRECTORY = "target/dependency/activitystreams-testdata";

    /** Number of documents the test expects to write and subsequently read back. */
    private static final int EXPECTED_DOCUMENT_COUNT = 89;

    private RiakBinaryClient testClient;
    private RiakConfiguration testConfiguration;

    /**
     * Resolves the Riak configuration under the "RiakBinaryPersistIT" key, obtains a
     * client for it, and checks that the cluster reports at least one node.
     *
     * @throws Exception if configuration, connection, or the bucket listing fails
     */
    @BeforeClass
    public void prepareTest() throws Exception {
        this.testConfiguration =
            new ComponentConfigurator<>(RiakConfiguration.class).detectConfiguration("RiakBinaryPersistIT");
        this.testClient = RiakBinaryClient.getInstance(this.testConfiguration);
        Assert.assertTrue(this.testClient.client().getRiakCluster().getNodes().size() > 0);
        // NOTE(review): fixed 10s pause, presumably to let the Riak node finish starting - confirm.
        Thread.sleep(10000L);
        // The result is discarded; the call only has to complete within the timeout.
        this.testClient.client().execute(new ListBuckets.Builder("default").build(), 30L, TimeUnit.SECONDS);
    }

    /**
     * Writes each file in {@link #TEST_DATA_DIRECTORY} as an {@link Activity} keyed by its
     * verb, then reads everything back and verifies both counts.
     *
     * @throws Exception if a file cannot be listed, read, or parsed, or if Riak I/O fails
     */
    @Test
    public void testRiakBinaryPersist() throws Exception {
        RiakBinaryPersistWriter writer = new RiakBinaryPersistWriter(this.testConfiguration);
        writer.prepare(this.testConfiguration);

        // Files.list holds an open directory handle until the stream is closed, so
        // materialise the listing inside try-with-resources before iterating.
        List<Path> testFiles;
        try (Stream<Path> listing = Files.list(Paths.get(TEST_DATA_DIRECTORY))) {
            testFiles = listing.collect(Collectors.toList());
        }

        int written = 0;
        for (Path testFile : testFiles) {
            LOGGER.info("File: {}", testFile);
            Activity activity;
            // Close each input stream deterministically instead of leaving it to the GC.
            try (FileInputStream input = new FileInputStream(testFile.toFile())) {
                activity = MAPPER.readValue(input, Activity.class);
            }
            writer.write(new StreamsDatum(activity, activity.getVerb()));
            LOGGER.info("Wrote: {}", activity.getVerb());
            written++;
        }
        writer.cleanUp();
        LOGGER.info("Total Written: {}", written);
        Assert.assertEquals(written, EXPECTED_DOCUMENT_COUNT);

        RiakBinaryPersistReader reader = new RiakBinaryPersistReader(this.testConfiguration);
        reader.prepare(this.testConfiguration);
        StreamsResultSet resultSet = reader.readAll();
        Assert.assertNotNull(resultSet);
        LOGGER.info("Total Read: {}", resultSet.size());
        Assert.assertEquals(resultSet.size(), EXPECTED_DOCUMENT_COUNT);
    }

    /**
     * Post-class hook; intentionally a no-op.
     *
     * @throws Exception never thrown by the current implementation
     */
    @AfterClass
    public void cleanup() throws Exception {
    }
}
