/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.KafkaWriter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.powermock.reflect.Whitebox;

@RunWith(value=JUnit4.class)
public class KafkaIOExternalTest {
    private void verifyKafkaReadComposite(RunnerApi.PTransform kafkaSDFReadComposite, ExpansionApi.ExpansionResponse result) throws Exception {
        MatcherAssert.assertThat((Object)kafkaSDFReadComposite.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*Impulse.*")));
        MatcherAssert.assertThat((Object)kafkaSDFReadComposite.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*GenerateKafkaSourceDescriptor.*")));
        MatcherAssert.assertThat((Object)kafkaSDFReadComposite.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*ReadSourceDescriptors.*")));
        RunnerApi.PTransform kafkaSdfParDo = result.getComponents().getTransformsOrThrow(kafkaSDFReadComposite.getSubtransforms(2));
        RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom((ByteString)kafkaSdfParDo.getSpec().getPayload());
        Assert.assertNotNull((Object)parDoPayload.getRestrictionCoderId());
    }

    @Test
    public void testConstructKafkaRead() throws Exception {
        ImmutableList topics = ImmutableList.of((Object)"topic1", (Object)"topic2");
        String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        String valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        ImmutableMap consumerConfig = ImmutableMap.builder().put((Object)"bootstrap.servers", (Object)"server1:port,server2:port").put((Object)"key2", (Object)"value2").put((Object)"key.deserializer", (Object)keyDeserializer).put((Object)"value.deserializer", (Object)valueDeserializer).build();
        Long startReadTime = 100L;
        ExternalTransforms.ExternalConfigurationPayload payload = KafkaIOExternalTest.encodeRow(Row.withSchema((Schema)Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"topics", (Schema.FieldType)Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)), Schema.Field.of((String)"consumer_config", (Schema.FieldType)Schema.FieldType.map((Schema.FieldType)Schema.FieldType.STRING, (Schema.FieldType)Schema.FieldType.STRING)), Schema.Field.of((String)"key_deserializer", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"value_deserializer", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"start_read_time", (Schema.FieldType)Schema.FieldType.INT64), Schema.Field.of((String)"commit_offset_in_finalize", (Schema.FieldType)Schema.FieldType.BOOLEAN), Schema.Field.of((String)"timestamp_policy", (Schema.FieldType)Schema.FieldType.STRING)})).withFieldValue("topics", (Object)topics).withFieldValue("consumer_config", (Object)consumerConfig).withFieldValue("key_deserializer", (Object)keyDeserializer).withFieldValue("value_deserializer", (Object)valueDeserializer).withFieldValue("start_read_time", (Object)startReadTime).withFieldValue("commit_offset_in_finalize", (Object)false).withFieldValue("timestamp_policy", (Object)"ProcessingTime").build());
        RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
        ExpansionApi.ExpansionRequest request = ExpansionApi.ExpansionRequest.newBuilder().setComponents(defaultInstance).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_read_with_metadata:v1").setPayload(payload.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver observer = new TestStreamObserver();
        expansionService.expand(request, observer);
        ExpansionApi.ExpansionResponse result = (ExpansionApi.ExpansionResponse)observer.result;
        RunnerApi.PTransform transform = result.getTransform();
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*KafkaIO-Read.*")));
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*Convert-to-ExternalKafkaRecord.*")));
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*Convert-ConvertTransform.*")));
        MatcherAssert.assertThat((Object)transform.getInputsCount(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)transform.getOutputsCount(), (Matcher)Matchers.is((Object)1));
        RunnerApi.PTransform kafkaReadComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
        this.verifyKafkaReadComposite(result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)), result);
    }

    @Test
    public void testKafkaRecordToExternalKafkaRecord() throws Exception {
        RecordHeaders headers = new RecordHeaders();
        headers.add("dummyHeaderKey", "dummyHeaderVal".getBytes(StandardCharsets.UTF_8));
        KafkaRecord kafkaRecord = new KafkaRecord("dummyTopic", 111, 222L, 12345L, KafkaTimestampType.LOG_APPEND_TIME, (Headers)headers, (Object)"dummyKey".getBytes(StandardCharsets.UTF_8), (Object)"dummyValue".getBytes(StandardCharsets.UTF_8));
        KafkaIO.ByteArrayKafkaRecord byteArrayKafkaRecord = KafkaIO.RowsWithMetadata.toExternalKafkaRecord((KafkaRecord)kafkaRecord);
        Assert.assertEquals((Object)"dummyTopic", (Object)byteArrayKafkaRecord.topic);
        Assert.assertEquals((long)111L, (long)byteArrayKafkaRecord.partition);
        Assert.assertEquals((long)222L, (long)byteArrayKafkaRecord.offset);
        Assert.assertEquals((long)12345L, (long)byteArrayKafkaRecord.timestamp);
        Assert.assertEquals((long)KafkaTimestampType.LOG_APPEND_TIME.id, (long)byteArrayKafkaRecord.timestampTypeId);
        Assert.assertEquals((Object)KafkaTimestampType.LOG_APPEND_TIME.name, (Object)byteArrayKafkaRecord.timestampTypeName);
        Assert.assertEquals((Object)"dummyKey", (Object)new String(byteArrayKafkaRecord.key, "UTF-8"));
        Assert.assertEquals((Object)"dummyValue", (Object)new String(byteArrayKafkaRecord.value, "UTF-8"));
        Assert.assertEquals((long)1L, (long)byteArrayKafkaRecord.headers.size());
        Assert.assertEquals((Object)"dummyHeaderKey", (Object)((KafkaIO.KafkaHeader)byteArrayKafkaRecord.headers.get((int)0)).key);
        Assert.assertEquals((Object)"dummyHeaderVal", (Object)new String(((KafkaIO.KafkaHeader)byteArrayKafkaRecord.headers.get((int)0)).value, "UTF-8"));
    }

    @Test
    public void testConstructKafkaReadWithoutMetadata() throws Exception {
        ImmutableList topics = ImmutableList.of((Object)"topic1", (Object)"topic2");
        String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        String valueDeserializer = "org.apache.kafka.common.serialization.LongDeserializer";
        ImmutableMap consumerConfig = ImmutableMap.builder().put((Object)"bootstrap.servers", (Object)"server1:port,server2:port").put((Object)"key2", (Object)"value2").put((Object)"key.deserializer", (Object)keyDeserializer).put((Object)"value.deserializer", (Object)valueDeserializer).build();
        Long startReadTime = 100L;
        ExternalTransforms.ExternalConfigurationPayload payload = KafkaIOExternalTest.encodeRow(Row.withSchema((Schema)Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"topics", (Schema.FieldType)Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)), Schema.Field.of((String)"consumer_config", (Schema.FieldType)Schema.FieldType.map((Schema.FieldType)Schema.FieldType.STRING, (Schema.FieldType)Schema.FieldType.STRING)), Schema.Field.of((String)"key_deserializer", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"value_deserializer", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"start_read_time", (Schema.FieldType)Schema.FieldType.INT64), Schema.Field.of((String)"commit_offset_in_finalize", (Schema.FieldType)Schema.FieldType.BOOLEAN), Schema.Field.of((String)"timestamp_policy", (Schema.FieldType)Schema.FieldType.STRING)})).withFieldValue("topics", (Object)topics).withFieldValue("consumer_config", (Object)consumerConfig).withFieldValue("key_deserializer", (Object)keyDeserializer).withFieldValue("value_deserializer", (Object)valueDeserializer).withFieldValue("start_read_time", (Object)startReadTime).withFieldValue("commit_offset_in_finalize", (Object)false).withFieldValue("timestamp_policy", (Object)"ProcessingTime").build());
        RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
        ExpansionApi.ExpansionRequest request = ExpansionApi.ExpansionRequest.newBuilder().setComponents(defaultInstance).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_read_without_metadata:v1").setPayload(payload.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver observer = new TestStreamObserver();
        expansionService.expand(request, observer);
        ExpansionApi.ExpansionResponse result = (ExpansionApi.ExpansionResponse)observer.result;
        RunnerApi.PTransform transform = result.getTransform();
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*KafkaIO-Read.*")));
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*Remove-Kafka-Metadata.*")));
        MatcherAssert.assertThat((Object)transform.getInputsCount(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)transform.getOutputsCount(), (Matcher)Matchers.is((Object)1));
        RunnerApi.PTransform kafkaReadComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
        result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0));
        this.verifyKafkaReadComposite(result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)), result);
    }

    @Test
    public void testConstructKafkaWrite() throws Exception {
        String topic = "topic";
        String keySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        String valueSerializer = "org.apache.kafka.common.serialization.LongSerializer";
        ImmutableMap producerConfig = ImmutableMap.builder().put((Object)"bootstrap.servers", (Object)"server1:port,server2:port").put((Object)"retries", (Object)"3").build();
        ExternalTransforms.ExternalConfigurationPayload payload = KafkaIOExternalTest.encodeRow(Row.withSchema((Schema)Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"topic", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"producer_config", (Schema.FieldType)Schema.FieldType.map((Schema.FieldType)Schema.FieldType.STRING, (Schema.FieldType)Schema.FieldType.STRING)), Schema.Field.of((String)"key_serializer", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"value_serializer", (Schema.FieldType)Schema.FieldType.STRING)})).withFieldValue("topic", (Object)topic).withFieldValue("producer_config", (Object)producerConfig).withFieldValue("key_serializer", (Object)keySerializer).withFieldValue("value_serializer", (Object)valueSerializer).build());
        Pipeline p = Pipeline.create();
        ((PCollection)p.apply((PTransform)Impulse.create())).apply((PTransform)WithKeys.of((Object)"key"));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        String inputPCollection = (String)Iterables.getOnlyElement(((RunnerApi.PTransform)Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values())).getOutputsMap().values());
        ExpansionApi.ExpansionRequest request = ExpansionApi.ExpansionRequest.newBuilder().setComponents(pipelineProto.getComponents()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").putInputs("input", inputPCollection).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_write:v1").setPayload(payload.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver observer = new TestStreamObserver();
        expansionService.expand(request, observer);
        ExpansionApi.ExpansionResponse result = (ExpansionApi.ExpansionResponse)observer.result;
        RunnerApi.PTransform transform = result.getTransform();
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*Kafka-ProducerRecord.*")));
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.hasItem((Matcher)MatchesPattern.matchesPattern((String)".*KafkaIO-WriteRecords.*")));
        MatcherAssert.assertThat((Object)transform.getInputsCount(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat((Object)transform.getOutputsCount(), (Matcher)Matchers.is((Object)0));
        RunnerApi.PTransform writeComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
        RunnerApi.PTransform writeParDo = result.getComponents().getTransformsOrThrow(result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(0)).getSubtransforms(0));
        RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom((ByteString)writeParDo.getSpec().getPayload());
        DoFn kafkaWriter = ParDoTranslation.getDoFn((RunnerApi.ParDoPayload)parDoPayload);
        MatcherAssert.assertThat((Object)kafkaWriter, (Matcher)Matchers.instanceOf(KafkaWriter.class));
        KafkaIO.WriteRecords spec = (KafkaIO.WriteRecords)Whitebox.getInternalState((Object)kafkaWriter, (String)"spec");
        MatcherAssert.assertThat((Object)spec.getProducerConfig(), (Matcher)Matchers.is((Object)producerConfig));
        MatcherAssert.assertThat((Object)spec.getTopic(), (Matcher)Matchers.is((Object)topic));
        MatcherAssert.assertThat((Object)spec.getKeySerializer().getName(), (Matcher)Matchers.is((Object)keySerializer));
        MatcherAssert.assertThat((Object)spec.getValueSerializer().getName(), (Matcher)Matchers.is((Object)valueSerializer));
    }

    private static ExternalTransforms.ExternalConfigurationPayload encodeRow(Row row) {
        ByteString.Output outputStream = ByteString.newOutput();
        try {
            SchemaCoder.of((Schema)row.getSchema()).encode((Object)row, (OutputStream)outputStream);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return ExternalTransforms.ExternalConfigurationPayload.newBuilder().setSchema(SchemaTranslation.schemaToProto((Schema)row.getSchema(), (boolean)true)).setPayload(outputStream.toByteString()).build();
    }

    private static class TestStreamObserver<T>
    implements StreamObserver<T> {
        private T result;

        private TestStreamObserver() {
        }

        public void onNext(T t) {
            this.result = t;
        }

        public void onError(Throwable throwable) {
            throw new RuntimeException("Should not happen", throwable);
        }

        public void onCompleted() {
        }
    }
}

