package org.apache.kudu.flume.sink;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.BaseKuduTest;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kudu/flume/sink/KuduSinkTest.class */
public class KuduSinkTest extends BaseKuduTest {
    private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);

    private KuduTable createNewTable(String str) throws Exception {
        LOG.info("Creating new table...");
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
        KuduTable createTable = createTable(str, new Schema(arrayList), new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload")).setNumReplicas(1));
        LOG.info("Created new table.");
        return createTable;
    }

    @Test
    public void testMandatoryParameters() {
        LOG.info("Testing mandatory parameters...");
        KuduSink kuduSink = new KuduSink(syncClient);
        HashMap hashMap = new HashMap();
        try {
            Configurables.configure(kuduSink, new Context(hashMap));
            Assert.fail("Should have failed due to missing properties");
        } catch (NullPointerException e) {
        }
        hashMap.put("tableName", "tableName");
        try {
            Configurables.configure(kuduSink, new Context(hashMap));
            Assert.fail("Should have failed due to missing properties");
        } catch (NullPointerException e2) {
        }
        LOG.info("Testing mandatory parameters finished successfully.");
    }

    @Test(expected = FlumeException.class)
    public void testMissingTable() throws Exception {
        LOG.info("Testing missing table...");
        KuduSink createSink = createSink("missingTable");
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, new Context());
        createSink.setChannel(memoryChannel);
        createSink.start();
        LOG.info("Testing missing table finished successfully.");
    }

    @Test
    public void testEmptyChannelWithDefaults() throws Exception {
        testEventsWithDefaults(0);
    }

    @Test
    public void testOneEventWithDefaults() throws Exception {
        testEventsWithDefaults(1);
    }

    @Test
    public void testThreeEventsWithDefaults() throws Exception {
        testEventsWithDefaults(3);
    }

    @Test
    public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
        doTestDuplicateRows(true);
    }

    @Test
    public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
        doTestDuplicateRows(false);
    }

    private void doTestDuplicateRows(boolean z) throws Exception {
        String name = createNewTable("testDuplicateRows" + z).getName();
        Context context = new Context();
        context.put("ignoreDuplicateRows", Boolean.toString(z));
        KuduSink createSink = createSink(name, context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, new Context());
        createSink.setChannel(memoryChannel);
        createSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        for (int i = 0; i < 2; i++) {
            memoryChannel.put(EventBuilder.withBody("key-0", Charsets.UTF_8));
        }
        transaction.commit();
        transaction.close();
        try {
            Sink.Status process = createSink.process();
            if (!z) {
                Assert.fail("Incorrectly ignored duplicate rows!");
            }
            Assert.assertTrue("incorrect status for empty channel", process == Sink.Status.READY);
            try {
                Assert.assertEquals("1 row expected", 1L, scanTableToStrings(r0, new KuduPredicate[0]).size());
                LOG.info("Testing duplicate events finished successfully.");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (EventDeliveryException e2) {
            if (z) {
                throw new AssertionError("Failed to ignore duplicate rows!", e2);
            }
            LOG.info("Correctly did not ignore duplicate rows", e2);
        }
    }

    private void testEventsWithDefaults(int i) throws Exception {
        LOG.info("Testing {} events...", Integer.valueOf(i));
        KuduTable createNewTable = createNewTable("test" + i + "events");
        KuduSink createSink = createSink(createNewTable.getName());
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, new Context());
        createSink.setChannel(memoryChannel);
        createSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        for (int i2 = 0; i2 < i; i2++) {
            memoryChannel.put(EventBuilder.withBody(String.format("payload body %s", Integer.valueOf(i2)).getBytes()));
        }
        transaction.commit();
        transaction.close();
        Sink.Status process = createSink.process();
        if (i == 0) {
            Assert.assertTrue("incorrect status for empty channel", process == Sink.Status.BACKOFF);
        } else {
            Assert.assertTrue("incorrect status for non-empty channel", process != Sink.Status.BACKOFF);
        }
        List scanTableToStrings = scanTableToStrings(createNewTable, new KuduPredicate[0]);
        Assert.assertEquals(i + " row(s) expected", i, scanTableToStrings.size());
        for (int i3 = 0; i3 < i; i3++) {
            Assert.assertTrue("incorrect payload", ((String) scanTableToStrings.get(i3)).contains("payload body " + i3));
        }
        LOG.info("Testing {} events finished successfully.", Integer.valueOf(i));
    }

    private KuduSink createSink(String str) {
        return createSink(str, new Context());
    }

    private KuduSink createSink(String str, Context context) {
        LOG.info("Creating Kudu sink for '{}' table...", str);
        KuduSink kuduSink = new KuduSink(syncClient);
        HashMap hashMap = new HashMap();
        hashMap.put("tableName", str);
        hashMap.put("masterAddresses", getMasterAddresses());
        Context context2 = new Context(hashMap);
        context2.putAll(context.getParameters());
        Configurables.configure(kuduSink, context2);
        LOG.info("Created Kudu sink for '{}' table.", str);
        return kuduSink;
    }
}
