본문 바로가기
iOS/Combine

[Combine] 스케줄링을 위한 subscribe(on:)과 receive(on:)의 원리 (with OpenCombine)

by 바등쪼 2024. 1. 16.

Combine 탐구 시리즈

1. Publisher와 Subscriber 그리고 Subscription(with OpenCombine)
2. Cancellable 탐구 (with OpenCombine)

3. Combine과 Backpressure (with OpenCombine, RxSwift)

 

 

오늘은 OpenCombine을 기반으로 Combine에 대해 Deep Dive 하는 4번째 글입니다!

많은 분들이 사용하고 있지만 정확히 어떻게 동작하는지 잘 모르는 스케줄러 지정 연산자(Sepecifying Schedulers)가 주제입니다.

이번 글은 Combine의 구독 생성 과정에 대한 사전 지식이 필요합니다.
지난 글의 내용이 사전 지식으로 필요하니 참고 부탁드립니다!!

 

GPT에게 iOS Task 스케줄링 관련된 이미지를 만들어 달라고 했는데 이런 이미지를 그려주네요...?ㅋㅋㅋ

 

 

Specifying Schedulers Operator

Combine에는 2가지의 스케줄러 지정 연산자가 존재합니다.

subscribe(on: options:)과 receive(on: options:)입니다.

 

두 오퍼레이터(연산자)에 대해 각각 알아보기 전에 두 연산자의 공통 파라미터 타입인 Scheduler에 대해 잠깐 짚고 넘어가겠습니다.

 

Scheduler 프로토콜

공식문서를 보면 클로저를 실행하는 시기와 방법을 정의하는 프로토콜이라고 설명하고 있습니다.

protocol Scheduler<SchedulerTimeType>

 

스케줄러에 대한 설명에 Thread에 관한 설명이 없는 것을 확인할 수 있는데 왜냐하면 Scheduler는 Thread와 동일하지 않기 때문입니다.

Scheduler != Thread

 

왜냐하면 Scheduler 프로토콜을 채택한 구현체의 세부 구현 사항에 따라 어떤 스레드에서 작업이 진행될지 달라지기 때문입니다.

즉, Scheduler를 통해 작업 진행 스레드가 조정될 수 있지만 Scheduler 자체가 Thread와 동의어는 아니라는 것입니다.

 

Scheduler 프로토콜이 제약하는 scheduler 메서드의 구현 사항에 따라 파라미터로 받는 클로저의 실행 시기와 방법이 정해지며 스레드가 그 방법의 일부일 수 있습니다.

 

다만 일반적으로 Scheduler를 사용할 때는 스레드를 지정하기 위한 목적이 다수이기 때문에 이 글에서는 스케줄러 사용과 스레드 지정을 어느 정도 동일하게 간주하고 분석을 하겠습니다..!

 

 

subscribe(on: options:)

공식문서에 따르면 subscribe, cancel, request operations를 수행할 스케줄러를 지정하는 연산자라고 소개하고 있습니다.

