package org.apache.kudu.flume.sink;

import java.util.Iterator;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.kudu.client.KuduClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test helpers for building {@link KuduSink} instances backed by a
 * {@link MemoryChannel} and for pushing events through them.
 */
class KuduSinkTestUtil {
    private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);

    KuduSinkTestUtil() {
    }

    /**
     * Creates a sink for the given table that uses the supplied client and
     * targets the master addresses reported by that client.
     *
     * @param kuduClient client handed to the sink; must not be {@code null}
     *                   because its master addresses are read here
     * @param tableName  value stored under the {@code tableName} configuration key
     * @param context    extra configuration; its parameters are applied after
     *                   the defaults set by this helper
     * @return a configured sink attached to a fresh memory channel (not started)
     */
    public static KuduSink createSink(KuduClient kuduClient, String tableName, Context context) {
        return createSink(tableName, kuduClient, context, kuduClient.getMasterAddressesAsString());
    }

    /**
     * Builds and configures a sink, then wires it to a new default-configured
     * {@link MemoryChannel}.
     *
     * @param tableName       value stored under the {@code tableName} configuration key
     * @param kuduClient      client passed to the {@link KuduSink} constructor; may be
     *                        {@code null} (see {@link #createSecureSink})
     * @param context         extra configuration merged in after the defaults
     * @param masterAddresses value stored under the {@code masterAddresses} configuration key
     * @return the configured sink; the caller is responsible for starting it
     */
    private static KuduSink createSink(String tableName, KuduClient kuduClient, Context context,
            String masterAddresses) {
        LOG.info("Creating Kudu sink for '{}' table...", tableName);

        // Defaults go in first; the caller's parameters are added afterwards.
        Context sinkContext = new Context();
        sinkContext.put("tableName", tableName);
        sinkContext.put("masterAddresses", masterAddresses);
        sinkContext.putAll(context.getParameters());

        KuduSink sink = new KuduSink(kuduClient);
        Configurables.configure(sink, sinkContext);

        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        sink.setChannel(channel);

        LOG.info("Created Kudu sink for '{}' table.", tableName);
        return sink;
    }

    /**
     * Creates a sink configured with the Kerberos test principal and a keytab
     * located under the given base directory. No client is supplied to the sink.
     *
     * @param tableName       value stored under the {@code tableName} configuration key
     * @param masterAddresses value stored under the {@code masterAddresses} configuration key
     * @param baseDir         directory prefix; the keytab path is
     *                        {@code baseDir + "/krb5kdc/test-user.keytab"}
     * @return a configured sink attached to a fresh memory channel (not started)
     */
    public static KuduSink createSecureSink(String tableName, String masterAddresses, String baseDir) {
        Context context = new Context();
        context.put("kerberosKeytab", baseDir + "/krb5kdc/test-user.keytab");
        context.put("kerberosPrincipal", "test-user@KRBTEST.COM");
        return createSink(tableName, null, context, masterAddresses);
    }

    /**
     * Creates a sink via {@link #createSink(KuduClient, String, Context)},
     * starts it, and processes the given events through it.
     *
     * <p>The sink is started but not stopped here.
     *
     * @param kuduClient client handed to the sink
     * @param context    extra sink configuration
     * @param tableName  value stored under the {@code tableName} configuration key
     * @param events     events to push through the sink
     * @throws EventDeliveryException if the sink fails while processing
     */
    public static void processEventsCreatingSink(KuduClient kuduClient, Context context, String tableName,
            List<Event> events) throws EventDeliveryException {
        KuduSink sink = createSink(kuduClient, tableName, context);
        sink.start();
        processEvents(sink, events);
    }

    /**
     * Puts all events onto the sink's channel in a single transaction, invokes
     * {@link KuduSink#process()} once, and asserts on the returned status:
     * {@link Sink.Status#BACKOFF} is required when {@code events} is empty and
     * rejected otherwise.
     *
     * @param kuduSink sink whose channel receives the events
     * @param events   events to enqueue; may be empty
     * @throws EventDeliveryException if the sink fails while processing
     */
    public static void processEvents(KuduSink kuduSink, List<Event> events) throws EventDeliveryException {
        Channel channel = kuduSink.getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            for (Event event : events) {
                channel.put(event);
            }
            transaction.commit();
        } catch (RuntimeException | Error e) {
            // Roll back so the transaction is not left open when a put or the commit fails.
            transaction.rollback();
            throw e;
        } finally {
            transaction.close();
        }

        Sink.Status status = kuduSink.process();
        // JUnit argument order is (message, expected/unexpected, actual).
        if (events.isEmpty()) {
            Assert.assertSame("incorrect status for empty channel", Sink.Status.BACKOFF, status);
        } else {
            Assert.assertNotSame("incorrect status for non-empty channel", Sink.Status.BACKOFF, status);
        }
    }
}
