/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class SmokeTestUtil {
    static final int END = Integer.MAX_VALUE;
    public static Serde<String> stringSerde = Serdes.String();
    public static Serde<Integer> intSerde = Serdes.Integer();
    static Serde<Long> longSerde = Serdes.Long();
    static Serde<Double> doubleSerde = Serdes.Double();

    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(String topic) {
        return SmokeTestUtil.printProcessorSupplier(topic, "");
    }

    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
        return () -> new ContextualProcessor<Object, Object, Void, Void>(){
            private int numRecordsProcessed = 0;
            private long smallestOffset = Long.MAX_VALUE;
            private long largestOffset = Long.MIN_VALUE;

            public void init(ProcessorContext<Void, Void> context) {
                super.init(context);
                System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + String.valueOf(context.taskId()));
                System.out.flush();
                this.numRecordsProcessed = 0;
                this.smallestOffset = Long.MAX_VALUE;
                this.largestOffset = Long.MIN_VALUE;
            }

            public void process(Record<Object, Object> record) {
                ++this.numRecordsProcessed;
                if (this.numRecordsProcessed % 100 == 0) {
                    System.out.printf("%s: %s%n", name, Instant.now());
                    System.out.println("processed " + this.numRecordsProcessed + " records from topic=" + topic);
                }
                this.context().recordMetadata().ifPresent(recordMetadata -> {
                    if (this.smallestOffset > recordMetadata.offset()) {
                        this.smallestOffset = recordMetadata.offset();
                    }
                    if (this.largestOffset < recordMetadata.offset()) {
                        this.largestOffset = recordMetadata.offset();
                    }
                });
            }

            public void close() {
                System.out.printf("Close processor for task %s%n", this.context().taskId());
                System.out.println("processed " + this.numRecordsProcessed + " records");
                long processed = this.largestOffset >= this.smallestOffset ? 1L + this.largestOffset - this.smallestOffset : 0L;
                System.out.println("offset " + this.smallestOffset + " to " + this.largestOffset + " -> processed " + processed);
                System.out.flush();
            }
        };
    }

    public static void sleep(long duration) {
        try {
            Thread.sleep(duration);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public static class Agg {
        KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
            return (key, value) -> new KeyValue((Object)(value == null ? null : Long.toString(value)), (Object)1L);
        }

        public Initializer<Long> init() {
            return () -> 0L;
        }

        Aggregator<String, Long, Long> adder() {
            return (aggKey, value, aggregate) -> aggregate + value;
        }

        Aggregator<String, Long, Long> remover() {
            return (aggKey, value, aggregate) -> aggregate - value;
        }
    }

    public static final class Unwindow<K, V>
    implements KeyValueMapper<Windowed<K>, V, K> {
        public K apply(Windowed<K> winKey, V value) {
            return (K)winKey.key();
        }
    }
}

