ObserveOn 및 SubscribeOn-작업이 수행되는 위치
이 질문을 읽음 에 따라 SubscribeOn과 ObserveOn의 차이점은 무엇입니까?
ObserveOn
Subscribe
핸들러 의 코드 가 실행되는 위치를 설정합니다 .
stream.Subscribe(_ => { // this code here });
이 SubscribeOn
메서드는 스트림 설정이 수행되는 스레드를 설정합니다.
이것이 명시 적으로 설정되지 않으면 TaskPool이 사용된다는 것을 이해하게됩니다.
이제 내 질문은 다음과 같이 해보자.
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
디스패처에서 실행 되는 경우 Where
predicate
및 SelectMany
lots_of
실행중인 위치 는 어디 some_action
입니까?
SubscribeOn
및 에 대한 잘못된 정보가 많이 있습니다 ObserveOn
.
요약
SubscribeOn
가로 채기의 하나의 메소드 호출IObservable<T>
이다,Subscribe
및 호출Dispose
온IDisposable
핸들에 의해 반환Subscribe
.ObserveOn
도청의 방법을 호출IObserver<T>
있습니다OnNext
,OnCompleted
&OnError
.- 두 방법 모두 지정된 스케줄러에서 각각의 호출이 이루어 지도록합니다.
분석 및 시연
진술
ObserveOn은 Subscribe 핸들러의 코드가 실행되는 위치를 설정합니다.
도움이되는 것보다 더 혼란 스럽습니다. "구독 처리기"라고하는 것은 실제로 OnNext
처리기입니다. 의 기억 Subscribe
방법 IObservable
받아 IObserver
가지고 그 OnNext
, OnCompleted
그리고 OnError
방법을하지만 람다를 수락하고 구축 편의 과부하 제공하는 확장 메서드입니다 IObserver
당신을 위해 구현.
그래도 용어를 적절하게하겠습니다. "Subscribe handler" 가 호출 될 때 호출 되는 Observable 의 코드 Subscribe
라고 생각합니다. 이런 식으로 위의 설명은의 목적과 더 유사합니다 SubscribeOn
.
구독하기
SubscribeOn
Subscribe
Observable 의 메서드가 지정된 스케줄러 또는 컨텍스트에서 비동기 적으로 실행되도록합니다. Subscribe
일반적으로 오래 실행될 수 있고 호출 스레드를 차단하고 싶지 않기 때문에 실행중인 스레드의 관찰 가능 항목에서 메서드 를 호출하고 싶지 않을 때 사용합니다 .
를 호출 Subscribe
하면 긴 관측 가능 체인의 일부일 수있는 관측 가능 항목을 호출하는 것입니다. SubscribeOn
그것이 영향을 미치는 것은 관찰 가능한 것 입니다. 이제 체인의 모든 옵저버 블이 동일한 스레드에서 즉시 구독되는 경우가 될 수 있지만 반드시 그럴 필요는 없습니다. 생각해 Concat
- 예를 들어, 위의 스트림이 완료, 일반적으로이 어떤 스레드에서 호출 이전 스트림 일어날 것이다 번만 각 연속 스트림에 가입 한 것을 OnCompleted
에서합니다.
따라서 SubscribeOn
호출 Subscribe
과 구독중인 Observable 사이에 위치하여 호출을 가로 채서 비동기로 만듭니다.
구독 폐기에도 영향을 미칩니다. 구독 취소에 사용되는 핸들을 Subscribe
반환합니다 IDisposable
. 제공된 스케줄러에서에 대한 SubscribeOn
호출 Dispose
이 예약 되도록합니다 .
이해하려고 혼란의 공통점 SubscribeOn
않습니다는 점이다 Subscribe
관찰의 핸들러가 잘 호출 할 수 있습니다 OnNext
, OnCompleted
또는 OnError
이 동일한 스레드에서. 그러나 그 목적은 이러한 호출에 영향을주는 것이 아닙니다. Subscribe
메서드가 반환 되기 전에 스트림이 완료되는 것은 드문 일이 아닙니다 . Observable.Return
예를 들어 이렇게합니다. 한 번 보자.
내가 작성한 Spy 메서드 를 사용 하고 다음 코드를 실행하는 경우 :
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
다음 출력을 얻습니다 (스레드 ID는 물론 다를 수 있음).
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
전체 구독 처리기가 동일한 스레드에서 실행되고 반환되기 전에 완료되었음을 알 수 있습니다.
SubscribeOn
이것을 비동기 적으로 실행 하기 위해 사용합시다 . Return
관찰 대상과 관찰 대상 모두를 SubscribeOn
감시합니다.
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
다음과 같이 출력됩니다 (내가 추가 한 줄 번호).
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
01-기본 메소드가 스레드 1에서 실행 중입니다.
02- Return
Observable이 호출 스레드에서 평가됩니다. 우리는 IObservable
여기에 도착하고 있으며 아직 구독하는 것은 없습니다.
03- SubscribeOn
Observable이 호출 스레드에서 평가됩니다.
04-이제 마지막 Subscribe
으로 SubscribeOn
.
05- Subscribe
메서드가 비동기 적으로 완료됩니다 ...
06-... 그리고 쓰레드 1은 메인 메소드로 돌아갑니다. 이것이 바로 SubscribeOn의 효과입니다!
07-한편, SubscribeOn
기본 스케줄러에서 Return
. 여기 스레드 2에서 수신됩니다.
08 -로 그리고 Return
않습니다, 그것은 호출 OnNext
온 Subscribe
실 ...
09- SubscribeOn
이제 통과 일뿐입니다.
10,11-동일 OnCompleted
12-마지막으로 모든 Return
구독 처리기가 완료되었습니다.
의 목적과 효과가 명확 해지기를 바랍니다 SubscribeOn
.
관찰
호출을 다른 스레드로 전달 SubscribeOn
하는 Subscribe
메서드에 대한 인터셉터로 생각 ObserveOn
하면 동일한 작업을 수행하지만 OnNext
, OnCompleted
및 OnError
호출에 대해 수행 합니다.
원래 예를 떠올려보십시오.
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
이 출력은 다음과 같습니다.
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
이제 다음을 사용하도록 변경하십시오 ObserveOn
.
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
다음과 같은 출력을 얻습니다.
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
01-주요 방법은 스레드 1에서 실행됩니다.
02-이전과 마찬가지로 Return
Observable이 호출 스레드에서 평가됩니다. 우리는 IObservable
여기에 도착하고 있으며 아직 구독하는 것은 없습니다.
03- ObserveOn
Observable은 호출 스레드에서도 평가됩니다.
04-이제 우리는 다시 호출 스레드에서 ObserveOn
observable에 먼저 가입합니다 .
05-... 호출을 Return
Observable 로 전달합니다 .
06 - 이제 Return
호출 OnNext
의에서 Subscribe
핸들러입니다.
07- 다음은 ObserveOn
. OnNext
스레드 2에서 비동기 적으로 예약 된 것을 볼 수 있습니다 .
08-한편 스레드 1을 Return
호출 OnCompleted
합니다 ...
09-그리고 Return
의 구독 처리기가 완료됩니다 ...
10- ObserveOn
의 구독 핸들러도 마찬가지입니다 ...
11-컨트롤이 메인 메서드로 반환됩니다.
12 - 한편, ObserveOn
좌우로 움직이게했다 Return
'의 OnCompleted
이 비동기 적으로 실행되고 있기 때문에 09-11 동안이 언제든지 일어날 수 2 댓글이 넘는 전화를. 드디어 지금 호출되었습니다.
일반적인 사용 사례는 무엇입니까?
오래 실행되는 Observable SubscribeOn
을 필요로 Subscribe
하고 가능한 한 빨리 디스패처 스레드에서 벗어나고 싶을 때 GUI에서 사용되는 경우가 가장 많습니다 . 구독 할 때 호출되는 첫 번째 Observable이므로 Observable 체인의 끝에 적용하십시오.
당신은 가장 자주 볼 수 있습니다 ObserveOn
당신은 보장 할 때 GUI에서 사용 OnNext
, OnCompleted
및 OnError
호출은 디스패처 스레드 다시 정렬 화하고 있습니다. 가능한 한 늦게 다시 전환하려면 관찰 가능한 체인의 끝에 적용하십시오.
희망 당신은 당신의 질문에 대한 대답은 그 것을 볼 수 있습니다 ObserveOnDispatcher
그 스레드에 차이를하지 않습니다 Where
및 SelectMany
모든 스레드의 무엇을 따라 -에서 실행됩니다 스트림 에서 그들을 호출! 스트림의 가입 핸들러는 호출 스레드에서 호출됩니다, 그러나 그것은 어디 말을하는 것은 불가능 Where
하고 SelectMany
방법을 아는없이 실행됩니다 stream
구현됩니다.
Subscribe 호출보다 수명이 긴 Observable
지금까지 우리는 Observable.Return
. 핸들러 Return
내에서 스트림을 완료 Subscribe
합니다. 이것은 비정형적인 것은 아니지만 스트림이 Subscribe
핸들러 보다 오래 사는 것도 똑같이 일반적입니다 . 봐 Observable.Timer
, 예를 들어 :
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
다음을 반환합니다.
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
당신은 명확하게 가입 완료 후 볼 수 있습니다 OnNext
및 OnCompleted
다른 스레드에서 나중에 호출되는.
참고의 어떤 조합하는 것이 SubscribeOn
또는 ObserveOn
지지 않습니다 어떠한 효과가 있는 스레드 또는 스케줄러 Timer
choses가 호출 OnNext
및 OnCompleted
합니다.
물론 스레드 SubscribeOn
를 결정하는 데 사용할 수 있습니다 Subscribe
.
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
( 같은 스레드 풀 스레드를 얻는 NewThreadScheduler
경우 혼란을 방지하기 위해 의도적으로 여기로 변경하고 있습니다. )Timer
SubscribeOn
기부:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
여기에서 스레드 (1)의 메인 스레드가 Subscribe
호출 후 반환 되지만 Timer
구독은 자체 스레드 (2)를 얻지 만 OnNext
및 OnCompleted
호출은 스레드 (3)에서 실행되는 것을 명확하게 볼 수 있습니다 .
이제 ObserveOn
에서는 코드를 다음과 같이 변경해 보겠습니다 (코드를 따르는 경우에는 nuget 패키지 rx-wpf 사용).
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
이 코드는 약간 다릅니다. 첫 번째 줄은 디스패처가 있는지 확인하고, 또한 가져옵니다. ObserveOnDispatcher
이는 평가되는 모든 스레드의을ObserveOn
사용해야한다는 점을 제외하면 .DispatcherScheduler
ObserveOnDispatcher
이 코드는 다음 출력을 제공합니다.
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
디스패처 (및 메인 스레드)는 스레드 1 Timer
이며 여전히 선택한 스레드 (2)에서 호출 OnNext
하고 OnCompleted
있지만 ObserveOnDispatcher
is는 디스패처 스레드 인 스레드 (1)로 다시 호출합니다.
또한 디스패처 스레드 (예 : a Thread.Sleep
)를 ObserveOnDispatcher
차단 하면 would 차단 (이 코드는 LINQPad 기본 메서드 내에서 가장 잘 작동 함)을 볼 수 있습니다.
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
다음과 같은 출력이 표시됩니다.
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
호출을 통해 ObserveOnDispatcher
한 번만 나갈 수있는 Sleep
것은 실행되었습니다.
키 포인트
It's useful to keep in mind that Reactive Extensions is essentially a free-threaded library, and tries to be as lazy as possible about what thread it runs on - you have to deliberately interfere with ObserveOn
, SubscribeOn
and passing specific schedulers to operators that accept them to change this.
There's nothing a consumer of an observable can do to control what it's doing internally - ObserveOn
and SubscribeOn
are decorators that wrap the surface area of observers and observables to marshal calls across threads. Hopefully these examples have made that clear.
I found James's answer very clear and comprehensive. However, I despite this I still find myself having to explain the differences.
Therefore I created a very simple/stupid example that allows me to graphically demonstrate what schedulers things are being called on. I've created a class MyScheduler
that executes actions immediately, but will change the console colour.
The text output from the SubscribeOn
scheduler is output in red and that from ObserveOn
scheduler is output in blue.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace SchedulerExample
{
class Program
{
static void Main(string[] args)
{
var mydata = new[] {"A", "B", "C", "D", "E"};
var observable = Observable.Create<string>(observer =>
{
Console.WriteLine("Observable.Create");
return mydata.ToObservable().
Subscribe(observer);
});
observable.
SubscribeOn(new MyScheduler(ConsoleColor.Red)).
ObserveOn(new MyScheduler(ConsoleColor.Blue)).
Subscribe(s => Console.WriteLine("OnNext {0}", s));
Console.ReadKey();
}
}
}
This outputs:
And for reference MyScheduler (not suitable for real usage):
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace SchedulerExample
{
class MyScheduler : IScheduler
{
private readonly ConsoleColor _colour;
public MyScheduler(ConsoleColor colour)
{
_colour = colour;
}
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
return Execute(state, action);
}
private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
var tmp = Console.ForegroundColor;
Console.ForegroundColor = _colour;
action(this, state);
Console.ForegroundColor = tmp;
return Disposable.Empty;
}
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
throw new NotImplementedException();
}
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
throw new NotImplementedException();
}
public DateTimeOffset Now
{
get { return DateTime.UtcNow; }
}
}
}
I often mistake that .SubcribeOn
is used to set thread where code inside .Subscribe
is being executed. But to remember, just think that publish and subscribe must be pair like yin-yang. To set where Subscribe's code
being executed use ObserveOn
. To set where Observable's code
executed used SubscribeOn
. Or in summary mantra: where-what
,Subscribe-Observe
,Observe-Subscribe
.
ReferenceURL : https://stackoverflow.com/questions/20451939/observeon-and-subscribeon-where-the-work-is-being-done
'code' 카테고리의 다른 글
대용량 (14GB) MySQL 덤프 파일을 새 MySQL 데이터베이스로 가져 오려면 어떻게해야합니까? (0) | 2021.01.09 |
---|---|
GDB를 프로세스에 연결하려고 할 때 "ptrace 작업이 허용되지 않음"을 해결하는 방법은 무엇입니까? (0) | 2021.01.09 |
WIFI가 연결되면 SSID 받기 (0) | 2021.01.09 |
Ctrl R이 SQL Server 2012에서 작동하지 않습니다. (0) | 2021.01.09 |
오류 StrictMode $ AndroidBlockGuardPolicy.onNetwork (0) | 2021.01.09 |