Combine 탐구 시리즈
1. Publisher와 Subscriber 그리고 Subscription(with OpenCombine)
2. Cancellable 탐구 (with OpenCombine)
오늘은 OpenCombine을 기반으로 Combine의 Backpressure 전략에 대해 알아보도록 하겠습니다!!
추가로 RxSwift의 Backpressure와의 차이점에 대해서도 살펴보겠습니다.
Backpressure란?
Backpressure라는 단어 자체를 처음 들어본 분들도 꽤 있을 것 같습니다.
저 또한 익숙한 단어는 아니었는데 사전적 의미는 다음과 같습니다.
"기체를 배출하는 쪽의 압력"이라고 용어 사전에 적혀있습니다.
배압 또는 역압이라고도 하며 흐름 방향의 반대 압력을 막연하게 배압(backpressure)라고 하는 경우도 있다고 합니다.
무엇인지 감이 오시나요?ㅎㅎ
갑자기 기체, 흐름, 압력 이런 이야기가 나와서 물리 시간이 된 것 같지만 사실 Combine에서도 유사한 개념이 사용됩니다.
그 이유는 스트림(Stream)이 Combine과 Reactive Programming의 핵심 개념이기 때문입니다.
반응형 프로그래밍(Reactive Programming)을 하다 보면 Stream과 Pipeline에 대한 이야기를 계속 하게 됩니다.
반응형 프로그래밍 자체가 데이터를 스트림으로 만들어 주고 받는 선언적 프로그래밍 패러다임이기 때문입니다.
위 그림처럼 스트림을 생성하여 두 스트림을 선언적으로 연결해두면 시간의 흐름에 따라 생기는 데이터들이 스트림을 따라 전달되는 구조입니다.
Backpressure의 개념을 여기에 도입해보면 Downstream에서 Upstream으로 가해지는 압력이 Backpressure에 해당됩니다.
즉, 일반적인 상황에서의 Stream에서는 Upstream에서 Downstream으로 데이터가 흐르지만 반대로 Backpressure의 경우에는 역할이 뒤집혀 Downstream이 Upstream에 영향을 줄 수 있는 것입니다.
Backpressure 자체의 사전적 의미는 역압을 의미하지만 소프트웨어의 세계에서는 "Backpressure를 제어하거나 처리할 수 있는 능력이 있다"라는 뜻으로 사용되기도 합니다. 실제로 다른 자료들에서도 Combine에서 backpressure를 이야기할 때 이러한 제어 능력 자체를 의미하는 경우가 많았습니다. (참고)
따라서 이번 글에서도 두 가지 의미로 혼용되어 사용될 수 있음을 유의하며 읽어주세요!
자 이제 그럼 왜 Backpressure가 중요한지 사례를 살펴봅시다.
Downstream은 Upstream으로부터 데이터를 받아서 다음과 같은 작업들을 처리한다고 상황을 가정해보겠습니다.
- 대용량 파일의 전송
- 복잡한 UI의 렌더링
- 사용자 입력 대기
공통적으로 리소스가 크게 소요되는 것들입니다.
이러한 상황에서 Upstream은 매우 짧은 주기로 Downstream에게 데이터를 무자비하게 전달한다면 어떻게 될까요?
여기서 Backpressure의 개념이 등장합니다.
Combine에서 Backpressure의 압력(저항)은 Publisher가 내보내는 값을 Subscriber가 처리하는 데 필요한 시간으로 생각할 수 있습니다.
이러한 처리 시간이 커지면 Publisher(Upstream)가 공급하는 양이 Subscriber(=Downstream)가 소비하는 양보다 훨씬 커지게 되고 오버플로우와 같은 문제가 생길 수 있습니다. (Fast producer와 Slow Consumer 관계)
즉, 공급량을 제어할 필요가 생깁니다.
Combine에서는 이러한 공급량 제어를 Demand라는 개념을 활용하여 컨트롤합니다.
➡️ Combine의 backpressure 전략
이전 글에서 Demand에 대해 잠깐 소개했지만 해당 개념에 대해 다시 정리해보겠습니다!
Demand 란?
정확히는 Subscribers.Demand 입니다.
Demand는 구독을 통해 구독자로부터 퍼블리셔에게 요청된 항목의 수입니다.
즉, 구독자가 퍼블리셔에게 요청한 값 요청 횟수를 의미합니다.
이 부분이 Combine의 특징인데 구독자가 생산자(Publisher)에게 직접 자신이 몇 개의 값을 받고 싶은지 역으로 지정할 수 있다는 독특한 점입니다.
코드로 살펴보겠습니다.
final class CustomSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("값 수신", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("종료 이벤트", completion)
}
}
Int를 수신하는 간단한 Custom Subscriber를 생성했습니다.
역시 Subscriber 프로토콜을 채택하여 3개의 receive 함수를 구현해야 합니다.
여기서 첫 번째 receive 함수인 receive(subscription:)을 보면 subscription에게 2개의 Demand를 request하는 것을 확인할 수 있습니다.
이제 이 Subscriber를 활용하여 구독을 발생시키겠습니다.
let publisher = PassthroughSubject<Int, Never>()
let subscriber = CustomSubscriber()
publisher.subscribe(subscriber)
publisher.send(1)
publisher.send(2)
publisher.send(3)
publisher.send(4)
publisher.send(5)
publisher.send(6)
PassthroughSubject를 활용해 publisher를 만들고 앞서 만든 CustomSubscriber를 구독시켰습니다.
그리고 1부터 6까지 6번 send를 하면 출력이 어떻게 될까요?
// 출력
값 수신 1
값 수신 2
이렇게 총 2번만 값을 수신하게 됩니다.
앞서 Subscriber의 receive(subscription:) 메서드에서 구독자를 초기 Demand를 2로 설정했기 때문입니다.
그럼 이 구독자는 평생 2번만 값을 수신할 수 있는걸까요?
그렇지는 않습니다.
Combine에서는 동적으로 Demand를 조정할 수 있는 기능을 제공합니다.
Subscriber 프로토콜을 채택하게 되면 receive(input:) 함수 또한 구현해야 하는데 이 함수의 리턴 타입이 바로 Demand입니다.
즉, 구독자가 값을 수신할 때마다 새롭게 자신이 받고 싶은 횟수를 업데이트할 수 있는 것입니다.
Subscriber의 receive(input:) 함수를 다음과 같이 수정해 보겠습니다.
func receive(_ input: Int) -> Subscribers.Demand {
print("값 수신", input)
switch input {
case 1:
return .max(2)
case 3:
return .max(1)
default:
return .none
}
}
1을 수신하게 되면 Demand를 2 증가시키고 3을 수신하게 되면 1을 증가시키는 코드입니다.
다른 값들이 왔을 때는 Demand에 변화를 주지 않습니다.
"Demand를 증가시킨다"라고 표현했는데 여기서 구현한 Demand는 합 연산으로 적용되기 때문입니다.
즉, 만약 2번 더 값을 수신할 기회가 있었는데 Demand를 .max(3)로 리턴하면 2+3 = 5가 되어 값 수신의 기회가 총 5회가 됩니다.
실행시켜보면
// 출력
값 수신 1
값 수신 2
값 수신 3
값 수신 4
값 수신 5
Demand가 동적으로 증가함에 따라 총 5개의 값을 수신한 것을 확인할 수 있습니다.
이렇게 구독자가 직접 요청한 만큼 값을 전달하는 매커니즘을 Pull-based Backpressure라고 합니다. (소비자가 생산자를 제어)
반대되는 개념으로 생산자가 데이터를 제어하고 available 할 때 소비자에게 전달하는 방식을 Push-based Backpressure라고 합니다.
여기까지 중간 정리를 해보겠습니다.
- 구독자가 퍼블리셔가 보내준 값을 처리하는 시간이 긴 경우가 있을 수 있다.
- 이 때 퍼블리셔가 짧은 주기로 값을 구독자에게 보내는 경우 오버플로우나 값 손실 등의 문제가 발생 할 수 있다.
- 이러한 문제를 Backpressure라고 한다.
- Backpressure 라는 단어는 그 자체로 Backpressure 문제를 제어할 수 있는 능력을 의미하기도 한다.
- Combine에서는 구독자가 직접 자신이 값을 수신할 횟수를 지정할 수 있도록 하는 Backpressure 전략을 사용한다.
- 이것을 Pull-based Backpressure라고 한다.
앞서 CustomSubscriber를 구현하여 Demand를 조절하는 과정을 살펴보았습니다.
그렇다면 Upstream에서는 어떻게 구독자의 Demand에 따라 값 전달을 결정할까요?
이 부분은 OpenCombine에서 코드 레벨로 확인해보겠습니다!
PassthroughSubject에서의 Demand
앞선 예제에서 사용한 PassthroughSubject가 자신의 구독자들에게 값을 전달할 때 Demand의 역할을 확인해보겠습니다.
PassthroughSubject의 구현부입니다.
Subject 프로토콜을 채택하고 있으며 이 Subject가 Publisher 프로토콜을 채택(상속)하고 있기 때문에 PassthroughSubject 또한 Publisher에 해당됩니다.
여기서 downstreams는 자신의 다운스트림들에 대한 정보를 담고 있는 자료구조입니다.
ConduitList라는 집합 기반의 자료구조이며 Subscription 구현체를 담게 됩니다.
이제 이 passthroughSubject에 send를 했을 때 어떤 일이 발생하는지 살펴 보겠습니다.
send 함수의 구현부입니다.
self.downstreams에 대해 forEach를 사용하여 모든 conduit에게 offer를 수행하고 있습니다.
여기서 conduit은 Subscription 구현체에 해당됩니다.
즉 downstream에 대한 정보를 가진 객체입니다.
downstream에게 데이터를 전달하는 구문인 것입니다!
이제 offer를 확인해보겠습니다.
PassthroughSubject의 Nested Type으로 구현된 Conduit 클래스의 메서드인 offer입니다.
앞서 학습했던 Demand Backpressure 전략이 수행되는 것을 확인할 수 있습니다!
순서대로 살펴 보면 다음과 같은 순서로 동작합니다.
1. 우선 lock을 걸고 Conduit(==Subscription)의 demand가 0보다 큰지 확인합니다.
2. 이제 이 Conduit이 가진 demand 기회를 1 줄이고 lock을 해제합니다.
3. downstream에게 receive(_ input:) 함수를 통해 값을 전달하고 리턴 값인 Demand를 newDemand 변수에 할당합니다.
4. newDemand가 0보다 크다면 다시 lock을 걸고 기존 demand에 더해줍니다. (합 연산!)
5. lock을 해제합니다.
예상했던대로 구현되어 있는 것을 확인할 수 있었습니다!!
Demand가 0보다 큰 다운스트림에만 값을 전달해주고 Subscriber가 새로 리턴한 Demand 값을 기존 demand에 더해주는 로직이었습니다. 이 과정에서 demand를 조절할 때 lock을 걸어 synchronize한 환경을 보장했습니다.
Combine에서의 다양한 Backpressure 전략
직접 확인해본 것처럼 Combine에서는 소비자(=구독자)가 생산자(=퍼블리셔)를 제어하여 값 수신 횟수를 조절하는 방식으로 과도하게 많은 생산물들이 소비자에게 전달되는 것을 방지하고 있습니다.
Combine에서 기본적으로 제공하는 Subscriber인 Sink와 Assign은 처음 구독 관계를 생성할 때 Demand를 .unlimted로 설정하기 때문에 값을 무제한으로 수신하게 됩니다.
Demand로 값 수신의 횟수를 구독자가 설정할 수 있지만 여기에는 약간의 빈틈이 있습니다.
방금 확인했던 것처럼 PassthroughSubject의 Conduit의 offer 메서드에서는 자신에게 값이 send되어 들어오면 이 값을 다운스트림들에게 전달하는데 이 때 demand가 0 이하인 다운스트림에는 값 자체를 전달하지 않게 됩니다.
처음 목표했던 것 처럼 구독자가 의도했던 것보다 많은 값을 수신하는 것 자체는 Demand 개념으로 막았지만 반대로 demand가 0 이하일 때는 구독자 입장에서는 값을 전혀 받지 못하는 일종의 손실이 발생하게 됩니다.
물론 이러한 문제를 해결하기 위해 Combine은 다양한 연산자를 지원하며 필요한 요구사항에 맞추어 개발자가 원하는 방식으로 구현하면 됩니다.
- 구독자가 처리할 수 있는 양보다 더 많은 값을 퍼블리셔가 보내지 못하도록 Demand 관리 (지금까지 공부한 방법)
- 새 값을 처리할 수 있을 때까지 Buffer 하기
- 즉시 처리할 수 없는 값은 Drop 하기
- 요구 사항에 맞추어 위의 방법들을 여러가지를 조합하여 사용하기
이러한 방식들과 연산자(Operator)에 대한 소개는 공식 문서를 참고해주세요!
RxSwift에서의 Backpressure
앞서 Combine은 Pull-based Backpressure를 사용한다고 했습니다.
그렇다면 Combine과 매우 유사한 비동기 처리 프레임워크인 RxSwift에서는 어떻게 처리하고 있는지 궁금해서 찾아봤습니다.
결론부터 말하면 RxSwift는 Push-based Backpressure를 사용합니다.
즉, 생산자가 데이터를 제어하고 available 할 때 소비자에게 전달하는 방식입니다.
Combine에서 사용하는 Demand과 같은 매커니즘이 없고 생산자는 별도의 제어 장치 없이 소비자에게 값을 전달하게 됩니다.
코드로 살펴보겠습니다.
Combine의 PassthroughSubject와 유사한 역할을 수행하는 RxSwift의 PublishSubject입니다.
OpenCombine의 PasshtourhSubject에서는 ConduitList라는 자료구조에 Downstream을 저장하고 있었지만 PublishSubject에서는 Observers라는 이름으로 typealias된 AnyObserver<Element>.s 에 Downstream을 담고 있습니다.
AnyObserver<Element>.s는 Bag이라는 이름의 구조체입니다.
Bag은 OpenCombine에서의 ConduitList과 유사한 역할을 하는 자료구조입니다.
Set 기반의 ConduitList와 달리 Dictionary 기반입니다.
다시 PublishSubject로 돌아와서 on(_ event: Event<Element>) 메서드를 살펴보겠습니다.
on(_ event: Event)은 Combine의 Subject에 있는 send(_ input:)과 유사한 역할을 합니다.
subject.on(.next(3))은 subject.send(3)과 같은 것입니다!
on 함수에서는 synchronzied_on(_ event:)를 호출합니다.
이 함수에서는 lock을 걸고 .next가 이벤트로 들어온 경우 self.observers를 리턴합니다. (자신이 가진 다운스트림들을 담은 자료구조를 리턴)
이제 여기서 리턴한 observers와 event를 파라미터로 넣어 dispatch 함수를 호출합니다.
dispatch 함수는 위와 같습니다.
Bag에 대해 반복문을 돌며 모든 Downstream에 event를 전달하고 있습니다.
즉, Combine과 달리 Downstream에는 어떠한 제어권이 없으며 Upstream이 값을 보내주는 대로 받아야 하는 구조인 것입니다.
이것은 RxSwift가 Push-based Backpressure임을 의미합니다.
RxSwift에서는 Demand 개념 대신 Backpressure를 제어하기 위해 몇가지 연산자를 제공합니다. (공식 문서)
- debounce
- throttle
- buffer
- sample
이러한 연산자들을 사용하여 Backpressure를 처리할 수 있습니다.
이 연산자들은 데이터 유실을 기저(Lossy Solutions)로 동작하기 때문에 Pull-based인 Combine에 비해 상대적으로 적은 선택지입니다.
이로 인해 상대적으로 Combine에 비해 Backpressure를 다루기 어려운 구조라고 생각됩니다.
번외 (네이밍)
지금까지 Combine에서의 Backpressure에 대해 살펴보았고 RxSwift와의 차이점 또한 확인했습니다.
이것과 관련된 이야기를 Combine 스터디원들과 하던 중에 나름 일리있고 흥미로운 이야기가 나왔는데 바로 네이밍과 관련된 이야기입니다.
Combine에서는 Publisher와 Subscriber로 구독 관계를 형성합니다.
RxSwift에서는 Observable과 Observer가 해당 역할을 수행합니다.
여기서 Publisher, Subscriber 그리고 Observable과 Observer..... 🧐
이렇게 이름을 지은 이유가 오늘 이야기한 Push-based, Pull-based와 관련된 것은 아닌지에 대한 추측이었습니다.
Observable은 말 그대로 "관찰 가능한"을 의미합니다.
Observer는 "관찰자"를 의미하며 Observable의 변화를 감지하고 이에 맞춰 적절한 처리를 수행합니다.
즉, 여기서 Observer는 Observable과 관계를 형성한 이후에는 수동적으로 Observable의 변화를 지켜보고 있는 상태를 내포한다고 생각할 수 있었습니다. 마치 차량에 설치된 블랙박스처럼요!
단어의 뉘앙스에 Observer에게는 제어권이 없다는 것을 담고 있는 것이 아닐가요?ㅎㅎ
반대로 Publisher는 발행자를 Subscriber는 구독자를 의미합니다.
신문 구독을 생각해봐도 발행자는 구독자가 있어야만 새로운 것을 발행할 수 있고 구독자의 의사에 따라 발행물을 받지 않을 수도 있습니다.
약간의 뉘앙스 차이지만 다운스트림이 값 수신에 어느 정도의 제어권이 있는지를 두 주체의 네이밍에서도 찾아볼 수 있지 않았나 가볍게 이야기를 나누어봤었습니다!
(이 내용은 전혀 오피셜한 것이 아니고 그저 스터디원들과 대화 중에 혹시? 해서 재미로 나온 이야기들이니 진지하게 받아들이지 않으시길 바랍니다!)
정리
- Pull-based Backpressure의 Combine
- Demand 개념으로 구현
- buffer, debounce, throttle, collect 등 추가적인 연산자 존재
- Push-based Backpressure의 RxSwift
- buffer, debounce, throttle, sample 등 추가적인 연산자 존재
마무리
지난 시간에 학습했던 Publisher, Subscriber, Subscription에서 등장했던 Demand에 대해 자세히 알아보고 이것이 Backpressure를 위해 도입된 것임을 살펴보았습니다.
OpenCombine의 코드를 확인하며 실제로 Demand가 구독 관계에서 어떤 역할로 동작하는지까지 확인하는 시간을 가졌습니다.
요즘 쉽지 않은 내용들이지만 깊게 공부하면서 점점 더 Combine의 철학에 가까워지고 있는 것 같아 성장하는 느낌이 드네요!
다음에도 더 재밌는 Combine 관련 글로 찾아오겠습니다!
'iOS > Combine' 카테고리의 다른 글
[Combine] 스케줄링을 위한 subscribe(on:)과 receive(on:)의 원리 (with OpenCombine) (0) | 2024.01.16 |
---|---|
[Combine] Cancellable 탐구 (with OpenCombine) (2) | 2023.11.22 |
[Combine] Publisher와 Subscriber 그리고 Subscription (with OpenCombine) (1) | 2023.11.15 |
Combine in Practice (0) | 2023.10.24 |
Introducing Combine (1) | 2023.10.23 |
댓글