package org.apache.kudu.flume.sink;

import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.MiniKuduCluster;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.util.ClientTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kudu/flume/sink/SecureKuduSinkTest.class */
public class SecureKuduSinkTest {
    private static final int TICKET_LIFETIME_SECONDS = 10;
    private static final int RENEWABLE_LIFETIME_SECONDS = 30;

    @Rule
    public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
    private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
    private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder().kdcTicketLifetime("10s").kdcRenewLifetime("30s").enableKerberos();

    @Before
    public void clearTicketCacheProperty() {
        System.clearProperty("kudu.krb5ccname");
    }

    @Test
    public void testEventsWithShortTickets() throws Exception {
        LOG.info("Creating new table...");
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
        KuduTable createTable = this.harness.getClient().createTable("test_long_lived_events", new Schema(arrayList), new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload")).setNumReplicas(1));
        LOG.info("Created new table.");
        KuduSink createSecureSink = KuduSinkTestUtil.createSecureSink("test_long_lived_events", this.harness.getMasterAddressesAsString(), this.harness.getClusterRoot());
        createSecureSink.start();
        LOG.info("Testing events at the beginning.");
        processEvents(createSecureSink, 0, TICKET_LIFETIME_SECONDS / 2);
        LOG.info("Waiting for tickets to expire");
        TimeUnit.SECONDS.sleep(60L);
        LOG.info("Testing events after ticket renewal.");
        processEvents(createSecureSink, TICKET_LIFETIME_SECONDS / 2, TICKET_LIFETIME_SECONDS);
        List scanTableToStrings = ClientTestUtil.scanTableToStrings(createTable, new KuduPredicate[0]);
        Assert.assertEquals(TICKET_LIFETIME_SECONDS + " row(s) expected", TICKET_LIFETIME_SECONDS, scanTableToStrings.size());
        for (int i = 0; i < TICKET_LIFETIME_SECONDS; i++) {
            Assert.assertTrue("incorrect payload", ((String) scanTableToStrings.get(i)).contains("payload body " + i));
        }
        LOG.info("Testing {} events finished successfully.", Integer.valueOf(TICKET_LIFETIME_SECONDS));
    }

    private void processEvents(KuduSink kuduSink, int i, int i2) throws EventDeliveryException {
        ArrayList arrayList = new ArrayList();
        for (int i3 = i; i3 < i2; i3++) {
            arrayList.add(EventBuilder.withBody(String.format("payload body %s", Integer.valueOf(i3)).getBytes(StandardCharsets.UTF_8)));
        }
        KuduSinkTestUtil.processEvents(kuduSink, arrayList);
        LOG.info("Events flushed.");
    }
}
