/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTableProvider;
import org.apache.beam.sdk.io.thrift.ThriftCoder;
import org.apache.beam.sdk.io.thrift.ThriftSchema;
import org.apache.beam.sdk.io.thrift.payloads.ItThriftMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.RowMessages;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.commons.csv.CSVFormat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=Parameterized.class)
public class KafkaTableProviderIT {
    private static final String KAFKA_CONTAINER_VERSION = "5.5.2";
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    @ClassRule
    public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka").withTag("5.5.2"));
    private static KafkaOptions kafkaOptions;
    private static final Schema TEST_TABLE_SCHEMA;
    @Parameterized.Parameter
    public KafkaObjectProvider objectsProvider;
    @Parameterized.Parameter(value=1)
    public String topic;
    static final transient Map<Long, Boolean> FLAG;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({new KafkaJsonObjectProvider(), "json_topic"}, {new KafkaAvroObjectProvider(), "avro_topic"}, {new KafkaProtoObjectProvider(), "proto_topic"}, {new KafkaCsvObjectProvider(), "csv_topic"}, {new KafkaThriftObjectProvider(), "thrift_topic"});
    }

    @Before
    public void setUp() {
        kafkaOptions = (KafkaOptions)this.pipeline.getOptions().as(KafkaOptions.class);
        kafkaOptions.setKafkaTopic(this.topic);
        kafkaOptions.setKafkaBootstrapServerAddress(KAFKA_CONTAINER.getBootstrapServers());
        Preconditions.checkArgument((!KAFKA_CONTAINER.getBootstrapServers().contains(",") ? 1 : 0) != 0, (Object)"This integration test expects exactly one bootstrap server.");
    }

    private static String buildLocation() {
        return kafkaOptions.getKafkaBootstrapServerAddress() + "/" + kafkaOptions.getKafkaTopic();
    }

    @Test
    public void testFake2() throws BeamKafkaTable.NoEstimationException {
        Table table = Table.builder().name("kafka_table").comment("kafka table").location(KafkaTableProviderIT.buildLocation()).schema(TEST_TABLE_SCHEMA).type("kafka").properties(JSON.parseObject((String)this.objectsProvider.getKafkaPropertiesString())).build();
        BeamKafkaTable kafkaTable = (BeamKafkaTable)new KafkaTableProvider().buildBeamSqlTable(table);
        this.produceSomeRecordsWithDelay(100, 20);
        double rate1 = kafkaTable.computeRate(20);
        this.produceSomeRecordsWithDelay(100, 10);
        double rate2 = kafkaTable.computeRate(20);
        Assert.assertTrue((rate2 > rate1 ? 1 : 0) != 0);
    }

    @Test
    public void testFake() throws InterruptedException {
        ((DirectOptions)this.pipeline.getOptions().as(DirectOptions.class)).setBlockOnRun(false);
        String createTableString = String.format("CREATE EXTERNAL TABLE kafka_table(\nf_long BIGINT NOT NULL, \nf_int INTEGER NOT NULL, \nf_string VARCHAR NOT NULL \n) \nTYPE 'kafka' \nLOCATION '%s'\nTBLPROPERTIES '%s'", KafkaTableProviderIT.buildLocation(), this.objectsProvider.getKafkaPropertiesString());
        KafkaTableProvider tb = new KafkaTableProvider();
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{tb});
        env.executeDdl(createTableString);
        PCollection queryOutput = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM kafka_table"));
        ((PCollection)queryOutput.apply((PTransform)ParDo.of((DoFn)new FakeKvPair()))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)RowCoder.of((Schema)TEST_TABLE_SCHEMA))).apply("waitForSuccess", (PTransform)ParDo.of((DoFn)new StreamAssertEqual((Set<Row>)ImmutableSet.of((Object)KafkaTableProviderIT.generateRow(0), (Object)KafkaTableProviderIT.generateRow(1), (Object)KafkaTableProviderIT.generateRow(2)))));
        queryOutput.apply(KafkaTableProviderIT.logRecords(""));
        this.pipeline.run();
        TimeUnit.SECONDS.sleep(4L);
        this.produceSomeRecords(3);
        for (int i = 0; i < 200; ++i) {
            if (FLAG.getOrDefault(this.pipeline.getOptions().getOptionsId(), false).booleanValue()) {
                return;
            }
            TimeUnit.MILLISECONDS.sleep(90L);
        }
        Assert.fail();
    }

    @Test
    public void testFakeNested() throws InterruptedException {
        Assume.assumeFalse((boolean)this.topic.equals("csv_topic"));
        ((DirectOptions)this.pipeline.getOptions().as(DirectOptions.class)).setBlockOnRun(false);
        String createTableString = String.format("CREATE EXTERNAL TABLE kafka_table(\nheaders ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,payload ROW<f_long BIGINT NOT NULL, \nf_int INTEGER NOT NULL, \nf_string VARCHAR NOT NULL \n>) \nTYPE 'kafka' \nLOCATION '%s'\nTBLPROPERTIES '%s'", KafkaTableProviderIT.buildLocation(), this.objectsProvider.getKafkaPropertiesString());
        KafkaTableProvider tb = new KafkaTableProvider();
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{tb});
        env.executeDdl(createTableString);
        PCollection queryOutput = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT kafka_table.payload.f_long, kafka_table.payload.f_int, kafka_table.payload.f_string FROM kafka_table"));
        ((PCollection)queryOutput.apply((PTransform)ParDo.of((DoFn)new FakeKvPair()))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)RowCoder.of((Schema)TEST_TABLE_SCHEMA))).apply("waitForSuccess", (PTransform)ParDo.of((DoFn)new StreamAssertEqual((Set<Row>)ImmutableSet.of((Object)KafkaTableProviderIT.generateRow(0), (Object)KafkaTableProviderIT.generateRow(1), (Object)KafkaTableProviderIT.generateRow(2)))));
        queryOutput.apply(KafkaTableProviderIT.logRecords(""));
        this.pipeline.run();
        TimeUnit.SECONDS.sleep(4L);
        this.produceSomeRecords(3);
        for (int i = 0; i < 200; ++i) {
            if (FLAG.getOrDefault(this.pipeline.getOptions().getOptionsId(), false).booleanValue()) {
                return;
            }
            TimeUnit.MILLISECONDS.sleep(90L);
        }
        Assert.fail();
    }

    private static MapElements<Row, Void> logRecords(final String suffix) {
        return MapElements.via((SimpleFunction)new SimpleFunction<Row, Void>(){

            public @Nullable Void apply(Row input) {
                System.out.println(input.getValues() + suffix);
                return null;
            }
        });
    }

    private static Row generateRow(int i) {
        return Row.withSchema((Schema)TEST_TABLE_SCHEMA).addValues(new Object[]{(long)i, i % 3 + 1, "value" + i}).build();
    }

    private void produceSomeRecords(int num) {
        KafkaProducer producer = new KafkaProducer(this.producerProps());
        Stream.iterate(0, i -> {
            i = i + 1;
            return i;
        }).limit(num).forEach(arg_0 -> this.lambda$produceSomeRecords$1((Producer)producer, arg_0));
        producer.flush();
        producer.close();
    }

    private void produceSomeRecordsWithDelay(int num, int delayMilis) {
        KafkaProducer producer = new KafkaProducer(this.producerProps());
        Stream.iterate(0, i -> {
            i = i + 1;
            return i;
        }).limit(num).forEach(arg_0 -> this.lambda$produceSomeRecordsWithDelay$3((Producer)producer, delayMilis, arg_0));
        producer.flush();
        producer.close();
    }

    private Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaOptions.getKafkaBootstrapServerAddress());
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("buffer.memory", (Object)0x2000000);
        props.put("acks", "all");
        props.put("request.required.acks", "1");
        props.put("retries", (Object)0);
        props.put("linger.ms", (Object)1);
        return props;
    }

    private /* synthetic */ void lambda$produceSomeRecordsWithDelay$3(Producer producer, int delayMilis, Integer i) {
        ProducerRecord<String, byte[]> record = this.objectsProvider.generateProducerRecord(i);
        producer.send(record);
        try {
            TimeUnit.MILLISECONDS.sleep(delayMilis);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Could not wait for producing", e);
        }
    }

    private /* synthetic */ void lambda$produceSomeRecords$1(Producer producer, Integer i) {
        ProducerRecord<String, byte[]> record = this.objectsProvider.generateProducerRecord(i);
        producer.send(record);
    }

    static /* synthetic */ Schema access$500() {
        return TEST_TABLE_SCHEMA;
    }

    static {
        TEST_TABLE_SCHEMA = Schema.builder().addInt64Field("f_long").addInt32Field("f_int").addStringField("f_string").build();
        FLAG = new ConcurrentHashMap<Long, Boolean>();
    }

    public static interface KafkaOptions
    extends PipelineOptions {
        @Description(value="Kafka server address")
        @Validation.Required
        @Default.String(value="localhost:9092")
        public String getKafkaBootstrapServerAddress();

        public void setKafkaBootstrapServerAddress(String var1);

        @Description(value="Kafka topic")
        @Validation.Required
        @Default.String(value="test")
        public String getKafkaTopic();

        public void setKafkaTopic(String var1);
    }

    private static class KafkaThriftObjectProvider
    extends KafkaObjectProvider {
        private final Class<ItThriftMessage> thriftClass = ItThriftMessage.class;
        private final TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        private final SimpleFunction<Row, byte[]> toBytesFn = RowMessages.rowToBytesFn((SchemaProvider)ThriftSchema.provider(), (TypeDescriptor)TypeDescriptor.of(this.thriftClass), (Coder)ThriftCoder.of(this.thriftClass, (TProtocolFactory)this.protocolFactory));

        private KafkaThriftObjectProvider() {
        }

        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            return new ProducerRecord(kafkaOptions.getKafkaTopic(), (Object)("k" + i), (Object)((byte[])this.toBytesFn.apply((Object)KafkaTableProviderIT.generateRow(i))));
        }

        @Override
        protected String getKafkaPropertiesString() {
            return "{ \"format\" : \"thrift\",\"thriftClass\": \"" + this.thriftClass.getName() + "\", \"thriftProtocolFactoryClass\": \"" + this.protocolFactory.getClass().getName() + "\"}";
        }

        @Override
        protected String getPayloadFormat() {
            return "thrift";
        }
    }

    private static class KafkaAvroObjectProvider
    extends KafkaObjectProvider {
        private final SimpleFunction<Row, byte[]> toBytesFn = AvroUtils.getRowToAvroBytesFunction((Schema)KafkaTableProviderIT.access$500());

        private KafkaAvroObjectProvider() {
        }

        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            return new ProducerRecord(kafkaOptions.getKafkaTopic(), (Object)("k" + i), (Object)((byte[])this.toBytesFn.apply((Object)KafkaTableProviderIT.generateRow(i))));
        }

        @Override
        protected String getPayloadFormat() {
            return "avro";
        }
    }

    private static class KafkaCsvObjectProvider
    extends KafkaObjectProvider {
        private KafkaCsvObjectProvider() {
        }

        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            return new ProducerRecord(kafkaOptions.getKafkaTopic(), (Object)("k" + i), (Object)BeamTableUtils.beamRow2CsvLine((Row)KafkaTableProviderIT.generateRow(i), (CSVFormat)CSVFormat.DEFAULT).getBytes(StandardCharsets.UTF_8));
        }

        @Override
        protected String getPayloadFormat() {
            return null;
        }
    }

    private static class KafkaProtoObjectProvider
    extends KafkaObjectProvider {
        private final SimpleFunction<Row, byte[]> toBytesFn = ProtoMessageSchema.getRowToProtoBytesFn(PayloadMessages.ItMessage.class);

        private KafkaProtoObjectProvider() {
        }

        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            return new ProducerRecord(kafkaOptions.getKafkaTopic(), (Object)("k" + i), (Object)((byte[])this.toBytesFn.apply((Object)KafkaTableProviderIT.generateRow(i))));
        }

        @Override
        protected String getPayloadFormat() {
            return "proto";
        }

        @Override
        protected String getKafkaPropertiesString() {
            return "{ \"format\" : \"proto\",\"protoClass\": \"" + PayloadMessages.ItMessage.class.getName() + "\"}";
        }
    }

    private static class KafkaJsonObjectProvider
    extends KafkaObjectProvider {
        private KafkaJsonObjectProvider() {
        }

        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            return new ProducerRecord(kafkaOptions.getKafkaTopic(), (Object)("k" + i), (Object)this.createJson(i).getBytes(StandardCharsets.UTF_8));
        }

        @Override
        protected String getPayloadFormat() {
            return "json";
        }

        private String createJson(int i) {
            return String.format("{\"f_long\": %s, \"f_int\": %s, \"f_string\": \"%s\"}", i, i % 3 + 1, "value" + i);
        }
    }

    private static abstract class KafkaObjectProvider
    implements Serializable {
        private KafkaObjectProvider() {
        }

        protected abstract ProducerRecord<String, byte[]> generateProducerRecord(int var1);

        protected abstract String getPayloadFormat();

        protected String getKafkaPropertiesString() {
            return "{ " + (this.getPayloadFormat() == null ? "" : "\"format\" : \"" + this.getPayloadFormat() + "\",") + "}";
        }
    }

    public static class StreamAssertEqual
    extends DoFn<KV<String, Row>, Void> {
        private final Set<Row> expected;
        @DoFn.StateId(value="seenValues")
        private final StateSpec<BagState<Row>> seenRows = StateSpecs.bag((Coder)RowCoder.of((Schema)KafkaTableProviderIT.access$500()));
        @DoFn.StateId(value="count")
        private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

        StreamAssertEqual(Set<Row> expected) {
            this.expected = expected;
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext context, @DoFn.StateId(value="seenValues") BagState<Row> seenValues, @DoFn.StateId(value="count") ValueState<Integer> countState) {
            int count = (Integer)MoreObjects.firstNonNull((Object)((Integer)countState.read()), (Object)0);
            countState.write((Object)(++count));
            seenValues.add((Object)((Row)((KV)context.element()).getValue()));
            if (count >= this.expected.size() && StreamSupport.stream(seenValues.read().spliterator(), false).collect(Collectors.toSet()).containsAll(this.expected)) {
                System.out.println("in second if");
                FLAG.put(context.getPipelineOptions().getOptionsId(), true);
            }
        }
    }

    public static class FakeKvPair
    extends DoFn<Row, KV<String, Row>> {
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)KV.of((Object)"fake_key", (Object)((Row)c.element())));
        }
    }
}

