/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class StreamsUpgradeTest {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
        }
        String propFileName = args[0];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.0)");
        System.out.println("props=" + streamsProperties);
        StreamsBuilder builder = new StreamsBuilder();
        KStream dataStream = builder.stream("data");
        dataStream.process(StreamsUpgradeTest.printProcessorSupplier(), new String[0]);
        dataStream.to("echo");
        Properties config = new Properties();
        config.setProperty("application.id", "StreamsUpgradeTest");
        config.put("commit.interval.ms", (Object)1000);
        config.putAll((Map<?, ?>)streamsProperties);
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        }));
    }

    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
        return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>(){
            private int numRecordsProcessed = 0;

            public void init(ProcessorContext<KOut, VOut> context) {
                System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId());
                this.numRecordsProcessed = 0;
            }

            public void process(Record<KIn, VIn> record) {
                ++this.numRecordsProcessed;
                if (this.numRecordsProcessed % 100 == 0) {
                    System.out.println("processed " + this.numRecordsProcessed + " records from topic=data");
                }
            }

            public void close() {
            }
        };
    }
}

