Reactor 실전 패턴 2
개념 요약
- Cold(디폴트): 구독이 들어올 때 비로소 실행되며, 구독자마다 소스가 새로 실행됩니다. (= per-subscriber)
- Hot: 구독과 무관하게 생산(또는 첫 구독 이후 공유)되며, 여러 구독자가 같은 생산을 공유합니다. 늦게 합류하면 앞부분을 놓칠 수 있습니다.
비교 표
| 항목 | Cold | Hot |
|---|---|---|
| 실행 시점 | 구독 시 시작(lazy) | 소스가 이미 진행 중이거나, 첫 구독 시 시작해서 공유 |
| 구독자별 데이터 | 각자 처음부터 재생 | 합류 시점 이후만 수신(기본) |
| 중복 작업 | 구독 수만큼 반복 수행 가능 | 1회 수행을 여러 구독자와 공유 |
| 실패/완료 전파 | 각 구독자별로 독립 | 공유 스트림이면 전 구독자에 전파 |
| 대표 소스 | just, range, fromIterable, defer, fromCallable 등 |
Sinks, publish/share, 외부 push(Kafka/WebSocket), interval(공유화 시) |
| 재생(리플레이) | 기본 전체 재생(새 실행) | 기본 없음 → replay(n)/cache(n) 필요 |
| 사용 예 | 요청-응답, DB/API 호출 | 실시간 브로드캐스트, 알림/시그널, 자원 공유 최적화 |
최소 예제
Cold (디폴트)
Flux<Integer> cold = Flux.range(1, 3).delayElements(Duration.ofMillis(200));
cold.subscribe(v -> System.out.println("A: " + v)); // 1,2,3
Thread.sleep(250);
cold.subscribe(v -> System.out.println("B: " + v)); // 1,2,3 (A와 독립 실행)
Hot — publish()/autoConnect()
Flux<Long> hot = Flux.interval(Duration.ofMillis(200))
.take(6)
.publish()
.autoConnect(1); // 첫 구독 발생 시 시작
hot.subscribe(v -> System.out.println("A: " + v)); // A는 0,1,2,3,4,5 모두 받음(시점에 따라)
Thread.sleep(500); // 0.0, 0.2, 0.4 즈음까지 진행
hot.subscribe(v -> System.out.println("B: " + v)); // B는 0.6 이후 값만 받게 됨
Thread.sleep(1200); // <-- 남은 0.6,0.8,1.0s 방출을 기다려줌
Hot — Sinks (실시간 push)
참고자료, 전송 중 연결 제어 (흐름 제어 + 혼잡 제어)
흐름 제어 (Flow Control)
- 수신 측의 처리 속도에 맞춰 송신 속도를 조절하는 기능.
윈도우 크기(Window size) 를 사용:
- 수신 버퍼가 가득 차면 윈도우 크기를 줄여서 송신 측이 전송을 멈추도록 함.
- 즉, “내가 지금 받을 수 있는 양은 이 정도야” 하고 알려줌 → 송신 측이 조절.
혼잡 제어 (Congestion Control)
- 네트워크(중간 경로)가 혼잡하지 않도록 송신량을 조절하는 기능.
대표적인 알고리즘:
- Slow Start: 처음엔 전송량을 조금씩 늘리며 테스트.
- Congestion Avoidance: 혼잡 신호가 없으면 조금씩 전송량 증가.
- Fast Retransmit / Fast Recovery: 패킷 손실 시 빠르게 재전송하고, 윈도우 크기 줄여서 회복.
개념적으로 아주 비슷해요. 다만:
- TCP: 커널/네트워크 레벨에서 수신 윈도우를 광고해 송신이 조절됨
- Reactor: 사용자 공간에서 request(n) 로 하류(consumer) → 상류(producer)로 수요가 전파됨
즉, “속도는 소비자가 정한다”는 점이 동일합니다.
1) Sinks 기반: 안전한 핫 스트림 + backpressure 정책
Sinks.many()는 핫 퍼블리셔를 만들 때 표준이에요. 핵심은 어떤 backpressure 정책을 쓸지 선택하는 것.
1-1. Multicast + 버퍼 (여러 구독자, 늦게 온 데이터는 공유, 버퍼 한도)
Sinks.Many<String> sink =
Sinks.many().multicast().onBackpressureBuffer(
1024, // 버퍼 한도
dropped -> log.warn("dropped: {}", dropped), // overflow hook
Sinks.EmitFailureHandler.FAIL_FAST // 빠르게 실패(호출측에서 재시도/백오프)
);
Flux<String> hot = sink.asFlux();
hot
.onBackpressureBuffer(256) // 하류가 느릴 때 추가 버퍼(선택)
.publishOn(Schedulers.boundedElastic()) // 소비 스케줄러 (예시)
.subscribe(this::handleSlowConsumer);
- 유사 TCP: 수신 윈도우가 작을 때 대기열에 쌓아두는 것과 비슷
- 한도를 명시해 메모리 폭주를 방지하고, overflow 시 드롭/실패/후속 조치를 결정
1-2. Multicast + Drop/Latest (손실 허용 실시간 방송형)
Sinks.Many<byte[]> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<byte[]> hotLossy = sink.asFlux()
.onBackpressureDrop(b -> metrics.incrementDropped()) // 또는
//.onBackpressureLatest()
;
hotLossy.subscribe(this::renderFrame);
- 프레임/시세처럼 최신만 중요한 경우 TCP의 혼잡 시 드롭과 유사한 전략
1-3. Unicast + 버퍼 (단일 소비자, 강한 backpressure 전파)
Sinks.Many<Event> uni = Sinks.many().unicast().onBackpressureBuffer();
Flux<Event> pipe = uni.asFlux(); // 단일 구독자만 허용
pipe.subscribe(this::consumeSafely); // 소비 속도가 곧 전체 속도 = 강한 제어
- 생산자가 여러 명이어도 소비자는 1명 → “한 줄로” 흐름 제어하기 좋은 구조
2) Flux.create / Flux.push: request(n) 기반 Pull/Push 조절
외부 소스(파일, TCP, SDK 콜백 등)를 붙일 땐, 요청량을 보고 생산량을 조절해야 진짜 “TCP 같은” 컨트롤이 됩니다.
2-1. Flux.create + onRequest: 요청량만큼만 생산 (정석)
Flux<byte[]> stream = Flux.create(sink -> {
AtomicBoolean cancelled = new AtomicBoolean();
// downstream가 얼만큼 가져갈지 알려줄 때만 생산
sink.onRequest(n -> {
for (long i = 0; i < n && !cancelled.get(); i++) {
byte[] chunk = readNextChunk(); // 블로킹이면 별도 스케줄러 사용
if (chunk == null) {
sink.complete();
return;
}
sink.next(chunk);
}
});
sink.onCancel(() -> cancelled.set(true));
}, FluxSink.OverflowStrategy.ERROR); // 요청 없이 next()하면 에러
- 핵심:
onRequest(n)에서만next()→ 명시적 흐름 제어 - TCP에서 수신 윈도우만큼만 전송하는 것과 1:1 대응
2-2. Flux.push: 단일 생산자, 콜백 빠른 전달(간단)
Flux<Event> events = Flux.push(sink -> {
listener.onEvent(e -> {
if (!sink.isCancelled()) sink.next(e);
});
listener.onClose(sink::complete);
});
- 단순하지만 요청량 체크는 직접 해야 함. 과잉 푸시를 막으려면
sink.requestedFromDownstream()확인/슬립/드롭 로직 추가 필요.
3) Reactor Netty(TCP)와 backpressure
Reactor Netty는 TCP 소켓을 Reactive Streams와 브릿지합니다.
즉, outbound/inbound가 요청량에 맞춰 전송/수신을 조절해 줍니다.
3-1. 서버에서 “받기” (수신 측 backpressure)
TcpServer.create()
.handle((in, out) -> {
// in.receive(): Flux<ByteBuf> — 하류 요청량에 맞춰서 upstream read 조절
Flux<String> lines = in.receive()
.asString()
.windowUntil(s -> s.endsWith("\n")) // 예: 라인 단위 처리
.concatMap(w -> w.reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString));
return out.sendString(lines.map(this::processLine)); // 처리 후 응답
})
.bindNow()
.onDispose()
.block();
in.receive()는 다운스트림이 요청할 때만 채널 read를 진행 → 자연스러운 backpressure- Netty의 auto-read/채널 파이프라인과 맞물려 폭주를 방지
3-2. 서버에서 “보내기” (송신 측 backpressure)
Flux<ByteBuf> payload = source
.map(bytes -> Unpooled.wrappedBuffer(bytes))
.onBackpressureBuffer(512, buf -> ReferenceCountUtil.release(buf)); // 누수 방지
TcpServer.create()
.handle((in, out) -> out.send(payload)) // out.send가 downstream(wire) 요청에 맞춰 pull
.bindNow();
out.send(flux)는 네트워크가 보낼 수 있을 때만 pull → 과도한 버퍼링 방지- 버퍼 한도/해제(ReferenceCountUtil.release) 등 리소스 관리가 중요
4) 어떤 전략을 언제 쓰나
| 상황 | 추천 |
|---|---|
| 콜백/센서/SDK 이벤트를 핫 스트림으로 | Sinks.many().multicast().onBackpressureBuffer(limit, …) |
| 단일 소비자에게만 강한 backpressure | Sinks.many().unicast().onBackpressureBuffer() |
| 요청량만큼 생산(진짜 TCP 같은 흐름) | Flux.create + onRequest(n) |
| 최신만 유지(드롭 허용) | .onBackpressureDrop() 또는 .onBackpressureLatest() |
| 최근 N개 재생 | replay().limit(n) 또는 cache(n) |
| Reactor Netty TCP | in.receive()/out.send(flux) 조합 (자연 backpressure) |
5) 실패/혼잡 시의 운영 팁
- 버퍼 한도는 필수: 무제한 버퍼는 언젠가 OOM.
- Overflow 처리: 드롭/실패/백오프/재시도 중 정책을 정하고 메트릭 발행.
- 모니터링:
requestedFromDownstream(), 드롭 카운트, 대기열 길이, 처리 지연을 기록. - Blocking 격리: 블로킹 소스는
boundedElastic등에서 감싸고create.onRequest로 “요청량만큼 pull”.
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = sink.asFlux();
hotFlux.subscribe(v -> System.out.println("A: " + v));
sink.tryEmitNext("x1"); sink.tryEmitNext("x2");
hotFlux.subscribe(v -> System.out.println("B: " + v)); // x1,x2 놓침
sink.tryEmitNext("x3");
Hot + Replay (최근 N 또는 전체 재생)
Flux<Long> replayAll = Flux.interval(Duration.ofMillis(150))
.take(5)
.cache(); // 완료 후 재구독자에게 0..4 모두 재생
Flux<Long> replay2 = Flux.interval(Duration.ofMillis(150))
.take(5)
.cache(2); // 최근 2개만 재생
언제 무엇을 쓰나
-
Cold(기본)
- 각 요청/구독마다 독립 실행이 맞는 상황: DB 쿼리, 외부 API 호출, 파일 읽기 등.
- “매번 신선한” 데이터를 원하거나 구독자 간 간섭을 원치 않을 때.
-
Hot
- 실시간 브로드캐스트: 시세, 센서, 알림, 채팅, 서버 이벤트(SSE), WebSocket.
- 자원 공유 최적화: 동일 계산/호출을 구독자마다 반복하고 싶지 않을 때(캐시/공유).
- 외부 Push: Kafka, RabbitMQ, Redis Pub/Sub 등에서 들어오는 스트림을 여러 소비자에 공유.
변환/패턴
Cold → Hot (공유)
publish().autoConnect(n): n개의 구독이 모이면 시작(공유).share(): 간편 공유(내부적으로 refCount 계열).publish().refCount(n): n개 구독 유지, 0명이면 upstream cancel.replay(n)/cache(n): 핫 공유 + 최근 N개 재생. 완료 후 전체 재생은cache().
Flux<Data> shared = heavyColdSource() // DB/API 등
.publish()
.refCount(1); // 최소 1명 있을 때만 실행/공유
Hot에 “과거 재생” 붙이기
- 최근만 필요:
replay(n)/cache(n) - TTL 캐싱:
Mono<T>.cache(Duration ttl)/Flux<T>.cache(int history, Duration ttl)
Mono<User> memoized = loadUserOnce(userId)
.cache(Duration.ofMinutes(5)); // 5분 메모이즈(여러 구독 공유, TTL 이후 갱신)
외부 소스 → Hot
Sinks.many().multicast(): 최신만 브로드캐스트(이전 미제공).Sinks.many().replay().limit(n): 최근 n개 리플레이.Sinks.many().unicast(): 단일 소비자 패턴.
주의사항 & 베스트 프랙티스
-
디폴트는 Cold 무심코
share()/publish()를 붙여 Hot으로 바꾸면, 다운스트림 구독/해제 타이밍에 따라 데이터 손실/중복이 생길 수 있어요. 의도적일 때만. -
리소스 생명주기 Hot 스트림은 언제 시작/중단할지(접속 수, 애플리케이션 라이프사이클)를 결정해야 합니다.
- 시작:
autoConnect(n)/ 수동connect() - 중단:
refCount(n)로 무구독 시 upstream cancel, 또는 명시적 dispose.
- 시작:
-
백프레셔
Sinks사용 시onBackpressureBuffer/onBackpressureDrop등 정책을 명확히.- 무한 버퍼는 메모리 리스크. 경계/한도/드롭 로깅 설계 필수.
-
오류 전파 공유 Hot 스트림이 에러로 종료되면, 모든 구독자가 함께 종료됩니다. 재시도/복구는 상위에서 정책화(
retryWhen, 재구성)하세요. -
테스트 가독성 Cold는 재구독이 새 실행이라 테스트가 안정적. Hot은 시점 의존이 커서 StepVerifier에 가상 시간(
withVirtualTime)이나 replay/cache를 혼합하면 읽기 쉬워집니다.
WebFlux 컨트롤러 예시 (SSE)
Cold SSE: 각 클라이언트가 1..5를 처음부터 받음
@GetMapping(value="/sse/cold", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> coldSse() {
return Flux.range(1, 5).delayElements(Duration.ofMillis(300));
}
Hot SSE: 서버 전역 스트림 공유(늦게 합류 시 이전 놓침)
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
private final Flux<String> hotFlux = sink.asFlux();
@GetMapping(value="/sse/hot", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> hotSse() {
return hotFlux; // 모든 클라이언트가 같은 실시간 소스 공유
}
// 예시: 1초마다 서버가 이벤트 푸시
@PostConstruct
void startTicks() {
Flux.interval(Duration.ofSeconds(1))
.map(i -> "tick-" + i)
.subscribe(v -> sink.tryEmitNext(v));
}