개념 요약

  • Cold(디폴트): 구독이 들어올 때 비로소 실행되며, 구독자마다 소스가 새로 실행됩니다. (= per-subscriber)
  • Hot: 구독과 무관하게 생산(또는 첫 구독 이후 공유)되며, 여러 구독자가 같은 생산을 공유합니다. 늦게 합류하면 앞부분을 놓칠 수 있습니다.

비교 표

항목 Cold Hot
실행 시점 구독 시 시작(lazy) 소스가 이미 진행 중이거나, 첫 구독 시 시작해서 공유
구독자별 데이터 각자 처음부터 재생 합류 시점 이후만 수신(기본)
중복 작업 구독 수만큼 반복 수행 가능 1회 수행을 여러 구독자와 공유
실패/완료 전파 각 구독자별로 독립 공유 스트림이면 전 구독자에 전파
대표 소스 just, range, fromIterable, defer, fromCallable Sinks, publish/share, 외부 push(Kafka/WebSocket), interval(공유화 시)
재생(리플레이) 기본 전체 재생(새 실행) 기본 없음 → replay(n)/cache(n) 필요
사용 예 요청-응답, DB/API 호출 실시간 브로드캐스트, 알림/시그널, 자원 공유 최적화

최소 예제

Cold (디폴트)

Flux<Integer> cold = Flux.range(1, 3).delayElements(Duration.ofMillis(200));

cold.subscribe(v -> System.out.println("A: " + v)); // 1,2,3
Thread.sleep(250);
cold.subscribe(v -> System.out.println("B: " + v)); // 1,2,3 (A와 독립 실행)

Hot — publish()/autoConnect()

Flux<Long> hot = Flux.interval(Duration.ofMillis(200))
        .take(6)
        .publish()
        .autoConnect(1); // 첫 구독 발생 시 시작

hot.subscribe(v -> System.out.println("A: " + v)); // A는 0,1,2,3,4,5 모두 받음(시점에 따라)
Thread.sleep(500);                                  // 0.0, 0.2, 0.4 즈음까지 진행
hot.subscribe(v -> System.out.println("B: " + v)); // B는 0.6 이후 값만 받게 됨

Thread.sleep(1200); // <-- 남은 0.6,0.8,1.0s 방출을 기다려줌

Hot — Sinks (실시간 push)

참고자료, 전송 중 연결 제어 (흐름 제어 + 혼잡 제어)

흐름 제어 (Flow Control)

  • 수신 측의 처리 속도에 맞춰 송신 속도를 조절하는 기능.
  • 윈도우 크기(Window size) 를 사용:

  • 수신 버퍼가 가득 차면 윈도우 크기를 줄여서 송신 측이 전송을 멈추도록 함.
  • 즉, “내가 지금 받을 수 있는 양은 이 정도야” 하고 알려줌 → 송신 측이 조절.

혼잡 제어 (Congestion Control)

  • 네트워크(중간 경로)가 혼잡하지 않도록 송신량을 조절하는 기능.
  • 대표적인 알고리즘:

  • Slow Start: 처음엔 전송량을 조금씩 늘리며 테스트.
  • Congestion Avoidance: 혼잡 신호가 없으면 조금씩 전송량 증가.
  • Fast Retransmit / Fast Recovery: 패킷 손실 시 빠르게 재전송하고, 윈도우 크기 줄여서 회복.

개념적으로 아주 비슷해요. 다만:

  • TCP: 커널/네트워크 레벨에서 수신 윈도우를 광고해 송신이 조절됨
  • Reactor: 사용자 공간에서 request(n) 로 하류(consumer) → 상류(producer)로 수요가 전파됨

즉, “속도는 소비자가 정한다”는 점이 동일합니다.


1) Sinks 기반: 안전한 핫 스트림 + backpressure 정책

Sinks.many()는 핫 퍼블리셔를 만들 때 표준이에요. 핵심은 어떤 backpressure 정책을 쓸지 선택하는 것.

1-1. Multicast + 버퍼 (여러 구독자, 늦게 온 데이터는 공유, 버퍼 한도)

Sinks.Many<String> sink =
    Sinks.many().multicast().onBackpressureBuffer(
        1024,                              // 버퍼 한도
        dropped -> log.warn("dropped: {}", dropped), // overflow hook
        Sinks.EmitFailureHandler.FAIL_FAST // 빠르게 실패(호출측에서 재시도/백오프)
    );

Flux<String> hot = sink.asFlux();

hot
  .onBackpressureBuffer(256) // 하류가 느릴 때 추가 버퍼(선택)
  .publishOn(Schedulers.boundedElastic()) // 소비 스케줄러 (예시)
  .subscribe(this::handleSlowConsumer);
  • 유사 TCP: 수신 윈도우가 작을 때 대기열에 쌓아두는 것과 비슷
  • 한도를 명시해 메모리 폭주를 방지하고, overflow 시 드롭/실패/후속 조치를 결정

1-2. Multicast + Drop/Latest (손실 허용 실시간 방송형)

Sinks.Many<byte[]> sink = Sinks.many().multicast().onBackpressureBuffer();

