본문 바로가기
iOS/Combine

[Combine] Publisher와 Subscriber 그리고 Subscription (with OpenCombine)

by 바등쪼 2023. 11. 15.

최근 Combine 스터디를 진행하면서 단순히 Combine의 사용법을 넘어 코드 레벨에서의 학습에 대한 흥미가 생겼습니다.

 

마침 스터디를 같이 하고 있는 분이 좋은 자료를 알려주셨는데 바로 OpenCombine입니다.

애플에서는 기본적으로 Combine의 구현 코드를 공개하고 있지 않습니다.

따라서, 개발자들은 내부적인 동작 원리를 유추하거나 RxSwift의 코드를 참고해서 파악할 수 밖에 없었는데 마침 해외의 어떤 개발자분이 Combine을 직접 구현해서 공개를 한 레포지토리가 있었습니다.

https://github.com/OpenCombine/OpenCombine

 

GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for processing values over time.

Open source implementation of Apple's Combine framework for processing values over time. - GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for proc...

github.com

 

Star 수도 많고 꾸준히 업데이트를 하고 있는 것 같아서 앞으로는 이 OpenCombine 코드 분석을 바탕으로 Combine을 공부한 내용들을 업로드 할 예정입니다.

 

 

Publisher와 Subscriber 그리고 Subscription

WWDC1, WWDC2 

 

지난 2개의 Combine 관련 WWDC를 보면서 Combine의 기초적인 부분들을 한 번 짚고 넘어갔습니다.

 

Publisher와 Subscriber의 관계는 매우 기초적이면서도 Combine의 핵심을 관통하는 것이기 때문에 이번 기회에 더 깊게 알아보도록 하겠습니다.

 

추가로 Publisher와 Subscriber 사이에서 구독 관계를 조율하는 Subscription에 대해서도 알아보겠습니다.

 

 

Publisher

https://developer.apple.com/documentation/combine/publisher

 

Publisher | Apple Developer Documentation

Declares that a type can transmit a sequence of values over time.

developer.apple.com

  • 프로토콜입니다.
  • 하나 이상의 Subscriber들에게 시간 경과에 따라 일련의 값을 전송할 수 있는 타입의 요건을 정의합니다.
public protocol Publisher {
    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S)
    where S: Subscriber,
          Self.Failure == S.Failure,
          Self.Output == S.Input
}

 

위와 같이 선언된 프로토콜이 Publisher입니다.

  • Output은 퍼블리셔가 생성할 값의 타입입니다.
  • Failure는 퍼블리셔가 생성할 에러의 타입입니다.
    • 에러를 생성하지 않는 publisher라면 Never로 지정합니다.
  • receive(subscriber: S)는 구독자를 받아 퍼블리셔에 연결하는 역할을 수행합니다. (subscription을 생성)
extension Publisher {
    public func subscribe<S>(_ subscriber: S)
    where S : Subscriber,
          Self.Failure == S.Failure,
          Self.Output == S.Input
}

 

  • 추가로 퍼블리셔에 구독자를 연결할 때 사용할 subscribe 함수도 extension을 통해 제공합니다.
  • 이 함수는 내부적으로 receive(subscriber: S)를 호출하게 됩니다.

 

 

Subscriber

https://developer.apple.com/documentation/combine/subscriber

 

Subscriber | Apple Developer Documentation

A protocol that declares a type that can receive input from a publisher.

developer.apple.com

  • 역시 프로토콜입니다.
  • 퍼블리셔로부터 값을 받을 수 있도록 요구 사항을 정의합니다.
  • 대표적인 Subscriber 구현체로 Sink와 Assign이 있습니다.
public protocol Subscriber: CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure: Error
    
    func receive(subscription: Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

 

위와 같이 선언된 프로토콜이 Subscriber입니다.

  • Intput은 퍼블리셔의 Output과 대응되는 타입입니다. 즉, Subscriber가 수신할 값의 타입입니다.
  • Failure는 역시 Subscriber가 받을 수 있는 에러 타입이며 에러를 받지 않는 상황에서는 Never로 지정합니다.
  • receive(subscription:)은 퍼블리셔가 생성한 Subscription을 Subscriber가 수신할 때 호출되는 함수입니다.
  • receive(_ input:)은 퍼블리셔가 생성한 값을 Subscriber가 수신할 때 호출되는 함수입니다.
    • Demand를 반환합니다.
    • 여기서 리턴하는 Demand는 합 연산으로 적용됩니다.
      • 즉, 3번 더 값을 수신할 기회가 잇었는데 Demand를 .max(2)로 리턴하면 3+2 = 5가 되어 수신 기회가 5번으로 늘어나게 됩니다!
  • receive(completion:)은 퍼블리셔가 value 생성을 완료했음을 Subscriber에게 전달할 때 호출되는 함수입니다.

 

Subscription

public protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
    func request(_ demand: Subscribers.Demand)
}

 

