package co.cask.cdap.etl.realtime.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.realtime.DataWriter;
import co.cask.cdap.etl.api.realtime.RealtimeSink;
import co.cask.cdap.etl.common.Properties;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Stream")
@Description("Real-time sink that outputs to a specified CDAP stream.")
@Plugin(type = "realtimesink")
/* loaded from: input_file:co/cask/cdap/etl/realtime/sink/StreamSink.class */
public class StreamSink extends RealtimeSink<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
    private static final String NAME_DESC = "The name of the stream to output to. Must be a valid stream name. The stream will be created if it does not exist.";
    private static final String BODY_FIELD_DESC = "Name of the field in the record that contains the data to be written to the specified stream. The data could be in binary format as a byte array or a ByteBuffer. It can also be a String. If unspecified, the 'body' key is used.";
    private static final String HEADERS_FIELD_DESC = "Name of the field in the record that contains headers. Headers are presumed to be a map of string to string.";
    private final StreamConfig streamConfig;

    /* renamed from: co.cask.cdap.etl.realtime.sink.StreamSink$1, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/etl/realtime/sink/StreamSink$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type = new int[Schema.Type.values().length];

        static {
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.BYTES.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[Schema.Type.STRING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/etl/realtime/sink/StreamSink$StreamConfig.class */
    public static class StreamConfig extends PluginConfig {

        @Description(StreamSink.NAME_DESC)
        private String name;

        @Name(Properties.Stream.HEADERS_FIELD)
        @Description(StreamSink.HEADERS_FIELD_DESC)
        @Nullable
        private String headersField;

        @Name(Properties.Stream.BODY_FIELD)
        @Description(StreamSink.BODY_FIELD_DESC)
        @Nullable
        private String bodyField;

        public StreamConfig() {
            this(null, Properties.Stream.DEFAULT_HEADERS_FIELD, Properties.Stream.DEFAULT_BODY_FIELD);
        }

        public StreamConfig(String str, String str2, String str3) {
            this.name = str;
            this.headersField = str2;
            this.bodyField = str3;
        }
    }

    public StreamSink(StreamConfig streamConfig) {
        this.streamConfig = streamConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(this.streamConfig.name), "Stream name should be non-null, non-empty.");
        pipelineConfigurer.addStream(new Stream(this.streamConfig.name));
    }

    public int write(Iterable<StructuredRecord> iterable, DataWriter dataWriter) throws Exception {
        int i = 0;
        for (StructuredRecord structuredRecord : iterable) {
            Schema schema = structuredRecord.getSchema();
            Object obj = structuredRecord.get(this.streamConfig.bodyField);
            Object obj2 = structuredRecord.get(this.streamConfig.headersField);
            if (obj == null) {
                LOG.debug("Found null data. Skipping record.");
            } else if (obj2 == null || isHeadersSchemaPresentAndSupported(schema)) {
                switch (AnonymousClass1.$SwitchMap$co$cask$cdap$api$data$schema$Schema$Type[schema.getField(this.streamConfig.bodyField).getSchema().getType().ordinal()]) {
                    case 1:
                        i += writeBytes(dataWriter, obj, obj2);
                        break;
                    case 2:
                        i += writeString(dataWriter, obj, obj2);
                        break;
                    default:
                        LOG.debug("Type {} is not supported for writing to stream", obj.getClass().getName());
                        break;
                }
            } else {
                LOG.debug("Headers found in input, however either the headers schema is not provided or the provided schema is not supported. Only a map of string keys and string values is supported. Skipping record.");
            }
        }
        return i;
    }

    private boolean isHeadersSchemaPresentAndSupported(Schema schema) {
        Schema.Field field = schema.getField(this.streamConfig.headersField);
        if (field == null) {
            return false;
        }
        Map.Entry mapSchema = field.getSchema().getMapSchema();
        return ((Schema) mapSchema.getKey()).getType().equals(Schema.Type.STRING) && ((Schema) mapSchema.getValue()).getType().equals(Schema.Type.STRING);
    }

    private int writeBytes(DataWriter dataWriter, Object obj, Object obj2) throws IOException {
        ByteBuffer wrap;
        if (obj instanceof ByteBuffer) {
            wrap = (ByteBuffer) obj;
        } else {
            if (!(obj instanceof byte[])) {
                LOG.debug("Type {} is not supported for writing to stream", obj.getClass().getName());
                return 0;
            }
            wrap = ByteBuffer.wrap((byte[]) obj);
        }
        if (obj2 == null || !(obj2 instanceof Map)) {
            dataWriter.write(this.streamConfig.name, wrap);
            return 1;
        }
        dataWriter.write(this.streamConfig.name, new StreamEventData((Map) obj2, wrap));
        return 1;
    }

    private int writeString(DataWriter dataWriter, Object obj, Object obj2) throws IOException {
        if (obj2 == null || !(obj2 instanceof Map)) {
            dataWriter.write(this.streamConfig.name, (String) obj);
            return 1;
        }
        dataWriter.write(this.streamConfig.name, (String) obj, (Map) obj2);
        return 1;
    }
}
