본문 바로가기
iOS/RxSwift

Ch9. Combining Operators

by 헤콩 2021. 1. 13.
반응형

본 게시물은 Florent Pillet, Junior Bontognali, Marin Todorov, Scott Gardner - RxSwift.  Reactive Programming with Swift (2017, Razeware LLC) 책과 ReactiveX 사이트를 기반으로 작성되었습니다.

 

RxSwift를 공부하는 데에 이 책을 읽으면 좋겠다고 생각하였고, 단지 읽기만 하는 것보다 한국어로 직접 정리해놓으면 더 기억하기 좋을 것 같아 게시물을 작성하게 되었습니다.

이번 게시물에서는 마블 다이어그램과, 마블 다이어그램에 해당하는 메서드 예시를 중점으로 정리하였습니다.

 


이전 Chapter에서는 Observable Sequence를 어떻게 만들고, 필터링하고, 변형하는지를 보았습니다. 이번 챕터에서는 다양한 방법으로 Sequence들을 모으고, 각각의 Sequence 내의 데이터들을 합치는 방법에 대해서 배워볼 것입니다.

 

👉startWith, concat, concatMap, merge, combineLatest, zip, withLatestFrom, sample, amb, switchLatest, reduce, scan

 

A. Prefixing and concatenating

I. startWith

Observable로 작업할 때 가장 중요한 점은 Observer가 초기값을 받는지 여부를 확인하는 것입니다. 예를 들면 현재 위치나 네트워크 연결 상태처럼 현재 진행되는 상태가 필요한 상황이 있습니다. 이럴 때 현재 상태와 함께 초기값을 붙일 수 있습니다.

let numbers = Observable.of(2, 3, 4)

let observable = numbers.startWith(1)
observable.subscribe(
    onNext: { value in
        print(value)
    }
)

 

[ 출력 ]

1

2

3

4

 

 

II. concat

사실, startWith는 concat 연산자 계열을 단순하게 변형시켜놓은 것이라 할 수 있습니다. 위에서 본 startWith 예제는 하나의 값을 갖는 Sequence를 다른 Sequence에 연결한 것입니다. 우리는 concat을 통해서 여러 element를 가진 두 개의 sequence를 묶을 수 있습니다.

let first = Observable.of(1,2,3)
let second = Observable.of (4,5,6)

let observable = Observable.concat([first, second])
observable.subscribe(
    onNext: {
        print($0)
    }
)

 

[ 출력 ]

1

2

3

4

5

6

 

 

또한, 아래와 같이 concat을 사용할 수도 있습니다.

let germanCities = Observable.of("Berlin", "Munich", "Frankfurt")
let spanishCities = Observable.of("Madrid", "Barcelona", "Valencia")

let observable = germanCities.concat(spanishCities)
observable.subscribe(
    onNext: { value in
        print(value)
    }
)

 

[ 출력 ]

Berlin

Munich

Frankfurt

Madrid

Barcelona

Valencia

 

 

III. concatMap

concatMap은 각각의 Sequence가 다음 Sequence가 구독되기 전에 합쳐지는 것을 보장합니다. 이게 무슨 말이냐! 여러 Observable Sequence들을 모두 합쳐서 방출한다고 생각하면 됩니다.

let sequences = ["Germany": Observable.of("Berlin", "Münich", "Frankfurt"),
                 "Spain": Observable.of("Madrid", "Barcelona", "Valencia"), 
                 "Korea": Observable.of("Seoul", "Busan", "Incheon", "Jeju")]

Observable.of("Germany", "Spain", "Korea")
    .concatMap({ country in
        sequences[country] ?? .empty()
    })
    .subscribe(
        onNext: {
            print($0)
        }
    )

 

[ 출력 ]

Berlin

Münich

Frankfurt

Madrid

Barcelona

Valencia

Seoul

Busan

Incheon

Jeju

 

 

B. Merging

I. merge

RxSwift에는 여러 Sequence들을 합치는 다양한 방법들이 있습니다. 그 중에서도 가장 쉬운 방식은 merge를 사용하는 것입니다. merge는 내부 Sequence들이 모두 종료되었을 때 종료되기 때문에 메모리 누수를 방지하기 위해서는 적절한 타이밍에 dispose해주어야 합니다. 만약 merge된 여러 Sequence들 중에서 어떤 하나가 error를 방출하면 merge( )는 즉시 error를 방출하고 종료됩니다. 

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let source = Observable.of(left.asObservable(), right.asObservable())

let observable = source.merge()
let disposable = 
    observable.subscribe(
        onNext: {
            print($0)
        }
    )
    
left.onNext("Left: 1")
right.onNext("Right: 4")
right.onNext("Right: 5")
left.onNext("Left: 2")
right.onNext("Right: 6")
left.onNext("Left: 3")

// disposable.dispose()
left.onError(MyError.anError)
right.onNext("Right: 7")

 

[ 출력 ]

Left: 1

Right: 4

Right: 5

Left: 2

Right: 6

Left: 3

Unhandled error happened: anError

 

 

II. merge(maxConcurrent: )