Flux<byte[]> hotLossy = sink.asFlux()
    .onBackpressureDrop(b -> metrics.incrementDropped())   // 또는
    //.onBackpressureLatest()
    ;

hotLossy.subscribe(this::renderFrame);
  • 프레임/시세처럼 최신만 중요한 경우 TCP의 혼잡 시 드롭과 유사한 전략

1-3. Unicast + 버퍼 (단일 소비자, 강한 backpressure 전파)

Sinks.Many<Event> uni = Sinks.many().unicast().onBackpressureBuffer();
Flux<Event> pipe = uni.asFlux(); // 단일 구독자만 허용

pipe.subscribe(this::consumeSafely); // 소비 속도가 곧 전체 속도 = 강한 제어
  • 생산자가 여러 명이어도 소비자는 1명 → “한 줄로” 흐름 제어하기 좋은 구조

2) Flux.create / Flux.push: request(n) 기반 Pull/Push 조절

외부 소스(파일, TCP, SDK 콜백 등)를 붙일 땐, 요청량을 보고 생산량을 조절해야 진짜 “TCP 같은” 컨트롤이 됩니다.

2-1. Flux.create + onRequest: 요청량만큼만 생산 (정석)

Flux<byte[]> stream = Flux.create(sink -> {
    AtomicBoolean cancelled = new AtomicBoolean();

    // downstream가 얼만큼 가져갈지 알려줄 때만 생산
    sink.onRequest(n -> {
        for (long i = 0; i < n && !cancelled.get(); i++) {
            byte[] chunk = readNextChunk(); // 블로킹이면 별도 스케줄러 사용
            if (chunk == null) {
                sink.complete();
                return;
            }
            sink.next(chunk);
        }
    });

    sink.onCancel(() -> cancelled.set(true));
}, FluxSink.OverflowStrategy.ERROR); // 요청 없이 next()하면 에러
  • 핵심: onRequest(n)에서만 next()명시적 흐름 제어
  • TCP에서 수신 윈도우만큼만 전송하는 것과 1:1 대응

2-2. Flux.push: 단일 생산자, 콜백 빠른 전달(간단)

Flux<Event> events = Flux.push(sink -> {
    listener.onEvent(e -> {
        if (!sink.isCancelled()) sink.next(e);
    });
    listener.onClose(sink::complete);
});
  • 단순하지만 요청량 체크는 직접 해야 함. 과잉 푸시를 막으려면 sink.requestedFromDownstream() 확인/슬립/드롭 로직 추가 필요.

3) Reactor Netty(TCP)와 backpressure

Reactor Netty는 TCP 소켓을 Reactive Streams와 브릿지합니다. 즉, outbound/inbound요청량에 맞춰 전송/수신을 조절해 줍니다.

3-1. 서버에서 “받기” (수신 측 backpressure)

TcpServer.create()
    .handle((in, out) -> {
        // in.receive(): Flux<ByteBuf> — 하류 요청량에 맞춰서 upstream read 조절
        Flux<String> lines = in.receive()
            .asString()
            .windowUntil(s -> s.endsWith("\n")) // 예: 라인 단위 처리
            .concatMap(w -> w.reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString));

        return out.sendString(lines.map(this::processLine)); // 처리 후 응답
    })
    .bindNow()
    .onDispose()
    .block();
  • in.receive()는 다운스트림이 요청할 때만 채널 read를 진행 → 자연스러운 backpressure
  • Netty의 auto-read/채널 파이프라인과 맞물려 폭주를 방지

3-2. 서버에서 “보내기” (송신 측 backpressure)

Flux<ByteBuf> payload = source
    .map(bytes -> Unpooled.wrappedBuffer(bytes))
    .onBackpressureBuffer(512, buf -> ReferenceCountUtil.release(buf)); // 누수 방지

TcpServer.create()
    .handle((in, out) -> out.send(payload)) // out.send가 downstream(wire) 요청에 맞춰 pull
    .bindNow();
  • out.send(flux)는 네트워크가 보낼 수 있을 때만 pull → 과도한 버퍼링 방지
  • 버퍼 한도/해제(ReferenceCountUtil.release) 등 리소스 관리가 중요

4) 어떤 전략을 언제 쓰나

상황 추천
콜백/센서/SDK 이벤트를 핫 스트림으로 Sinks.many().multicast().onBackpressureBuffer(limit, …)
단일 소비자에게만 강한 backpressure Sinks.many().unicast().onBackpressureBuffer()
요청량만큼 생산(진짜 TCP 같은 흐름) Flux.create + onRequest(n)
최신만 유지(드롭 허용) .onBackpressureDrop() 또는 .onBackpressureLatest()
최근 N개 재생 replay().limit(n) 또는 cache(n)
Reactor Netty TCP in.receive()/out.send(flux) 조합 (자연 backpressure)

