code

ObserveOn 및 SubscribeOn-작업이 수행되는 위치

codestyles 2021. 1. 9. 09:56
반응형

ObserveOn 및 SubscribeOn-작업이 수행되는 위치


이 질문을 읽음 에 따라 SubscribeOn과 ObserveOn의 차이점은 무엇입니까?

ObserveOnSubscribe핸들러 의 코드 가 실행되는 위치를 설정합니다 .

stream.Subscribe(_ => { // this code here });

SubscribeOn메서드는 스트림 설정이 수행되는 스레드를 설정합니다.

이것이 명시 적으로 설정되지 않으면 TaskPool이 사용된다는 것을 이해하게됩니다.

이제 내 질문은 다음과 같이 해보자.

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

디스패처에서 실행 되는 경우 Where predicateSelectMany lots_of실행중인 위치 는 어디 some_action입니까?


SubscribeOn에 대한 잘못된 정보가 많이 있습니다 ObserveOn.

요약

  • SubscribeOn가로 채기의 하나의 메소드 호출 IObservable<T>이다, Subscribe및 호출 DisposeIDisposable핸들에 의해 반환 Subscribe.
  • ObserveOn도청의 방법을 호출 IObserver<T>있습니다 OnNext, OnCompleted& OnError.
  • 두 방법 모두 지정된 스케줄러에서 각각의 호출이 이루어 지도록합니다.

분석 및 시연

진술

ObserveOn은 Subscribe 핸들러의 코드가 실행되는 위치를 설정합니다.

도움이되는 것보다 더 혼란 스럽습니다. "구독 처리기"라고하는 것은 실제로 OnNext처리기입니다. 의 기억 Subscribe방법 IObservable받아 IObserver가지고 그 OnNext, OnCompleted그리고 OnError방법을하지만 람다를 수락하고 구축 편의 과부하 제공하는 확장 메서드입니다 IObserver당신을 위해 구현.

그래도 용어를 적절하게하겠습니다. "Subscribe handler" 가 호출 될 때 호출 되는 Observable 의 코드 Subscribe라고 생각합니다. 이런 식으로 위의 설명은의 목적과 더 유사합니다 SubscribeOn.

구독하기

SubscribeOnSubscribeObservable 메서드가 지정된 스케줄러 또는 컨텍스트에서 비동기 적으로 실행되도록합니다. Subscribe일반적으로 오래 실행될 수 있고 호출 스레드를 차단하고 싶지 않기 때문에 실행중인 스레드의 관찰 가능 항목에서 메서드 를 호출하고 싶지 않을 때 사용합니다 .

를 호출 Subscribe하면 긴 관측 가능 체인의 일부일 수있는 관측 가능 항목을 호출하는 것입니다. SubscribeOn그것이 영향을 미치는 것은 관찰 가능한 것 입니다. 이제 체인의 모든 옵저버 블이 동일한 스레드에서 즉시 구독되는 경우가 될 수 있지만 반드시 그럴 필요는 없습니다. 생각해 Concat- 예를 들어, 위의 스트림이 완료, 일반적으로이 어떤 스레드에서 호출 이전 스트림 일어날 것이다 번만 각 연속 스트림에 가입 한 것을 OnCompleted에서합니다.

따라서 SubscribeOn호출 Subscribe과 구독중인 Observable 사이에 위치하여 호출을 가로 채서 비동기로 만듭니다.

구독 폐기에도 영향을 미칩니다. 구독 취소에 사용되는 핸들을 Subscribe반환합니다 IDisposable. 제공된 스케줄러에서에 대한 SubscribeOn호출 Dispose이 예약 되도록합니다 .

이해하려고 혼란의 공통점 SubscribeOn않습니다는 점이다 Subscribe관찰의 핸들러가 잘 호출 할 수 있습니다 OnNext, OnCompleted또는 OnError이 동일한 스레드에서. 그러나 그 목적은 이러한 호출에 영향을주는 것이 아닙니다. Subscribe메서드가 반환 되기 전에 스트림이 완료되는 것은 드문 일이 아닙니다 . Observable.Return예를 들어 이렇게합니다. 한 번 보자.

내가 작성한 Spy 메서드 를 사용 하고 다음 코드를 실행하는 경우 :

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

다음 출력을 얻습니다 (스레드 ID는 물론 다를 수 있음).

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

전체 구독 처리기가 동일한 스레드에서 실행되고 반환되기 전에 완료되었음을 알 수 있습니다.

SubscribeOn이것을 비동기 적으로 실행 하기 위해 사용합시다 . Return관찰 대상과 관찰 대상 모두를 SubscribeOn감시합니다.

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

다음과 같이 출력됩니다 (내가 추가 한 줄 번호).

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.

01-기본 메소드가 스레드 1에서 실행 중입니다.

02- ReturnObservable이 호출 스레드에서 평가됩니다. 우리는 IObservable여기에 도착하고 있으며 아직 구독하는 것은 없습니다.

03- SubscribeOnObservable이 호출 스레드에서 평가됩니다.

04-이제 마지막 Subscribe으로 SubscribeOn.

05- Subscribe메서드가 비동기 적으로 완료됩니다 ...

06-... 그리고 쓰레드 1은 메인 메소드로 돌아갑니다. 이것이 바로 SubscribeOn의 효과입니다!

07-한편, SubscribeOn기본 스케줄러에서 Return. 여기 스레드 2에서 수신됩니다.

08 -로 그리고 Return않습니다, 그것은 호출 OnNextSubscribe실 ...

09- SubscribeOn이제 통과 일뿐입니다.

10,11-동일 OnCompleted

12-마지막으로 모든 Return구독 처리기가 완료되었습니다.

의 목적과 효과가 명확 해지기를 바랍니다 SubscribeOn.

관찰

호출을 다른 스레드로 전달 SubscribeOn하는 Subscribe메서드에 대한 인터셉터로 생각 ObserveOn하면 동일한 작업을 수행하지만 OnNext, OnCompletedOnError호출에 대해 수행 합니다.

원래 예를 떠올려보십시오.

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

이 출력은 다음과 같습니다.

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

이제 다음을 사용하도록 변경하십시오 ObserveOn.

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

다음과 같은 출력을 얻습니다.

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2

01-주요 방법은 스레드 1에서 실행됩니다.

02-이전과 마찬가지로 ReturnObservable이 호출 스레드에서 평가됩니다. 우리는 IObservable여기에 도착하고 있으며 아직 구독하는 것은 없습니다.

03- ObserveOnObservable은 호출 스레드에서도 평가됩니다.

04-이제 우리는 다시 호출 스레드에서 ObserveOnobservable에 먼저 가입합니다 .

05-... 호출을 ReturnObservable 로 전달합니다 .

06 - 이제 Return호출 OnNext의에서 Subscribe핸들러입니다.

07- 다음은 ObserveOn. OnNext스레드 2에서 비동기 적으로 예약 된 것을 볼 수 있습니다 .

08-한편 스레드 1을 Return호출 OnCompleted합니다 ...

09-그리고 Return의 구독 처리기가 완료됩니다 ...

10- ObserveOn의 구독 핸들러도 마찬가지입니다 ...

11-컨트롤이 메인 메서드로 반환됩니다.

12 - 한편, ObserveOn좌우로 움직이게했다 Return'의 OnCompleted이 비동기 적으로 실행되고 있기 때문에 09-11 동안이 언제든지 일어날 수 2 댓글이 넘는 전화를. 드디어 지금 호출되었습니다.

일반적인 사용 사례는 무엇입니까?

오래 실행되는 Observable SubscribeOn을 필요로 Subscribe하고 가능한 한 빨리 디스패처 스레드에서 벗어나고 싶을 때 GUI에서 사용되는 경우가 가장 많습니다 . 구독 할 때 호출되는 첫 번째 Observable이므로 Observable 체인의 끝에 적용하십시오.

당신은 가장 자주 볼 수 있습니다 ObserveOn당신은 보장 할 때 GUI에서 사용 OnNext, OnCompletedOnError호출은 디스패처 스레드 다시 정렬 화하고 있습니다. 가능한 한 늦게 다시 전환하려면 관찰 가능한 체인의 끝에 적용하십시오.

희망 당신은 당신의 질문에 대한 대답은 그 것을 볼 수 있습니다 ObserveOnDispatcher그 스레드에 차이를하지 않습니다 WhereSelectMany모든 스레드의 무엇을 따라 -에서 실행됩니다 스트림 에서 그들을 호출! 스트림의 가입 핸들러는 호출 스레드에서 호출됩니다, 그러나 그것은 어디 말을하는 것은 불가능 Where하고 SelectMany방법을 아는없이 실행됩니다 stream구현됩니다.

Subscribe 호출보다 수명이 긴 Observable

지금까지 우리는 Observable.Return. 핸들러 Return내에서 스트림을 완료 Subscribe합니다. 이것은 비정형적인 것은 아니지만 스트림이 Subscribe핸들러 보다 오래 사는 것도 똑같이 일반적입니다 . Observable.Timer, 예를 들어 :

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");

다음을 반환합니다.

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2

당신은 명확하게 가입 완료 후 볼 수 있습니다 OnNextOnCompleted다른 스레드에서 나중에 호출되는.

참고의 어떤 조합하는 것이 SubscribeOn또는 ObserveOn지지 않습니다 어떠한 효과가 있는 스레드 또는 스케줄러 Timerchoses가 호출 OnNextOnCompleted합니다.

물론 스레드 SubscribeOn를 결정하는 데 사용할 수 있습니다 Subscribe.

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

( 같은 스레드 풀 스레드를 얻는 NewThreadScheduler경우 혼란을 방지하기 위해 의도적으로 여기로 변경하고 있습니다. )TimerSubscribeOn

기부:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3

여기에서 스레드 (1)의 메인 스레드가 Subscribe호출 후 반환 되지만 Timer구독은 자체 스레드 (2)를 얻지 만 OnNextOnCompleted호출은 스레드 (3)에서 실행되는 것을 명확하게 볼 수 있습니다 .

이제 ObserveOn에서는 코드를 다음과 같이 변경해 보겠습니다 (코드를 따르는 경우에는 nuget 패키지 rx-wpf 사용).

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

이 코드는 약간 다릅니다. 첫 번째 줄은 디스패처가 있는지 확인하고, 또한 가져옵니다. ObserveOnDispatcher이는 평가되는 모든 스레드의을ObserveOn 사용해야한다는 점을 제외하면 .DispatcherScheduler ObserveOnDispatcher

이 코드는 다음 출력을 제공합니다.

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1

디스패처 (및 메인 스레드)는 스레드 1 Timer이며 여전히 선택한 스레드 (2)에서 호출 OnNext하고 OnCompleted있지만 ObserveOnDispatcheris는 디스패처 스레드 인 스레드 (1)로 다시 호출합니다.

또한 디스패처 스레드 (예 : a Thread.Sleep)를 ObserveOnDispatcher차단 하면 would 차단 (이 코드는 LINQPad 기본 메서드 내에서 가장 잘 작동 함)을 볼 수 있습니다.

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");

다음과 같은 출력이 표시됩니다.

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1

호출을 통해 ObserveOnDispatcher한 번만 나갈 수있는 Sleep것은 실행되었습니다.

키 포인트

It's useful to keep in mind that Reactive Extensions is essentially a free-threaded library, and tries to be as lazy as possible about what thread it runs on - you have to deliberately interfere with ObserveOn, SubscribeOn and passing specific schedulers to operators that accept them to change this.

There's nothing a consumer of an observable can do to control what it's doing internally - ObserveOn and SubscribeOn are decorators that wrap the surface area of observers and observables to marshal calls across threads. Hopefully these examples have made that clear.


I found James's answer very clear and comprehensive. However, I despite this I still find myself having to explain the differences.

Therefore I created a very simple/stupid example that allows me to graphically demonstrate what schedulers things are being called on. I've created a class MyScheduler that executes actions immediately, but will change the console colour.

The text output from the SubscribeOn scheduler is output in red and that from ObserveOn scheduler is output in blue.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace SchedulerExample
{

    class Program
    {
        static void Main(string[] args)
        {
            var mydata = new[] {"A", "B", "C", "D", "E"};
            var observable = Observable.Create<string>(observer =>
                                            {
                                                Console.WriteLine("Observable.Create");
                                                return mydata.ToObservable().
                                                    Subscribe(observer);
                                            });

            observable.
                SubscribeOn(new MyScheduler(ConsoleColor.Red)).
                ObserveOn(new MyScheduler(ConsoleColor.Blue)).
                Subscribe(s => Console.WriteLine("OnNext {0}", s));

            Console.ReadKey();
        }
    }
}

This outputs:

scheduler

And for reference MyScheduler (not suitable for real usage):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace SchedulerExample
{
    class MyScheduler : IScheduler
    {
        private readonly ConsoleColor _colour;

        public MyScheduler(ConsoleColor colour)
        {
            _colour = colour;
        }

        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            return Execute(state, action);
        }

        private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            var tmp = Console.ForegroundColor;
            Console.ForegroundColor = _colour;
            action(this, state);
            Console.ForegroundColor = tmp;
            return Disposable.Empty;
        }

        public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public DateTimeOffset Now
        {
            get { return DateTime.UtcNow; }
        }
    }
}

I often mistake that .SubcribeOn is used to set thread where code inside .Subscribe is being executed. But to remember, just think that publish and subscribe must be pair like yin-yang. To set where Subscribe's code being executed use ObserveOn. To set where Observable's code executed used SubscribeOn. Or in summary mantra: where-what,Subscribe-Observe,Observe-Subscribe.

ReferenceURL : https://stackoverflow.com/questions/20451939/observeon-and-subscribeon-where-the-work-is-being-done

반응형