위와 같이 선언된 프로토콜이 Subscription 입니다.

  • Combine에서 Publisher와 Subscriber 사이의 연결은 Subscription으로 표현됩니다.
  • reqeust(_ demand:)는 Subscriber가 수신할 수 있는 초기 횟수를 지정하는 함수입니다.
  • 물론, Subscriber의 receive(_ input:) -> Subscribers.Demand 함수를 통해 동적으로 수신 횟수를 조절 할 수 있지만 최초 구독 관계 생성 시 정해지는 횟수는 이 함수를 통해 설정됩니다.

 

 

이제 기본적인 프로토콜들을 선언부를 확인했으니 각각의 구현체들이 어떻게 동작하는지 살펴보겠습니다.

 

 

패턴

 

WWDC에서 나온 그림입니다.

 

  1. Subscriber가 Publisher를 구독 (subscribe 수행)
  2. Publisher는 Subscriber에게 Subscription을 생성해서 receive(subscription:) 을 통해 제공
  3. Subscriber는 Publisher에게 request(_:) 을 통해 값을 요청
  4. Publisher는 Subscriber에게 값을 생성해서 receive(_: Input) 을 통해 제공
  5. 4번의 반복
  6. Publisher는 값 생성이 종료되면 receive(completion:)을 호출하여 Subscriber에게 종료 이벤트 전달

 

정확히 이 흐름으로 모든 Publisher와 Subscriber가 관계를 형성합니다.

애플은 이러한 패턴에 필요한 함수들을 Protocol로 명시하고 추상화한 것이죠!

 

사실 여기까지만 봐도 이해가 잘 될 수도 있지만 코드 레벨로 넘어가서 한번 이 패턴을 직접 확인해보겠습니다!!

위의 패턴 순서를 잘 기억해주세요!!

(OpenCombine의 코드입니다.)

 

Just

가장 단순한 Publisher인 Just를 예시로 구독이 생성되는 과정을 따라가 보겠습니다.

Just는 구독자들에게 Output을 한번 내보낸 다음에 종료되는 단순한 Publisher입니다. (공식문서)

구독 관계 형성을 파악하기에 적합하다고 생각하여 가져왔습니다.

 

Just는 struct이기 때문에 value type입니다.

역시 Publisher 프로토콜을 채택하고 있습니다.

 

Output은 제네릭을 통해 정적 다형성을 사용하고 있습니다.

Just는 Failure로 Never를 강제하고 있습니다.

 

receive 함수가 중요한데 Inner라는 것을 만들어 subscriber에게 전달합니다.

 

바로 이 Inner가 Subscription 입니다.

 

(Inner라는 것은 OpenCombine 개발자가 정한 이름이기 때문에 실제 애플의 Combine 구현부에서는 다른 이름일 수 있습니다.)

 

Just에 Nested Type으로 선언되어 있습니다. 

역시나 Subscription 프로토콜을 채택합니다.

 

생성자에서는 퍼블리셔가 방출하는 value와 downstream(구독자)를 받아 멤버 프로퍼티에 저장합니다.

 

중요한 부분은 request(_ demand:) 입니다.

 

subscriber는 이 함수를 호출하여 퍼블리셔에게 값을 요청하게 됩니다.

실제로 downstream(구독자)에게 receive(value)를 호출하여 바로 값을 전달하고 있습니다.

 

이 부분은 Just의 특성 때문입니다.

한 번만 값을 즉시 전달하고 종료되는 퍼블리셔이기 때문에 request가 들어오자 마자 다운스트림에게 값을 전달하고 completion 이벤트 또한 전달한 것이죠!

다른 퍼블리셔의 경우 파라미터로 받은 demand 만큼 수신 기회를 설정하고 퍼블리셔에게 값 전달을 지시하는 역할을 수행합니다.

 

 

