/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaWriter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.internal.util.reflection.Whitebox;

@RunWith(value=JUnit4.class)
public class KafkaIOExternalTest {
    @Test
    public void testConstructKafkaRead() throws Exception {
        ImmutableList topics = ImmutableList.of((Object)"topic1", (Object)"topic2");
        String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        String valueDeserializer = "org.apache.kafka.common.serialization.LongDeserializer";
        ImmutableMap consumerConfig = ImmutableMap.builder().put((Object)"bootstrap.servers", (Object)"server1:port,server2:port").put((Object)"key2", (Object)"value2").put((Object)"key.deserializer", (Object)keyDeserializer).put((Object)"value.deserializer", (Object)valueDeserializer).build();
        ExternalTransforms.ExternalConfigurationPayload payload = ExternalTransforms.ExternalConfigurationPayload.newBuilder().putConfiguration("topics", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.listAsBytes((List<String>)topics))).build()).putConfiguration("consumer_config", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:kv:v1").addCoderUrn("beam:coder:bytes:v1").addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.mapAsBytes((Map<String, String>)consumerConfig))).build()).putConfiguration("key_deserializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.encodeString(keyDeserializer))).build()).putConfiguration("value_deserializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.encodeString(valueDeserializer))).build()).build();
        RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
        ExpansionApi.ExpansionRequest request = ExpansionApi.ExpansionRequest.newBuilder().setComponents(defaultInstance).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:external:java:kafka:read:v1").setPayload(payload.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver observer = new TestStreamObserver();
        expansionService.expand(request, observer);
        ExpansionApi.ExpansionResponse result = (ExpansionApi.ExpansionResponse)observer.result;
        RunnerApi.PTransform transform = result.getTransform();
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.contains((Object[])new String[]{"test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove Kafka Metadata"}));
        MatcherAssert.assertThat((Object)transform.getInputsCount(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)transform.getOutputsCount(), (Matcher)Matchers.is((Object)1));
        RunnerApi.PTransform kafkaComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
        RunnerApi.PTransform kafkaRead = result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0));
        RunnerApi.ReadPayload readPayload = RunnerApi.ReadPayload.parseFrom((ByteString)kafkaRead.getSpec().getPayload());
        KafkaUnboundedSource source = (KafkaUnboundedSource)ReadTranslation.unboundedSourceFromProto((RunnerApi.ReadPayload)readPayload);
        KafkaIO.Read spec = source.getSpec();
        MatcherAssert.assertThat((Object)spec.getConsumerConfig(), (Matcher)Matchers.is((Object)consumerConfig));
        MatcherAssert.assertThat((Object)spec.getTopics(), (Matcher)Matchers.is((Object)topics));
        MatcherAssert.assertThat((Object)spec.getKeyDeserializer().getName(), (Matcher)Matchers.is((Object)keyDeserializer));
        MatcherAssert.assertThat((Object)spec.getValueDeserializer().getName(), (Matcher)Matchers.is((Object)valueDeserializer));
    }

    @Test
    public void testConstructKafkaWrite() throws Exception {
        String topic = "topic";
        String keySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        String valueSerializer = "org.apache.kafka.common.serialization.LongSerializer";
        ImmutableMap producerConfig = ImmutableMap.builder().put((Object)"bootstrap.servers", (Object)"server1:port,server2:port").put((Object)"retries", (Object)"3").build();
        ExternalTransforms.ExternalConfigurationPayload payload = ExternalTransforms.ExternalConfigurationPayload.newBuilder().putConfiguration("topic", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.encodeString(topic))).build()).putConfiguration("producer_config", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:kv:v1").addCoderUrn("beam:coder:bytes:v1").addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.mapAsBytes((Map<String, String>)producerConfig))).build()).putConfiguration("key_serializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.encodeString(keySerializer))).build()).putConfiguration("value_serializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:bytes:v1").setPayload(ByteString.copyFrom((byte[])KafkaIOExternalTest.encodeString(valueSerializer))).build()).build();
        Pipeline p = Pipeline.create();
        ((PCollection)p.apply((PTransform)Impulse.create())).apply((PTransform)WithKeys.of((Object)"key"));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        String inputPCollection = (String)Iterables.getOnlyElement(((RunnerApi.PTransform)Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values())).getOutputsMap().values());
        ExpansionApi.ExpansionRequest request = ExpansionApi.ExpansionRequest.newBuilder().setComponents(pipelineProto.getComponents()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").putInputs("input", inputPCollection).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:external:java:kafka:write:v1").setPayload(payload.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver observer = new TestStreamObserver();
        expansionService.expand(request, observer);
        ExpansionApi.ExpansionResponse result = (ExpansionApi.ExpansionResponse)observer.result;
        RunnerApi.PTransform transform = result.getTransform();
        MatcherAssert.assertThat((Object)transform.getSubtransformsList(), (Matcher)Matchers.contains((Object[])new String[]{"test_namespacetest/Kafka ProducerRecord", "test_namespacetest/KafkaIO.WriteRecords"}));
        MatcherAssert.assertThat((Object)transform.getInputsCount(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat((Object)transform.getOutputsCount(), (Matcher)Matchers.is((Object)0));
        RunnerApi.PTransform writeComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
        RunnerApi.PTransform writeParDo = result.getComponents().getTransformsOrThrow(result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(0)).getSubtransforms(0));
        RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom((ByteString)writeParDo.getSpec().getPayload());
        DoFn kafkaWriter = ParDoTranslation.getDoFn((RunnerApi.ParDoPayload)parDoPayload);
        MatcherAssert.assertThat((Object)kafkaWriter, (Matcher)Matchers.instanceOf(KafkaWriter.class));
        KafkaIO.WriteRecords spec = (KafkaIO.WriteRecords)Whitebox.getInternalState((Object)kafkaWriter, (String)"spec");
        MatcherAssert.assertThat((Object)spec.getProducerConfig(), (Matcher)Matchers.is((Object)producerConfig));
        MatcherAssert.assertThat((Object)spec.getTopic(), (Matcher)Matchers.is((Object)topic));
        MatcherAssert.assertThat((Object)spec.getKeySerializer().getName(), (Matcher)Matchers.is((Object)keySerializer));
        MatcherAssert.assertThat((Object)spec.getValueSerializer().getName(), (Matcher)Matchers.is((Object)valueSerializer));
    }

    private static byte[] listAsBytes(List<String> stringList) throws IOException {
        IterableCoder coder = IterableCoder.of((Coder)ByteArrayCoder.of());
        List bytesList = stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        coder.encode(bytesList, (OutputStream)baos);
        return baos.toByteArray();
    }

    private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
        IterableCoder coder = IterableCoder.of((Coder)KvCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of()));
        List bytesList = stringMap.entrySet().stream().map(kv -> KV.of((Object)KafkaIOExternalTest.utf8Bytes((String)kv.getKey()), (Object)KafkaIOExternalTest.utf8Bytes((String)kv.getValue()))).collect(Collectors.toList());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        coder.encode(bytesList, (OutputStream)baos);
        return baos.toByteArray();
    }

    private static byte[] encodeString(String str) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteArrayCoder.of().encode(KafkaIOExternalTest.utf8Bytes(str), (OutputStream)baos);
        return baos.toByteArray();
    }

    private static byte[] utf8Bytes(String str) {
        Preconditions.checkNotNull((Object)str, (Object)"String must not be null.");
        return str.getBytes(Charsets.UTF_8);
    }

    private static class TestStreamObserver<T>
    implements StreamObserver<T> {
        private T result;

        private TestStreamObserver() {
        }

        public void onNext(T t) {
            this.result = t;
        }

        public void onError(Throwable throwable) {
            throw new RuntimeException("Should not happen", throwable);
        }

        public void onCompleted() {
        }
    }
}

