/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class StreamsUpgradeTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
        }
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.5)");
        System.out.println("props=" + streamsProperties);
        StreamsBuilder builder = new StreamsBuilder();
        KTable dataTable = builder.table("data", Consumed.with(SmokeTestUtil.stringSerde, SmokeTestUtil.intSerde));
        KStream dataStream = dataTable.toStream();
        dataStream.process(StreamsUpgradeTest.printProcessorSupplier("data"), new String[0]);
        dataStream.to("echo");
        boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty("test.run_fk_join", "false"));
        if (runFkJoin) {
            try {
                KTable fkTable = builder.table("fk", Consumed.with(SmokeTestUtil.intSerde, SmokeTestUtil.stringSerde));
                StreamsUpgradeTest.buildFKTable((KTable<String, Integer>)dataTable, (KTable<Integer, String>)fkTable);
            }
            catch (Exception e) {
                System.err.println("Caught " + e.getMessage());
            }
        }
        Properties config = new Properties();
        config.setProperty("application.id", "StreamsUpgradeTest-" + new Random().nextLong());
        config.put("commit.interval.ms", (Object)1000L);
        config.putAll((Map<?, ?>)streamsProperties);
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        }));
    }

    private static void buildFKTable(KTable<String, Integer> primaryTable, KTable<Integer, String> otherTable) {
        KStream kStream = primaryTable.join(otherTable, v -> v, (k0, v0) -> v0).toStream();
        kStream.process(StreamsUpgradeTest.printProcessorSupplier("fk"), new String[0]);
        kStream.to("fk-result", Produced.with(SmokeTestUtil.stringSerde, SmokeTestUtil.stringSerde));
    }

    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
        return () -> new AbstractProcessor<K, V>(){
            private int numRecordsProcessed = 0;

            public void init(ProcessorContext context) {
                System.out.println("[2.5] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                this.numRecordsProcessed = 0;
            }

            public void process(K key, V value) {
                ++this.numRecordsProcessed;
                if (this.numRecordsProcessed % 100 == 0) {
                    System.out.println("processed " + this.numRecordsProcessed + " records from topic=" + topic);
                }
            }

            public void close() {
            }
        };
    }
}