5) 실패/혼잡 시의 운영 팁

  • 버퍼 한도는 필수: 무제한 버퍼는 언젠가 OOM.
  • Overflow 처리: 드롭/실패/백오프/재시도 중 정책을 정하고 메트릭 발행.
  • 모니터링: requestedFromDownstream(), 드롭 카운트, 대기열 길이, 처리 지연을 기록.
  • Blocking 격리: 블로킹 소스는 boundedElastic 등에서 감싸고 create.onRequest로 “요청량만큼 pull”.

Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = sink.asFlux();

hotFlux.subscribe(v -> System.out.println("A: " + v));
sink.tryEmitNext("x1"); sink.tryEmitNext("x2");

hotFlux.subscribe(v -> System.out.println("B: " + v)); // x1,x2 놓침
sink.tryEmitNext("x3");

Hot + Replay (최근 N 또는 전체 재생)

Flux<Long> replayAll = Flux.interval(Duration.ofMillis(150))
    .take(5)
    .cache();     // 완료 후 재구독자에게 0..4 모두 재생

Flux<Long> replay2 = Flux.interval(Duration.ofMillis(150))
    .take(5)
    .cache(2);    // 최근 2개만 재생

언제 무엇을 쓰나

  • Cold(기본)

    • 각 요청/구독마다 독립 실행이 맞는 상황: DB 쿼리, 외부 API 호출, 파일 읽기 등.
    • “매번 신선한” 데이터를 원하거나 구독자 간 간섭을 원치 않을 때.
  • Hot

    1. 실시간 브로드캐스트: 시세, 센서, 알림, 채팅, 서버 이벤트(SSE), WebSocket.
    2. 자원 공유 최적화: 동일 계산/호출을 구독자마다 반복하고 싶지 않을 때(캐시/공유).
    3. 외부 Push: Kafka, RabbitMQ, Redis Pub/Sub 등에서 들어오는 스트림을 여러 소비자에 공유.

변환/패턴

Cold → Hot (공유)

  • publish().autoConnect(n): n개의 구독이 모이면 시작(공유).
  • share(): 간편 공유(내부적으로 refCount 계열).
  • publish().refCount(n): n개 구독 유지, 0명이면 upstream cancel.
  • replay(n) / cache(n): 핫 공유 + 최근 N개 재생. 완료 후 전체 재생은 cache().
Flux<Data> shared = heavyColdSource()    // DB/API 등
    .publish()
    .refCount(1);                        // 최소 1명 있을 때만 실행/공유

Hot에 “과거 재생” 붙이기

  • 최근만 필요: replay(n)/cache(n)
  • TTL 캐싱: Mono<T>.cache(Duration ttl) / Flux<T>.cache(int history, Duration ttl)
Mono<User> memoized = loadUserOnce(userId)
    .cache(Duration.ofMinutes(5)); // 5분 메모이즈(여러 구독 공유, TTL 이후 갱신)

외부 소스 → Hot

  • Sinks.many().multicast() : 최신만 브로드캐스트(이전 미제공).
  • Sinks.many().replay().limit(n) : 최근 n개 리플레이.
  • Sinks.many().unicast() : 단일 소비자 패턴.

주의사항 & 베스트 프랙티스

  1. 디폴트는 Cold 무심코 share()/publish()를 붙여 Hot으로 바꾸면, 다운스트림 구독/해제 타이밍에 따라 데이터 손실/중복이 생길 수 있어요. 의도적일 때만.

  2. 리소스 생명주기 Hot 스트림은 언제 시작/중단할지(접속 수, 애플리케이션 라이프사이클)를 결정해야 합니다.

    • 시작: autoConnect(n) / 수동 connect()
    • 중단: refCount(n)로 무구독 시 upstream cancel, 또는 명시적 dispose.
  3. 백프레셔

    • Sinks 사용 시 onBackpressureBuffer/onBackpressureDrop 등 정책을 명확히.
    • 무한 버퍼는 메모리 리스크. 경계/한도/드롭 로깅 설계 필수.
  4. 오류 전파 공유 Hot 스트림이 에러로 종료되면, 모든 구독자가 함께 종료됩니다. 재시도/복구는 상위에서 정책화(retryWhen, 재구성)하세요.

  5. 테스트 가독성 Cold는 재구독이 새 실행이라 테스트가 안정적. Hot은 시점 의존이 커서 StepVerifier에 가상 시간(withVirtualTime)이나 replay/cache를 혼합하면 읽기 쉬워집니다.

WebFlux 컨트롤러 예시 (SSE)

Cold SSE: 각 클라이언트가 1..5를 처음부터 받음

@GetMapping(value="/sse/cold", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> coldSse() {
  return Flux.range(1, 5).delayElements(Duration.ofMillis(300));
}

Hot SSE: 서버 전역 스트림 공유(늦게 합류 시 이전 놓침)

private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
private final Flux<String> hotFlux = sink.asFlux();

@GetMapping(value="/sse/hot", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> hotSse() {
  return hotFlux; // 모든 클라이언트가 같은 실시간 소스 공유
}

// 예시: 1초마다 서버가 이벤트 푸시
@PostConstruct
void startTicks() {
  Flux.interval(Duration.ofSeconds(1))
      .map(i -> "tick-" + i)
      .subscribe(v -> sink.tryEmitNext(v));
}