package org.apache.kudu.flume.sink;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.BaseKuduTest;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.util.DecimalUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.class */
public class AvroKuduOperationsProducerTest extends BaseKuduTest {
    private static final String schemaPath = "src/test/avro/testAvroKuduOperationsProducer.avsc";
    private static String schemaLiteral;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest$SchemaLocation.class */
    public enum SchemaLocation {
        GLOBAL,
        URL,
        LITERAL
    }

    @BeforeClass
    public static void setupAvroSchemaBeforeClass() {
        try {
            schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8);
        } catch (IOException e) {
            throw new FlumeException("Unable to read schema file!", e);
        }
    }

    @Test
    public void testEmptyChannel() throws Exception {
        testEvents(0, SchemaLocation.GLOBAL);
    }

    @Test
    public void testOneEvent() throws Exception {
        testEvents(1, SchemaLocation.GLOBAL);
    }

    @Test
    public void testThreeEvents() throws Exception {
        testEvents(3, SchemaLocation.GLOBAL);
    }

    @Test
    public void testThreeEventsSchemaURLInEvent() throws Exception {
        testEvents(3, SchemaLocation.URL);
    }

    @Test
    public void testThreeEventsSchemaLiteralInEvent() throws Exception {
        testEvents(3, SchemaLocation.LITERAL);
    }

    private void testEvents(int i, SchemaLocation schemaLocation) throws Exception {
        KuduTable createNewTable = createNewTable(String.format("test%sevents%s", Integer.valueOf(i), schemaLocation));
        KuduSink createSink = createSink(createNewTable.getName(), schemaLocation != SchemaLocation.GLOBAL ? new Context() : new Context(ImmutableMap.of("producer.schemaPath", new File(schemaPath).getAbsoluteFile().toURI().toString())));
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, new Context());
        createSink.setChannel(memoryChannel);
        createSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        writeEventsToChannel(memoryChannel, i, schemaLocation);
        transaction.commit();
        transaction.close();
        Sink.Status process = createSink.process();
        if (i == 0) {
            Assert.assertEquals("incorrect status for empty channel", process, Sink.Status.BACKOFF);
        } else {
            Assert.assertEquals("incorrect status for non-empty channel", process, Sink.Status.READY);
        }
        List<String> makeAnswers = makeAnswers(i);
        List scanTableToStrings = scanTableToStrings(createNewTable, new KuduPredicate[0]);
        Assert.assertEquals("wrong number of rows inserted", makeAnswers.size(), scanTableToStrings.size());
        Assert.assertArrayEquals("wrong rows inserted", makeAnswers.toArray(), scanTableToStrings.toArray());
    }

    private KuduTable createNewTable(String str) throws Exception {
        ArrayList arrayList = new ArrayList(5);
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL).typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
        return createTable(str, new Schema(arrayList), new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")).setNumReplicas(1));
    }

    private KuduSink createSink(String str, Context context) {
        KuduSink kuduSink = new KuduSink(syncClient);
        HashMap hashMap = new HashMap();
        hashMap.put("tableName", str);
        hashMap.put("masterAddresses", getMasterAddresses());
        hashMap.put("producer", AvroKuduOperationsProducer.class.getName());
        Context context2 = new Context(hashMap);
        context2.putAll(context.getParameters());
        Configurables.configure(kuduSink, context2);
        return kuduSink;
    }

    private void writeEventsToChannel(Channel channel, int i, SchemaLocation schemaLocation) throws Exception {
        for (int i2 = 0; i2 < i; i2++) {
            AvroKuduOperationsProducerTestRecord avroKuduOperationsProducerTestRecord = new AvroKuduOperationsProducerTestRecord();
            avroKuduOperationsProducerTestRecord.setKey(Integer.valueOf(10 * i2));
            avroKuduOperationsProducerTestRecord.setLongField(Long.valueOf(2 * i2));
            avroKuduOperationsProducerTestRecord.setDoubleField(Double.valueOf(2.71828d * i2));
            avroKuduOperationsProducerTestRecord.setNullableField(i2 % 2 == 0 ? null : "taco");
            avroKuduOperationsProducerTestRecord.setStringField(String.format("hello %d", Integer.valueOf(i2)));
            avroKuduOperationsProducerTestRecord.setDecimalField(BigDecimal.valueOf(i2, 1));
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null);
            new SpecificDatumWriter(AvroKuduOperationsProducerTestRecord.class).write(avroKuduOperationsProducerTestRecord, binaryEncoder);
            binaryEncoder.flush();
            Event withBody = EventBuilder.withBody(byteArrayOutputStream.toByteArray());
            if (schemaLocation == SchemaLocation.URL) {
                withBody.setHeaders(ImmutableMap.of("flume.avro.schema.url", new File(schemaPath).getAbsoluteFile().toURI().toString()));
            } else if (schemaLocation == SchemaLocation.LITERAL) {
                withBody.setHeaders(ImmutableMap.of("flume.avro.schema.literal", schemaLiteral));
            }
            channel.put(withBody);
        }
    }

    private List<String> makeAnswers(int i) {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            Object[] objArr = new Object[6];
            objArr[0] = Integer.valueOf(10 * i2);
            objArr[1] = Integer.valueOf(2 * i2);
            objArr[2] = Double.valueOf(2.71828d * i2);
            objArr[3] = i2 % 2 == 0 ? "NULL" : "taco";
            objArr[4] = Integer.valueOf(i2);
            objArr[5] = BigDecimal.valueOf(i2, 1);
            newArrayList.add(String.format("INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, STRING nullableField=%s, STRING stringField=hello %s, DECIMAL decimalField(9, 1)=%s", objArr));
        }
        Collections.sort(newArrayList);
        return newArrayList;
    }
}