func subscribe<S>(
    on scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.SubscribeOn<Self, S> where S : Scheduler

 

 

지난 글에서 사용한 다이어그램을 다시 가져오겠습니다.

 

Combine에서 Publisher와 Subscriber의 구독 관계 생성 과정을 나타낸 다이어그램입니다.

짧게 정리해 보면

 

1. Publisher가 Subscriber를 subscribe(_:) 메서드를 통해 받음

2. subscribe(_:) 메서드는 receive(subscriber:) 메서드 호출

3. Publisher는 Subscription 구현체를 생성해서 Subscriber에게 전달

4. Subsriber는 Subscription의 request(_:) 메서드를 통해 값 전달 요청

5. Publisher는 값 생성 후 Subscription을 통해 전달

6. Subscription은 receive(_:) 메서드를 호출하여 값을 Subscriber에게 전달

7. Publisher가 completion 이벤트 전달

8. Subscriber가 completion 이벤트 수신

 

 

이러한 순서로 구독 관계가 형성되고 값을 전달하게 됩니다.

 

결론부터 말하자면 subscribe(on:)은 여기서 1, 2, 4번 그리고 cancel 작업이 지정한 스케줄러에서 스케줄링되도록 합니다!

 

OpenCombine의 코드를 분석하며 설명을 해보겠습니다!

 

subscribe(on: options:) 오퍼레이터

 

우리가 사용하는 subscribe(on: options:) 연산자는 다른 연산자들과 마찬가지로 Publishers 열거형의 확장에 구현되어 있습니다.

SubscribeOn이라는 Publisher 구현체를 생성하고 리턴하는 것을 확인할 수 있습니다.

생성자의 파라미터에는 연산자가 수신한 upstream, scheduler, options를 그대로 전달합니다.

 

SubscribeOn 퍼블리셔

 

이제 SubscribeOn 퍼블리셔의 구현체를 살펴보겠습니다.

 

생성자에서는 인풋으로 들어온 값들을 프로퍼티에 저장하고 있습니다.

 

밑의 receive(subscriber:) 메서드가 중요합니다!!

 

Inner라는 객체를 생성하고 이 객체를 upstream에 전달하여 구독을 생성하는 작업을 지정한 Scheduler의 schedule 메서드의 클로저에 넣고 있습니다.

 

여기서 Inner는 Subscription 프로토콜의 구현체입니다.

즉, Subscription을 업스트림에 전달하는 작업의 스케줄링을 하게 됩니다.

앞선 다이어그램의 1번 작업에 해당됩니다!

 

 

subscribe 메서드는 위의 코드처럼 receive(subscriber:)를 호출하여 퍼블리셔에게 구독자를 전달하게 됩니다.

이는 2번 작업 또한 지정한 스케줄러에서 스케줄링된다는 것을 알 수 있습니다. (다른 스케줄링 없이 하나의 컨텍스트에서 이어지는 메서드 호출이기 때문!)

 

Subscription 구현체 - Inner

 

Inner는 퍼블리셔와 Subscriber 사이에서 구독 관계를 조율하는 Subscription 객체입니다. (Inner라는 이름은 OpenCombine에서 지정한 네이밍이며 실제 Combine 구현체에서는 다를 수 있습니다.)

 

역시 생성자에서 스케줄러를 받아서 속성에 저장하고 있습니다.

 

Inner의 메서드 부분이 중요합니다.

 

Inner에서 scheduler 프로퍼티를 사용하는 건 이렇게 두 개의 함수뿐입니다.

 

앞서 구독 관계 형성 다이어그램에서 4번 작업에 해당되는 request(_ demand:) 메서드에서 스케줄러를 사용하고 있습니다.

 

또한 구독 관계 형성 다이어그램에서는 없었지만 구독을 취소하는 메서드인 cancel() 역시 지정한 스케줄러가 취소 작업을 스케줄링하도록 되어 있는 것을 확인할 수 있습니다.

 

반면 구독자에게 값을 전달하는 receive(_ input:) 메서드에는 스케줄러를 호출하지 않습니다.

subscribe(on: options:)는 값 전달에는 직접적으로 스케줄링에 관여하지 않는다는 것이죠!

 

다른 말로, receive(on: options:) 연산자는 이 메서드에서 스케줄러를 호출할 것으로 예상할 수 있습니다.

 

 

subscribe(on:)은 정말 다운스트림에는 아무 영향을 주지 않을까?

그렇지 않습니다.

 

앞서 Inner에서 스케줄러가 호출되는 메서드들을 확인했었는데, 물론 다운스트림에게 값을 전달하는 메서드인 receive(_ input:)에서는 스케줄러를 호출하지 않지만!

request(_ demand:) 에서는 스케줄러를 사용하고 있기 때문에 다운스트림에도 영향이 생길 수 있습니다.

 

Combine의 몇몇 Publisher는 request(_ demand:) 시점에 곧바로 값을 전달합니다.

 

이 Publisher들이 request(_ demand:) 시점에 값을 전달한다면, subscribe(on:)에 전달한 스케줄러에 영향을 받으며 다운스트림이 값을 수신하게 됩니다!

 

대표적으로 Just가 있습니다.

 

Just를 구독해서 실험을 해봤습니다.

 

첫 번째 구독은 별도의 스케줄링 연산자를 사용하지 않았고 두 번째 구독에서는 subscrbie(on:)을 사용해 작업이 글로벌 스레드에서 발생하도록 했습니다.

 

출력 결과를 보면 subscribe(on:)을 사용했을 때 다운스트림이 값을 수신한 스레드가 글로벌 스레드인 것을 확인할 수 있습니다.

 

Just의 Subscription에서 그 이유를 찾을 수 있습니다.

 

 

Just의 Subscription 구현체의 request(_ demand:) 구현부를 보면 subscriber가 값을 요청하는 즉시 downstream.receive(value)를 불러와 값을 구독자에게 전달하고 있습니다.

 

이는 곧바로 값을 전달한다는 Just의 특성과 부합합니다.

 

앞서 subscribe(on:)은 request(_ demand:)에는 스케줄러의 영향을 준다고 했는데 Just는 이 request 시점에 바로 다운스트림에 값을 전달하기 때문에 결과적으로 다운스트림에 스케줄링의 영향을 미치게 된 것입니다.

 

이와 동일하게 동작하는 Publisher로는 Publishers.Sequence가 있고 CurrentValueSubject의 첫 번째 값 역시 구독 관계 생성시 즉시 방출되기 때문에 마찬가지로 동작합니다.

CurrentValueSubject의 Subscription의 request 구현부

 

반면 구독자가 구독을 생성했을 시점이 아닌 비동기적으로 퍼블리셔가 값을 생산해서 방출하는 퍼블리셔인 Future, PassthroughSubject 등은 request 메서드 시점에는 전달할 값이 없기 때문에 스케줄러의 영향을 받지 않고 추후에 값이 생성되면 receive(input:) 메서드를 호출하여 다운스트림에 값을 전달하게 됩니다. 이 때는 receive(on: options:)의 스케줄러 지정에 영향을 받게 되는 것입니다!! 

 

(사실 더 정확하게 말하면, Future의 경우에는 Future의 fullFill 클로저에서 스레드 스위칭이 없다면 subscribe(on:)으로 지정한 스레드로 값 수신까지 이어지긴 합니다.)

 

PassthroughSubject의 Subscription 구현체인 Conduit 객체를 예시로 가져왔습니다.

여기서는 offer라는 메서드에서 downstream에 receive(_: input)으로 값을 전달하고 있습니다.

request 시점에는 값 전달을 하지 않습니다.

 

 

playground에서 실험을 해봤는데 역시 PassthroughSubject에서 subscribe(on:)을 사용해 글로벌 큐로 스위칭해도 값 수신 시에는 영향이 없는 것을 확인했습니다.

 

 

subscribe(on: options:) 정리

subscribe(on:)을 사용했다고 해서 다운스트림의 스레드에 영향을 주지 않으라는 보장은 없다!

어떤 Publisher냐에 따라 값 전달 시점이 달라지기 때문이다!

하지만 비동기적으로 값을 생성해서 전달하는 Publisher를 사용했다면 request 시점에 아닌 별도의 값 생성 시점에 전달하기 때문에 다운스트림 스레드에 영향을 주지 않는다.

 

 


 

receive(on: options:)

공식 문서에 따르면 퍼블리셔로부터 값을 수신할 스케줄러를 지정하는 연산자입니다.

func receive<S>(
    on scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.ReceiveOn<Self, S> where S : Scheduler

 

다시 다이어그램을 보면, 6번 작업인 receive(_: input) 메서드의 실행 스케줄러를 설정할 수 있다는 의미입니다!

 

receive(on:)은 subscribe(on:)보다는 간단한 개념입니다.

 

다운스트림(구독자)가 값을 수신하게 되는 스레드를 지정할 수 있습니다.

 

 

OpenCombine 코드에서 그 원리를 확인해 보겠습니다!

 

receive(on: options:) 오퍼레이터

 

역시 Publisher 프로토콜의 extension에 구현되어 있습니다.

ReceiveOn이라는 Publisher 구현체를 생성하고 리턴합니다.

 

ReceiveOn 퍼블리셔 구현체

 

ReceiveOn 퍼블리셔는 Publisher 프로토콜을 채택하고 있기 때문에 receive(subscriber:) 메서드를 구현하고 있습니다.

 

구독자가 발생했을 때 구독권인 Subscription을 만들고 업스트림과 연결하는 역할을 수행합니다.

SubscribeOn과 마찬가지로 Inner라는 이름으로 Subscription을 구현하고 있습니다.

 

Subscription 구현체 - Inner

 

SubscribeOn의 Inner와 매우 유사합니다.

scheduler와 관련 옵션을 받아서 속성에 저장합니다.

 

차이점이 있다면 이 scheduler를 호출하는 메서드가 다릅니다.

 

SubscribeOn의 Inner 구현체에서는 request와 cancel 메서드에서 스케줄러를 호출했다면 receiveOn의 Inner 구현체에서는 receive(_ input:)과 receive(completion:) 에서 스케줄러를 호출합니다.

 

 

이로 인해 다운스트림(구독자)가 값을 수신할 때와 completion event를 수신할 때의 스케줄러를 지정할 수 있게 됩니다.

receive(on:)을 사용한다면 subscribe(on:) 과 달리 모든 퍼블리셔가 구독자에게 값을 전달할 때 receive(_ input:) 메서드를 지나쳐야 하기 때문에 예외 없이 지정한 스케줄러에서 값 수신이 스케줄링됩니다.

 

 

 

Just 퍼블리셔에 subscribe(on: DispatchQueue.global())과 receive(on: DispatchQueue.main) 순서로 선언을 해서 실행을 시켜보면 메인 스레드에서 값을 수신한 것을 확인할 수 있습니다.

 

시퀀스 다이어그램으로 나타내면 다음과 같습니다.

주황색은 글로벌 스레드, 보라색은 메인 스레드로 표현했습니다.

 

Just는 값 방출 후에 같은 컨텍스트에서 completion 이벤트 또한 전달하기 때문에 receive(on:)을 거치기 전까지 completion 이벤트도 글로벌 스레드에서 진행됩니다.

(completion 이벤트 방출은 퍼블리셔마다 시점이 다르기 때문에 Just는 이렇구나 정도만 알고 넘어가주세요!)

 

 

전체 스케줄링 과정 요약 정리

1. Just에서 request 시점에 3을 바로 방출

2. SubscribeOn의 Inner가 3을 수신한 스레드는 글로벌 스레드

3. SubscribeOn의 Inner는 다운스트림인 receiveOn의 Inner에게 3을 전달 (receive(_ : input) 호출)

4. ReceiveOn의 Inner는 receive(_: input) 메서드에서 자신이 가진 스케줄러로 스케줄링을 하기 때문에 이 예제에서는 메인 스레드로 스케줄링

5. 최종적으로 sink 구문에서 값을 수신할 때는 메인 스레드에서 스케줄링되어 값을 수신

 

 

이러한 과정으로 스케줄링이 연쇄적으로 일어나게 됩니다.

 

 

만약 Just가 아니라 PassthrouthSubject라면 어떨까요!?

 

이렇게 Just와는 다른 부분이 생깁니다!

 

값 또는 종료 이벤트를 send 할 때의 스레드를 붉은색으로 표현했습니다.

 

request 전달 시점이 아닌 값이 발생한 시점의 스레드에 영향을 받아 업스트림의 진행 스레드가 구성됩니다.

 

물론 마지막에 receive(on:)을 거치기 때문에 최종적으로 Sink가 값을 받게 되는 스레드는 메인 스레드가 됩니다.

 

 

 

마무리

오늘은 Combine의 스케줄링 연산자에 대해 알아봤습니다.

 

스레드 관리와 관련된 부분이다 보니 CS적인 지식도 필요하고 Combine의 전체적인 동작 원리에 대한 사전 지식 또한 필요했습니다.

 

이전 시간에 이어서 쭉 OpenCombine을 기반으로 코드를 분석하며 깊이 있게 학습해 왔기 때문에 오늘 내용을 이해할 수 있었던 것 같습니다.

 

물론 제가 작성한 내용에도 오류가 있을 수 있으니 질문이나 피드백은 언제나 환영합니다!!! 😄

 

다음에도 더 유익한 글로 찾아오겠습니다!!

댓글