자 이제 다시 정리해보면 Just는 Pubisher 프로토콜을 준수하고 Ouput과 Failure 타입을 가집니다.

receive(subscriber:) 함수를 구현하여 구독자를 추가할 수 있도록 합니다.

이 receive 함수에서는 Subscription을 만들어서 구독자에게 전달합니다.

Subscription 은 Inner 라는 이름으로 구현되어 있으며 request(_ demand:)를 구현하고 있습니다.

 

 

이제 구독자가 이 Just 퍼블리셔를 구독하는 과정을 살펴보겠습니다.

 

Sink

대표적인 Subscriber입니다. (공식 문서)

Just를 구독하는 과정에 앞서 구독할 구독자인 Sink를 알아봅시다!

 

Sink는 class로 구현되어 있으며 reference type임을 의미합니다.

(구독자 객체는 복사가 발생하면 안되기 때문이라고 생각합니다.)

 

역시나 Subscriber 프로토콜을 채택합니다.

 

Input과 Failure 타입 또한 제네릭으로 받게 됩니다.

 

 

Sink는 개발자가 퍼블리셔를 편하게 구독하고 Value 또는 Completion 이벤트를 수신했을 때 처리할 액션을 쉽게 지정하기 위해 만들어진 Subscriber입니다.

따라서 생성자에서 2개의 클로저를 받아 각각 value를 수신할 때, Completion 이벤트를 수신할 때 수행할 수 있도록 변수에 저장하고 있습니다.

 

 

 

프로토콜에서 명시한대로 subscription을 받는 receive(subscription:) 함수를 구현하고 있습니다.

멀티 스레드 환경에서의 경쟁 상황을 피하기 위해 lock을 걸어서 상태를 변경합니다.

이 시간에 알아야 하는 부분은 가장 아랫줄의 subscription.request(.unlimited)입니다. 

Subscription의 request(_: Demand)를 호출하여 이 구독자가 값을 수신하고 싶어 하는 횟수를 무제한으로 설정한 것입니다.

(물론 Just를 구독하게 되면 1회만 값을 방출하고 completion 을 보내기 때문에 의미가 없지만 다른 퍼블리셔를 구독할 때는 무제한으로 값을 받고 싶을 때 Sink를 사용하면 됩니다.)

 

 

 

물론 값을 수신하기 위한 receive(_ value) 또한 구현하고 있었습니다.

값을 수신하면 생성자에서 받아온 클로저인 receiveValue(_:)를 실행하여 개발자가 지정한 액션이 발생하도록 하고 있습니다.

 

return .none을 하는 이유는 애초에 Subscription에게 수신 횟수를 unlimited로 요청했기 때문에 수신 기회를 더 늘릴 필요가 없기 때문입니다.

 

 

마지막으로 completion 이벤트를 수신하기 위한 receive(completion)을 구현하고 있습니다.

역시나 생성자에서 파라미터로 받아온 receiveCompletion(_:)을 호출하여 개발자가 지정한 액션을 발생시킵니다.

 

 

구독 형성

 

Combine에서는 Publisher의 extension으로 기본으로 제공하는 Subscriber들을 쉽게 사용할 수 있도록 제공하고 있습니다.

Subscriber 구현체와 동일한 이름의 함수로 제공됩니다.

 

위 코드를 보면 생성한 Just 퍼블리셔에 sink 함수를 호출하면 Sink 구독자를 생성하게 됩니다.

그리고 subscribe 함수를 호출하고 AnyCancellable로 감싼 subscriber를 리턴합니다.

AnyCancellable과 관련된 내용은 따로 정리해서 글을 쓸 예정이니 이번에는 넘어가도록 하겠습니다.

 

subscribe 함수를 보면 앞서 Publisher Protocol을 설명했을 때 언급했던 것처럼

Publisher의 extension에 구현되어 있었습니다.

 

엄청 길고 복잡해 보이지만 우리에게 중요한건 마지막 줄에 있는 receive(subscriber: subscriber)입니다.

 

즉, 제일 처음에 설명했던 것처럼 퍼블리셔에게 구독자를 연결시킨 것입니다.

 

 

이제 모든 연결 관계에 대한 퍼즐은 맞추어졌습니다.

다시 한 번 정리해보겠습니다.

 

정리

