package org.apache.kudu.flume.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.BaseKuduTest;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.shaded.com.google.common.base.Charsets;
import org.apache.kudu.shaded.com.google.common.collect.ImmutableList;
import org.apache.kudu.shaded.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests a {@link KuduSink} that is configured to use
 * {@link SimpleKeyedKuduOperationsProducer} with the {@code insert} and
 * {@code upsert} operations.
 *
 * <p>Every test creates its own two-column table ({@code key} STRING primary key,
 * {@code payload} BINARY), pushes events through a {@link MemoryChannel}, runs a
 * single {@link KuduSink#process()} cycle and then scans the table to check what
 * was written.
 *
 * <p>NOTE(review): the sinks created here are started but never stopped. They are
 * built around the shared {@code syncClient}; confirm whether {@code KuduSink.stop()}
 * would shut that client down before adding cleanup.
 */
public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
    private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class);

    /**
     * Creates a single-replica table, range-partitioned on {@code key}, with a
     * STRING {@code key} column and a BINARY {@code payload} column.
     *
     * @param tableName name of the table to create
     * @return the newly created table
     * @throws Exception if table creation fails
     */
    private KuduTable createNewTable(String tableName) throws Exception {
        LOG.info("Creating new table...");
        List<ColumnSchema> columns = new ArrayList<ColumnSchema>(2);
        columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(false).build());
        CreateTableOptions options = new CreateTableOptions()
                .setRangePartitionColumns(ImmutableList.of("key"))
                .setNumReplicas(1);
        KuduTable table = createTable(tableName, new Schema(columns), options);
        LOG.info("Created new table.");
        return table;
    }

    @Test
    public void testEmptyChannelWithInsert() throws Exception {
        testEvents(0, "insert");
    }

    @Test
    public void testOneEventWithInsert() throws Exception {
        testEvents(1, "insert");
    }

    @Test
    public void testThreeEventsWithInsert() throws Exception {
        testEvents(3, "insert");
    }

    @Test
    public void testEmptyChannelWithUpsert() throws Exception {
        testEvents(0, "upsert");
    }

    @Test
    public void testOneEventWithUpsert() throws Exception {
        testEvents(1, "upsert");
    }

    @Test
    public void testThreeEventsWithUpsert() throws Exception {
        testEvents(3, "upsert");
    }

    /**
     * Writes three keyed events with {@code upsert}, then sends one more event that
     * reuses the first key and verifies that the row count stays at three while the
     * first row's payload is replaced.
     */
    @Test
    public void testDuplicateRowsWithUpsert() throws Exception {
        LOG.info("Testing events with upsert...");
        KuduTable table = createNewTable("testDupUpsertEvents");
        KuduSink sink = createSink(table.getName(), new Context(ImmutableMap.of("producer.operation", "upsert")));
        MemoryChannel channel = newConfiguredChannel();
        sink.setChannel(channel);
        sink.start();

        // First batch: three distinct keys.
        putInOneTransaction(channel, numberedEvents(3));
        Assert.assertNotSame("incorrect status for non-empty channel", Sink.Status.BACKOFF, sink.process());
        List<String> rows = scanTableToStrings(table, new KuduPredicate[0]);
        Assert.assertEquals("3 row(s) expected", 3, rows.size());
        assertNumberedPayloads(rows, 0, 3);

        // Second batch: one event whose key collides with the first row.
        putInOneTransaction(channel, ImmutableList.of(keyedEvent(0, "payload body upserted")));
        Assert.assertNotSame("incorrect status for non-empty channel", Sink.Status.BACKOFF, sink.process());
        List<String> rowsAfterUpsert = scanTableToStrings(table, new KuduPredicate[0]);
        Assert.assertEquals("3 row(s) expected", 3, rowsAfterUpsert.size());
        Assert.assertTrue("incorrect payload", rowsAfterUpsert.get(0).contains("payload body upserted"));
        assertNumberedPayloads(rowsAfterUpsert, 1, 3);
        LOG.info("Testing events with upsert finished successfully.");
    }

    /**
     * Sends {@code eventCount} keyed events through a sink configured with the given
     * operation and checks the sink status and the resulting table contents.
     *
     * @param eventCount number of events to send; {@code 0} exercises the empty-channel path
     * @param operation  value for the {@code producer.operation} sink property
     * @throws Exception if table creation, sink processing or scanning fails
     */
    private void testEvents(int eventCount, String operation) throws Exception {
        LOG.info("Testing {} events...", eventCount);
        KuduTable table = createNewTable("test" + eventCount + "events" + operation);
        KuduSink sink = createSink(table.getName(), new Context(ImmutableMap.of("producer.operation", operation)));
        MemoryChannel channel = newConfiguredChannel();
        sink.setChannel(channel);
        sink.start();

        putInOneTransaction(channel, numberedEvents(eventCount));

        Sink.Status status = sink.process();
        if (eventCount == 0) {
            Assert.assertSame("incorrect status for empty channel", Sink.Status.BACKOFF, status);
        } else {
            Assert.assertNotSame("incorrect status for non-empty channel", Sink.Status.BACKOFF, status);
        }

        List<String> rows = scanTableToStrings(table, new KuduPredicate[0]);
        Assert.assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
        assertNumberedPayloads(rows, 0, eventCount);
        LOG.info("Testing {} events finished successfully.", eventCount);
    }

    /**
     * Builds a {@link KuduSink} around the shared {@code syncClient} and configures
     * it with the table name, master addresses and the keyed producer class, with
     * {@code context}'s parameters layered on top.
     *
     * @param tableName table the sink should write to
     * @param context   extra sink properties; these override the defaults set here
     * @return the configured (not yet started) sink
     */
    private KuduSink createSink(String tableName, Context context) {
        LOG.info("Creating Kudu sink for '{}' table...", tableName);
        KuduSink sink = new KuduSink(syncClient);
        HashMap<String, String> parameters = new HashMap<String, String>();
        parameters.put("tableName", tableName);
        parameters.put("masterAddresses", getMasterAddresses());
        parameters.put("producer", SimpleKeyedKuduOperationsProducer.class.getName());
        Context sinkContext = new Context(parameters);
        sinkContext.putAll(context.getParameters());
        Configurables.configure(sink, sinkContext);
        LOG.info("Created Kudu sink for '{}' table.", tableName);
        return sink;
    }

    /** Returns a {@link MemoryChannel} configured from an empty {@link Context}. */
    private static MemoryChannel newConfiguredChannel() {
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        return channel;
    }

    /**
     * Builds an event whose {@code key} header is {@code "key <index>"} and whose
     * body is {@code payload}, always encoded as UTF-8 so the bytes do not depend
     * on the platform default charset.
     */
    private static Event keyedEvent(int index, String payload) {
        Event event = EventBuilder.withBody(payload, Charsets.UTF_8);
        event.setHeaders(ImmutableMap.of("key", String.format("key %s", index)));
        return event;
    }

    /** Builds {@code count} events with keys {@code "key i"} and bodies {@code "payload body i"}. */
    private static List<Event> numberedEvents(int count) {
        List<Event> events = new ArrayList<Event>(count);
        for (int i = 0; i < count; i++) {
            events.add(keyedEvent(i, String.format("payload body %s", i)));
        }
        return events;
    }

    /**
     * Puts all {@code events} on {@code channel} inside one channel transaction.
     * The transaction is rolled back if anything fails before the commit completes,
     * and is always closed.
     */
    private static void putInOneTransaction(MemoryChannel channel, List<Event> events) {
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        boolean committed = false;
        try {
            for (Event event : events) {
                channel.put(event);
            }
            transaction.commit();
            committed = true;
        } finally {
            // An uncommitted transaction is rolled back before being closed.
            if (!committed) {
                transaction.rollback();
            }
            transaction.close();
        }
    }

    /**
     * Asserts that, for every index in {@code [fromIndex, toIndex)}, the scanned row
     * at that index contains {@code "payload body <index>"}.
     */
    private static void assertNumberedPayloads(List<String> rows, int fromIndex, int toIndex) {
        for (int i = fromIndex; i < toIndex; i++) {
            Assert.assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
        }
    }
}