merge할 수 있는 Sequence 수를 제한하기 위해 maxConcurrent 프로퍼티를 사용할 수 있습니다. 설정해준 maxConcurrent 수에 도달할 때까지 계속해서 merge를 하다가 제한된 수만큼 도달했을 경우, 이후에 들어오는 Observable들은 대기열에 넣게 됩니다. 그리고 현재 Sequence 중 하나가 종료되자마자 대기열에 있는 Observable Sequence를 subscribe하기 시작합니다. 예를 들어, 네트워크 요청이 많아질 때 리소스를 제한하거나 네트워크 연결 수를 제한하기 위해서 이 메서드를 사용할 수 있습니다.

 

 

C. Combining elements

I. combineLatest( _: _: resultSelector : )

결합된 Sequence들은 값을 방출할 때마다, 그 Sequence들은 우리가 제공하는 클로저들을 호출합니다. 그리고 각각의 결합된 Sequence들이 방출한 최종 value를 받게 됩니다.

예를 들어, 여러 TextField를 한 번에 관찰하고 값을 결합하거나 여러 소스들의 상태를 보는 것과 같은 상황에서 사용할 수 있습니다. 회원가입할 때 각 TextField를 검증하는 데에도 사용될 수도 있겠어요!

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let observable = Observable.combineLatest(
    left, 
    right, 
    resultSelector: { lastLeft, lastRight in
        "\(lastLeft) | \(lastRight)"
    }
)

let disposable = observable.subscribe(
    onNext: { print($0) }
)

left.onNext("Left: 1")
right.onNext("Right: 4")
right.onNext("Right: 5")
left.onNext("Left: 2")
right.onNext("Right: 6")
left.onNext("Left: 3")

disposable.dispose()

 

[ 출력 ]

Left: 1 | Right: 4

Left: 1 | Right: 5

Left: 2 | Right: 5

Left: 2 | Right: 6

Left: 3 | Right: 6

 

 

II. combineLatest( _, _, resultSelector : )

combineLastest 계열에는 다양한 연산자들이 있습니다. 이것들은 대략 2~8개의 Observable Sequence를 파라미터로 가집니다. 그리고 Sequence elemenet의 타입이 같을 필요는 없습니다. 아래 코드를 보면 유저가 세팅을 바꿀 때마다 자동으로 화면에 업데이트를 해주는 예제를 볼 수 있습니다.

let choice = PublishSubject<DateFormatter.Style>()
let dates = Observable.of(Date())

let observable = Observable.combineLatest(
    choice,
    dates,
    resultSelector: { (format, when) -> String in
        let formatter = DateFormatter()
        formatter.dateStyle = format
            
        return formatter.string(from: when)
    }
)

observable.subscribe(onNext: { print($0) })
    
choice.onNext(DateFormatter.Style.short)
choice.onNext(DateFormatter.Style.long)
choice.onNext(DateFormatter.Style.short)

 

[ 출력 ]

1/13/21

January 13, 2021

1/13/21

 

 

III. combineLatest( [ ], resultSelector : )

배열 안의 최종 값들을 결합하는 형태도 있습니다. 위에 있는 I번 combineLatest에서 다뤘던 예제를 아래와 같이 배열로 넣는 방식으로 변형할 수 있습니다.

let observable = Observable.combineLatest([left, right]) { strings in
    strings.joined(separator: " | ")
}

 

 

IV. zip

또 다른 combination operator로 zip이 있습니다. combineLatest 계열과 비슷하지만, zip은 일련의 Observable이 새로운 값을 각자 방출할 때까지 기다리다가 둘 중 하나의 observable이라도 완료되면 zip 역시 완료됩니다. 즉, element가 더 많은 Observable이 남아있더라도 더 적은 Observable이 완료된다면 zip도 완료됩니다. 이렇게 Sequence에 따라 단계별로 작동하는 방법을 Indexed Sequencing 이라고 합니다.

enum Weather {
    case cloudy
    case sunny
}

let left = PublishSubject<Weather>()
let right = Observable.of("Lisbon", "London", "Vienna")

let observable = Observable.zip(
    left,
    right,
    resultSelector: { (weather, city) in
        return "It's \(weather) in \(city)"
    }
)

observable.subscribe(onNext: { print($0) })

left.onNext(.sunny)
left.onNext(.cloudy)
left.onNext(.cloudy)
left.onNext(.sunny)

 

[ 출력 ]

It's sunny in Lisbon

It's cloudy in London

It's cloudy in Vienna

 

 

D. Triggers

I. withLatestFrom(_:)

여러 개의 Observable들을 한 번에 받는 경우가 있습니다. 이 때 다른 Observable들로부터 데이터를 받는 동안 Observable은 단순히 Trigger 역할을 할 수 있습니다. 예를 들어, 마블 다이어그램의 경우 button에서 발생하는 tap 이벤트가 trigger가 되어 textfield의 값을 방출하도록 하고 있습니다.

let button = PublishSubject<Void>()
let textField = PublishSubject<String>()

let observable = button.withLatestFrom(textField)
_ = observable.subscribe(onNext: { print($0) })

textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
button.onNext(())
button.onNext(())
textField.onNext("Pari")
button.onNext(())

 

[ 출력 ]

Paris

Paris

Pari

 

 

II. sample(_:)

withLatestFrom(_:)과 거의 비슷하지만, sample은 연속되는 같은 값이 여러번 발생해도 한 번만 방출합니다. 즉, 여러 번 trigger를 작동시키더라도 한 번만 출력됩니다. 그리고 withLatestFrom은 Observable을 파라미터로 받지만, sample은 Trigger를 파라미터로 받습니다.

withLatestFrom 예제 코드에서 observable 부분을 아래와 같이 바꿔줍니다.

let observable = textField.sample(button)

 

[ 출력 ]

Paris

Pari

 

만약 withLatestFrom을 sample처럼 작동하게 하려면 distinctUntilChanged( )와 함께 사용하면 됩니다.

let observable = button.withLatestFrom(textField)
_ = observable
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })

 

[ 출력 ]

Paris

Pari

 

 

E. Switches

I. amb(_:)

amb는 ambiguous(모호한) 이라고 생각하면 됩니다. 두 가지 Sequence의 이벤트 중에서 어떤 것을 구독할 지 선택할 수 있도록 합니다. 아래 코드를 보면 amb(_:) 연산자는 left, right 두 개의 Observable을 모두 subscribe합니다. 그리고 두 Sequence 중에서 어떤 것이든 element를 방출하는 것을 기다리다가 둘 중 하나가 방출을 시작하면 나머지에 대해서는 subscribe를 중단합니다. 그리고 방출을 처음 시작한 Observable에 대해서만 element를 방출합니다.

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let observable = left.amb(right)
let disposable = observable.subscribe(onNext: { print($0) })

right.onNext("Right: 4")
left.onNext("Left: 1")
right.onNext("Right: 5")
left.onNext("Left: 2")
right.onNext("Right: 6")
left.onNext("Left: 3")

disposable.dispose()

 

[ 출력 ]

Right: 4

Right: 5

Right: 6

 

 

II. switchLatest( )

switchLatest는 source 역할을 하는 Observable에 마지막으로 들어온 Sequence로부터 아이템을 방출받도록 합니다.

말만 보면 조금 어려울 수도 있는데, 예시를 보면 source에서 one이 들어왔을 때, source의 가장 최근의 값은 one이니까 one에서 방출되는 1, 2만 subscriber가 볼 수 있고, source에 two가 들어왔을 때는 source의 가장 최근의 값이 two가 되니까 two에서 방출되는 6만 subscriber에게 방출되게 됩니다.

let one = PublishSubject<String>()
let two = PublishSubject<String>()

let source = PublishSubject<Observable<String>>()

let observable = source.switchLatest()
let disposable = observable.subscribe(onNext: { print($0) })

one.onNext("One: 0")

source.onNext(one)
one.onNext("One: 1")
two.onNext("Two: 4")
one.onNext("One: 2")
two.onNext("Two: 5")

source.onNext(two)
two.onNext("Two: 6")
one.onNext("One: 3")

disposable.dispose()

 

[ 출력 ]

One: 1

One: 2

Two: 6

 

 

F. Combining elements within a sequence

I. reduce( _: accumulator: )

reduce는 source observable이 완료될 때만 계산 값을 생성합니다. 따라서 완료되지 않은 Sequence에 이 연산자를 적용하면 아무것도 출력되지 않습니다.

let seq = PublishSubject<Int>() // source

let obervable = seq.reduce(0, accumulator: +)
observable.subscribe(onNext: { print($0) })

seq.onNext(1)
seq.onNext(2)
seq.onNext(3)
seq.onCompleted()

 

[ 출력 ]

6

 

또는, 아래와 같이 accumulator를 사용자 정의대로 연산하도록 할 수 있습니다.

let observable = seq.reduce(10, accumulator: { old, new in
    return old - new
})

 

 

II. scan( _: accumulator: )

앞서 살펴보았던 reduce는 observable이 완료되었을 때, 모든 연산이 끝난 결과값 하나만을 출력합니다. 이와 다르게 scanobservable에 들어오는 이벤트들을 연산할 때마다 그 결과를 방출해줍니다. 총합, 통계, 상태를 계산할 때 등 scan의 쓰임새는 광범위합니다. 그 자세한 예제는 Ch20. RxGesture에서 확인할 수 있습니다.

let seq = Observable.of(1, 2, 3)

let observable = source.scan(0, accumulator: +)
observable.subscribe(onNext: { print($0) })

 

[ 출력 ]

1

3

6

 

 

 

 

 

 

반응형

'iOS > RxSwift' 카테고리의 다른 글

Ch7. Transforming Operators  (0) 2023.02.24
Ch5. Filtering Operators  (0) 2023.02.24
RxSwift. Traits가 뭘까? (Single, Maybe, Completable)  (0) 2023.02.24
Ch3. Subjects  (0) 2023.02.24
Ch2. Observable  (0) 2023.02.24

댓글