I’m trying to run a lot of network operations in parallel, and I want to set a timeout on each operation.
Since Parallel.ForEach doesn’t have an easy timeout option, I’m using System.Reactive.
This is my code:
public void networkOps(List<MacCpe> source, Action<List<Router>, List<Exception>> onDone)
{
    var routers = new List<Router>();
    var exceptions = new List<Exception>();

    Observable.Defer(() => source.ToObservable())
        .ObserveOn(Scheduler.CurrentThread)
        .SubscribeOn(Scheduler.Default)
        .SelectMany(it =>
            Observable.Amb(
                Observable.Start(() =>
                {
                    switch (it.type)
                    {
                        case AntennaType.type1:
                        {
                            //network stuff
                        }
                        break;
                        case AntennaType.type2:
                        {
                            //network stuff
                        }
                        break;
                        case AntennaType.type3:
                        {
                            //network stuff
                        }
                        break;
                        case AntennaType.type4:
                        {
                            //network stuff
                        }
                        break;
                        default:
                            throw new NullReferenceException("Nothing");
                    }
                }).Select(_ => true),
                Observable.Timer(TimeSpan.FromSeconds(60)).Select(_ => false)
            ),
            (it, result) => new { it, result }
        )
        .Subscribe(
            x => { Console.WriteLine("checked item number " + x.it.Id); },
            ex => { Console.WriteLine("error string"); },
            () => { onDone(routers, exceptions); }
        );
}
I’m using the Observable.Amb operator to run a 60-second timer in parallel with each operation, so that the timer works as a timeout.
However, when I run this method, the program exits immediately without ever reaching the onDone callback.
I have read online that I can use ObserveOnDispatcher to observe on the UI thread while running the blocking code on a pool of threads, but I’m running this on .NET Core on Linux, in a server-side terminal application.
How would one go about observing on the “main thread” in a console application?
Thanks in advance for the responses.
Answer
As you are replacing Parallel.ForEach, it sounds like you are happy to have a blocking operation. The way you have set up Rx, it is not a blocking operation, so the method returns immediately.
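To make the non-blocking behaviour concrete, here is a minimal sketch; the class and method names are made up for illustration and are not part of the question's code. It subscribes to a 300 ms timer and returns straight away, so the completion handler has not run yet when the method returns.

```csharp
using System;
using System.Reactive.Linq;

public static class SubscribeDemo
{
    // Hypothetical helper: reports whether the sequence had completed
    // by the time Subscribe returned.
    public static bool CompletedBeforeReturn()
    {
        var completed = false;

        // Observable.Timer fires on a background scheduler, so Subscribe
        // only registers the handlers and returns without waiting.
        using var subscription = Observable
            .Timer(TimeSpan.FromMilliseconds(300))
            .Subscribe(_ => { }, () => completed = true);

        // Still false here: the timer has not had time to fire.
        return completed;
    }
}
```

In a console application this is why the process can reach the end of Main before any callback has run.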
It’s very simple to fix. Just change your .Subscribe call to this:
.Do(
    x => { Console.WriteLine("checked item number " + x.it.Id); },
    ex => { Console.WriteLine("error string"); },
    () => { onDone(routers, exceptions); }
)
.Wait();
I’d also get rid of your .ObserveOn(Scheduler.CurrentThread) and .SubscribeOn(Scheduler.Default) calls until you are certain that you need them.
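For reference, here is a self-contained sketch of the same pattern with the scheduler calls removed. The names TimeoutDemo, RunAll and FakeNetworkCall are hypothetical, and Thread.Sleep stands in for the real network calls, so treat it as an illustration under those assumptions rather than a drop-in replacement.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

public static class TimeoutDemo
{
    // Hypothetical stand-in for one network call: just sleeps.
    private static void FakeNetworkCall(int delayMs) => Thread.Sleep(delayMs);

    // Starts all calls in parallel and blocks until each one has either
    // finished (true) or lost the race against the timer (false).
    public static List<(int DelayMs, bool Finished)> RunAll(
        IEnumerable<int> delaysMs, TimeSpan timeout)
    {
        var results = new List<(int DelayMs, bool Finished)>();

        delaysMs.ToObservable()
            .SelectMany(
                // Amb forwards whichever sequence produces a value first.
                delay => Observable.Amb(
                    Observable.Start(() => FakeNetworkCall(delay)).Select(_ => true),
                    Observable.Timer(timeout).Select(_ => false)),
                (delay, finished) => (DelayMs: delay, Finished: finished))
            .Do(
                x => results.Add(x),
                ex => Console.WriteLine("error: " + ex.Message),
                () => Console.WriteLine("all done"))
            // Blocks the calling thread until the sequence completes.
            .Wait();

        return results;
    }
}
```

Calling TimeoutDemo.RunAll(new[] { 50, 2000 }, TimeSpan.FromMilliseconds(500)) should mark the 50 ms item as finished and the 2000 ms item as timed out. A few caveats: as far as I know, Wait() rethrows an error from the sequence after the Do handler has logged it, and it throws InvalidOperationException when the source is empty, so you may want a try/catch around it. Also, Amb only stops listening to the losing sequence; the work started by Observable.Start keeps running in the background after a timeout.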