let cancellable = Just<Int>(7).sink { completionEvent in
    print(completionEvent)
} receiveValue: { value in
    print(value)
}

 

Just를 사용하여 퍼블리셔를 생성하고 sink를 통해 구독을 한 예시입니다.

실제로 Combine을 사용할 때는 이렇게 사용하는 법만 알아도 되지만 우리가 이렇게 코드를 작성하면 어떤 일이 발생하는지 지금까지 알아봤습니다.

다시 순서대로 따라가봅시다!

1. Publisher의 생성

  • Just는 Publisher 프로토콜을 채택한 퍼블리셔입니다.
  • 단일 값을 받아서 즉시 방출하기 때문에 위 예시에서는 7이라는 테스트용 숫자를 넣었습니다.
  • Publisher 프로토콜에 따라 receive(subscriber:)를 구현하고 있습니다.

 

2. Subscriber의 생성

  • Publisher 프로토콜의 extension에 sink 함수가 구현되어 있습니다.
  • 위 예시 코드에서는 이 함수를 호출하여 Sink 구독자를 생성한 것입니다.
  • sink 함수에서는 Sink 객체(구독자)를 만들고 퍼블리셔의 subscribe 함수에 파라미터로 넣어서 호출합니다.
  • subscribe 함수는 Publisher 프로토콜의 extension으로 기본구현되어 있습니다.
  • subscribe 함수는 내부적으로 receive(subscriber:)를 호출합니다.
  • 즉, 앞서 만든 Sink 객체를 Just에게 전달한 것입니다!

 

3. Subscription의 생성

  • 그럼 이제 Just의 receive(subscriber:) 함수가 실행되겠죠?
  • receive(subscriber:)에서는 Inner라는 이름의 Subscription을 생성합니다.
  • subscriber.receive(subscription:)을 통해 이 Subscription을 다시 subscriber에게 전달합니다. 

 

4. Sink의 receive(subscription:)의 실행

  • Subscription을 수신한 Sink는 subscription.request(.unlimited)를 실행하여 값 수신 횟수를 무제한으로 설정합니다. (Sink 구독자의 특성)
  • 이 예시에서는 Inner의 request 함수를 실행한 것입니다.

 

 

5. Inner의 request(_ demand:)의 실행

  • Just는 요청이 들어오면 값을 바로 방출하기 때문에 이 함수는 바로 value를 방출합니다.
 _ = downstream.receive(value)

 

 

6. Sink의 receive(_: Intput)의 실행

  • Just로부터 값을 받은 Sink는 자신이 가지고 있는 클로저를 실행하여 개발자가 지정한 액션을 수행합니다.
  • 정확히는 Just로부터 직접 받은 값이 아닌 Subscription인 Inner로 부터 받은 값입니다.
  • 이렇듯 Combine에서는 Publisher가 값을 직접 Subscriber에게 주는 것이 아닌 Subscription을 통해서 전달하게 됩니다.

 

7. Inner의 request(_ demand:)의 실행

  • 아직 request의 일이 끝나지 않았습니다.
  • Just는 값을 1회 방출하고 바로 종료하기 때문에 downstream.receive(value)를 호출한 즉시 아래의 코드가 실행되게 됩니다.
downstream.receive(completion: .finished)
  • 구독자에게 종료 이벤트를 보낸 것입니다.

 

8. Sink의 receive(completion:)의 실행

  • 이제 마지막 단계입니다.
  • 구독(스트림)이 종료되었기 때문에 개발자가 Sink 생성자에 넣은 클로저를 실행합니다.
  • 이제 구독 관계가 최종적으로 끝나게 됩니다.

 

 

 

Just가 아닌 다른 Publisher들, 특히 비동기적으로 값을 생성해서 방출하는 퍼블리셔를 구독할 때는 다음과 같은 패턴이 발생합니다.

Subject에 관련된 내용들은 다른 글에서 더 깊은 내용으로 포스팅하겠습니다!

 

 

 

마무리

Combine에서 가장 중요한 개념인 Publisher, Subscriber, Subscription에 대해 코드 레벨에서 더 깊게 알아봤습니다.

각각의 객체가 어떻게 상호작용하고 동작하는지에 대해 이해도를 높일 수 있었습니다.

 

OpenCombine 덕분에 깊이 있는 학습을 할 수 있게 된 것 같아서 좋네요!

댓글