package org.apache.kudu.flume.sink;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;

@InterfaceStability.Evolving
@InterfaceAudience.Public
/* loaded from: input_file:org/apache/kudu/flume/sink/AvroKuduOperationsProducer.class */
public class AvroKuduOperationsProducer implements KuduOperationsProducer {
    public static final String OPERATION_PROP = "operation";
    public static final String SCHEMA_PROP = "schemaPath";
    public static final String DEFAULT_OPERATION = "upsert";
    public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
    public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
    private String operation;
    private GenericRecord reuse;
    private KuduTable table;
    private String defaultSchemaUrl;
    private BinaryDecoder decoder = null;
    private static final LoadingCache<String, Schema> schemasFromURL = CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() { // from class: org.apache.kudu.flume.sink.AvroKuduOperationsProducer.1
        public Schema load(String str) throws IOException {
            Schema.Parser parser = new Schema.Parser();
            FSDataInputStream fSDataInputStream = null;
            try {
                fSDataInputStream = str.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/") ? FileSystem.get(URI.create(str), AvroKuduOperationsProducer.conf).open(new Path(str)) : new URL(str).openStream();
                Schema parse = parser.parse(fSDataInputStream);
                if (fSDataInputStream != null) {
                    fSDataInputStream.close();
                }
                return parse;
            } catch (Throwable th) {
                if (fSDataInputStream != null) {
                    fSDataInputStream.close();
                }
                throw th;
            }
        }
    });
    private static final LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder.newBuilder().build(new CacheLoader<String, Schema>() { // from class: org.apache.kudu.flume.sink.AvroKuduOperationsProducer.2
        public Schema load(String str) {
            Preconditions.checkNotNull(str, "Schema literal cannot be null without a Schema URL");
            return new Schema.Parser().parse(str);
        }
    });
    private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers = CacheBuilder.newBuilder().build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { // from class: org.apache.kudu.flume.sink.AvroKuduOperationsProducer.3
        public DatumReader<GenericRecord> load(Schema schema) {
            return new GenericDatumReader(schema);
        }
    });
    private static final Configuration conf = new Configuration();

    public void configure(Context context) {
        this.operation = context.getString("operation", "upsert");
        String string = context.getString(SCHEMA_PROP);
        if (string != null) {
            this.defaultSchemaUrl = string;
        }
    }

    @Override // org.apache.kudu.flume.sink.KuduOperationsProducer
    public void initialize(KuduTable kuduTable) {
        this.table = kuduTable;
    }

    @Override // org.apache.kudu.flume.sink.KuduOperationsProducer
    public List<Operation> getOperations(Event event) throws FlumeException {
        Operation newInsert;
        DatumReader datumReader = (DatumReader) readers.getUnchecked(getSchema(event));
        this.decoder = DecoderFactory.get().binaryDecoder(event.getBody(), this.decoder);
        try {
            this.reuse = (GenericRecord) datumReader.read(this.reuse, this.decoder);
            String lowerCase = this.operation.toLowerCase();
            boolean z = -1;
            switch (lowerCase.hashCode()) {
                case -1183792455:
                    if (lowerCase.equals("insert")) {
                        z = true;
                        break;
                    }
                    break;
                case -838395601:
                    if (lowerCase.equals("upsert")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    newInsert = this.table.newUpsert();
                    break;
                case true:
                    newInsert = this.table.newInsert();
                    break;
                default:
                    throw new FlumeException(String.format("Unexpected operation %s", this.operation));
            }
            setupOp(newInsert, this.reuse);
            return Collections.singletonList(newInsert);
        } catch (IOException e) {
            throw new FlumeException("Cannot deserialize event", e);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x005f. Please report as an issue. */
    private void setupOp(Operation operation, GenericRecord genericRecord) {
        PartialRow row = operation.getRow();
        for (ColumnSchema columnSchema : this.table.getSchema().getColumns()) {
            String name = columnSchema.getName();
            Object obj = genericRecord.get(name);
            if (obj != null) {
                try {
                    switch (columnSchema.getType()) {
                        case BOOL:
                            row.addBoolean(name, ((Boolean) obj).booleanValue());
                            break;
                        case INT8:
                            row.addByte(name, ((Byte) obj).byteValue());
                            break;
                        case INT16:
                            row.addShort(name, ((Short) obj).shortValue());
                            break;
                        case INT32:
                            row.addInt(name, ((Integer) obj).intValue());
                            break;
                        case INT64:
                        case UNIXTIME_MICROS:
                            row.addLong(name, ((Long) obj).longValue());
                            break;
                        case FLOAT:
                            row.addFloat(name, ((Float) obj).floatValue());
                            break;
                        case DOUBLE:
                            row.addDouble(name, ((Double) obj).doubleValue());
                            break;
                        case STRING:
                            row.addString(name, obj.toString());
                            break;
                        case BINARY:
                            row.addBinary(name, (byte[]) obj);
                            break;
                        default:
                            throw new FlumeException(String.format("Unrecognized type %s for column %s", columnSchema.getType().toString(), name));
                    }
                } catch (ClassCastException e) {
                    throw new FlumeException(String.format("Failed to coerce value for column '%s' to type %s", columnSchema.getName(), columnSchema.getType()));
                }
            } else if (columnSchema.isNullable()) {
                row.setNull(name);
            }
        }
    }

    private Schema getSchema(Event event) throws FlumeException {
        Map headers = event.getHeaders();
        String str = (String) headers.get(SCHEMA_URL_HEADER);
        String str2 = (String) headers.get(SCHEMA_LITERAL_HEADER);
        try {
            if (str != null) {
                return (Schema) schemasFromURL.get(str);
            }
            if (str2 != null) {
                return (Schema) schemasFromLiteral.get(str2);
            }
            if (this.defaultSchemaUrl != null) {
                return (Schema) schemasFromURL.get(this.defaultSchemaUrl);
            }
            throw new FlumeException(String.format("No schema for event. Specify configuration property '%s' or event header '%s'", SCHEMA_PROP, SCHEMA_URL_HEADER));
        } catch (UncheckedExecutionException e) {
            throw new FlumeException("Cannot parse schema", e);
        } catch (ExecutionException e2) {
            throw new FlumeException("Cannot get schema", e2);
        }
    }

    @Override // org.apache.kudu.flume.sink.KuduOperationsProducer, java.lang.AutoCloseable
    public void close() {
    }
}
