package co.cask.directives.parser;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.wrangler.api.Arguments;
import co.cask.wrangler.api.Directive;
import co.cask.wrangler.api.DirectiveExecutionException;
import co.cask.wrangler.api.DirectiveParseException;
import co.cask.wrangler.api.ExecutorContext;
import co.cask.wrangler.api.Row;
import co.cask.wrangler.api.annotations.Categories;
import co.cask.wrangler.api.parser.ColumnName;
import co.cask.wrangler.api.parser.Identifier;
import co.cask.wrangler.api.parser.Numeric;
import co.cask.wrangler.api.parser.Text;
import co.cask.wrangler.api.parser.TokenType;
import co.cask.wrangler.api.parser.UsageDefinition;
import co.cask.wrangler.clients.RestClientException;
import co.cask.wrangler.clients.SchemaRegistryClient;
import co.cask.wrangler.codec.Decoder;
import co.cask.wrangler.codec.DecoderException;
import co.cask.wrangler.codec.ProtobufDecoderUsingDescriptor;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.xmlbeans.impl.jam.xml.JamXmlElements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name(ParseProtobuf.NAME)
@Plugin(type = Directive.Type)
@Categories(categories = {"parser", "protobuf"})
@Description("Parses column as protobuf encoded memory representations.")
/* loaded from: input_file:co/cask/directives/parser/ParseProtobuf.class */
public class ParseProtobuf implements Directive {
    public static final String NAME = "parse-as-protobuf";
    private static final Logger LOG = LoggerFactory.getLogger(ParseProtobuf.class);
    private String column;
    private String schemaId;
    private String recordName;
    private long version;
    private Decoder<Row> decoder;
    private boolean decoderInitialized = false;
    private SchemaRegistryClient client;

    @Override // co.cask.wrangler.api.Directive
    public UsageDefinition define() {
        UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
        builder.define(JamXmlElements.COLUMN, TokenType.COLUMN_NAME);
        builder.define("schema-id", TokenType.IDENTIFIER);
        builder.define("record-name", TokenType.TEXT);
        builder.define("version", TokenType.NUMERIC, true);
        return builder.build();
    }

    @Override // co.cask.wrangler.api.Executor
    public void initialize(Arguments arguments) throws DirectiveParseException {
        this.column = ((ColumnName) arguments.value(JamXmlElements.COLUMN)).value();
        this.schemaId = ((Identifier) arguments.value("schema-id")).value();
        this.recordName = ((Text) arguments.value("record-name")).value();
        if (arguments.contains("version")) {
            this.version = ((Numeric) arguments.value("version")).value().intValue();
        } else {
            this.version = -1L;
        }
    }

    @Override // co.cask.wrangler.api.Executor
    public void destroy() {
    }

    @Override // co.cask.wrangler.api.Executor
    public List<Row> execute(List<Row> list, final ExecutorContext executorContext) throws DirectiveExecutionException {
        ArrayList arrayList = new ArrayList();
        if (!this.decoderInitialized) {
            try {
                this.decoder = (Decoder) RetryerBuilder.newBuilder().retryIfExceptionOfType(IOException.class).retryIfExceptionOfType(RestClientException.class).withWaitStrategy(WaitStrategies.exponentialWait(10L, TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(5)).build().call(new Callable<Decoder<Row>>() { // from class: co.cask.directives.parser.ParseProtobuf.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Decoder<Row> call() throws Exception {
                        ParseProtobuf.this.client = SchemaRegistryClient.getInstance(executorContext);
                        return new ProtobufDecoderUsingDescriptor(ParseProtobuf.this.version != -1 ? ParseProtobuf.this.client.getSchema(ParseProtobuf.this.schemaId, ParseProtobuf.this.version) : ParseProtobuf.this.client.getSchema(ParseProtobuf.this.schemaId), ParseProtobuf.this.recordName);
                    }
                });
                if (this.decoder == null) {
                    throw new DirectiveExecutionException("Unsupported protobuf decoder type");
                }
                this.decoderInitialized = true;
            } catch (RetryException e) {
                throw new DirectiveExecutionException(String.format("Issue in retrieving protobuf descriptor from schema registry. %s", e.getCause()));
            } catch (ExecutionException e2) {
                throw new DirectiveExecutionException(String.format("Unable to retrieve protobuf descriptor from schema registry. %s", e2.getCause()));
            }
        }
        try {
            for (Row row : list) {
                int find = row.find(this.column);
                if (find != -1) {
                    Object value = row.getValue(find);
                    if (!(value instanceof byte[])) {
                        throw new DirectiveExecutionException(toString() + " : column " + this.column + " should be of type byte array");
                    }
                    arrayList.addAll(this.decoder.decode((byte[]) value));
                }
            }
            return arrayList;
        } catch (DecoderException e3) {
            throw new DirectiveExecutionException(toString() + " Issue decoding Protobuf record. Check schema version '" + (this.version == -1 ? "latest" : Long.valueOf(this.version)) + "'. " + e3.getMessage());
        }
    }
}
