Search
Duplicate
📒

[Spring MSA] xx-3. Kafka 3

상태
미진행
수업
Spring MSA
주제
기본개념
4 more properties
참고

실습 프로젝트

NOTE
@Configuration @EnableKafka @EnableKafkaStreams public class KafkaConfiguration { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kStreamsConfigs() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // flush interval . default 30000 return new KafkaStreamsConfiguration(props); } }
Java
복사
@Component public class StreamListener { @Bean public KStream<String, String> kStream(StreamsBuilder builder) { final String inputTopic = "checkout.complete.v1"; final String outputTopic = "checkout.productId.aggregated.v1"; KStream<String, String> inputStream = builder.stream(inputTopic); inputStream .map((k, v) -> new KeyValue<>(JsonUtils.getProductId(v), JsonUtils.getAmount(v))) // Group by productId .groupByKey(Grouped.with(Serdes.Long(), Serdes.Long())) // Window 설정 .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // Apply sum method .reduce(Long::sum) // map the window key .toStream((key, value) -> key.key()) // outputTopic 에 보낼 Json String 으로 Generate .mapValues(JsonUtils::getSendingJson) // outputTopic 으로 보낼 key 값을 null 설정 .selectKey((key, value) -> null) // outputTopic 으로 메세지(null, jsonString) 전송 설정 .to(outputTopic, Produced.with(null, Serdes.String())); return inputStream; } }
Java
복사