/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class StreamsUpgradeTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
        }
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
        System.out.println("props=" + streamsProperties);
        KStreamBuilder builder = new KStreamBuilder();
        KStream dataStream = builder.stream(new String[]{"data"});
        dataStream.process(StreamsUpgradeTest.printProcessorSupplier(), new String[0]);
        dataStream.to("echo");
        Properties config = new Properties();
        config.setProperty("application.id", "StreamsUpgradeTest");
        config.put("commit.interval.ms", (Object)1000);
        config.putAll((Map<?, ?>)streamsProperties);
        final KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                streams.close();
                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
                System.out.flush();
            }
        });
    }

    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
        return new ProcessorSupplier<K, V>(){

            public Processor<K, V> get() {
                return new AbstractProcessor<K, V>(){
                    private int numRecordsProcessed = 0;

                    public void init(ProcessorContext context) {
                        System.out.println("[0.10.2] initializing processor: topic=data taskId=" + context.taskId());
                        this.numRecordsProcessed = 0;
                    }

                    public void process(K key, V value) {
                        ++this.numRecordsProcessed;
                        if (this.numRecordsProcessed % 100 == 0) {
                            System.out.println("processed " + this.numRecordsProcessed + " records from topic=data");
                        }
                    }

                    public void punctuate(long timestamp) {
                    }

                    public void close() {
                    }
                };
            }
        };
    }
}

