I’m trying to run a lot of network operations in parallel, and I want to set a timeout on each operation.
Since Parallel.ForEach doesn’t have an easy timeout option, I’m using System.Reactive.
This is my code:
public void networkOps(List<MacCpe> source, Action<List<Router>, List<Exception>> onDone) {
    var routers = new List<Router>();
    var exceptions = new List<Exception>();
    Observable.Defer(() => source.ToObservable())
        .ObserveOn(Scheduler.CurrentThread)
        .SubscribeOn(Scheduler.Default)
        .SelectMany(it =>
            Observable.Amb(
                Observable.Start(() => {
                    switch(it.type) {
                        case AntennaType.type1: {
                            //network stuff
                        }
                        break;
                        case AntennaType.type2: {
                            //network stuff
                        }
                        break;
                        case AntennaType.type3: {
                            //network stuff
                        }
                        break;
                        case AntennaType.type4: {
                            //network stuff
                        }
                        break;
                        default: throw new NullReferenceException("Nothing");
                    }
                }).Select(_ => true),
                Observable.Timer(TimeSpan.FromSeconds(60)).Select(_ => false)
            ),
            (it, result) => new { it, result }
        )
        .Subscribe (
            x => {
                Console.WriteLine("checked item number " + x.it.Id);
            },
            ex => {
                Console.WriteLine("error string");
            }, () => {
                onDone(routers, exceptions);
            }
        );
}
I’m using the Observable.Amb operator to run a 60-second timer in parallel with each operation, so that the timer works as a timeout.
However, when I run this method, the program exits immediately without ever reaching the onDone callback.
I’ve read online that I can use ObserveOnDispatcher to observe on the UI thread while running the blocking code on a pool of threads, but I’m running this on .NET Core on Linux, in a server-side terminal application.
How would one go about observing on the “main thread” in a console application?
Thanks in advance for the responses.
Answer
As you are replacing Parallel.ForEach, it sounds like you are happy to have a blocking operation. The way you have set up Rx, the subscription is not a blocking operation, so the method returns immediately and the program exits before the work finishes.
It’s very simple to fix. Just change your .Subscribe to this:
    .Do(
        x =>
        {
            Console.WriteLine("checked item number " + x.it.Id);
        },
        ex =>
        {
            Console.WriteLine("error string");
        }, () =>
        {
            onDone(routers, exceptions);
        }
    )
    .Wait();
I’d also get rid of your .ObserveOn(Scheduler.CurrentThread) and .SubscribeOn(Scheduler.Default) until you are certain that you need those.
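To see the blocking behaviour in isolation, here is a minimal console sketch of the same Amb-plus-Wait pattern. It assumes the System.Reactive NuGet package is referenced. FakeNetworkCall, the integer ids, and the one-second timeout are hypothetical stand-ins for the real network code and the 60-second timer in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    // Hypothetical stand-in for a real network call: item 2 is deliberately slow.
    private static void FakeNetworkCall(int id)
    {
        Thread.Sleep(id == 2 ? 3000 : 200);
    }

    public static void Main()
    {
        var ids = new List<int> { 1, 2, 3 };

        ids.ToObservable()
            .SelectMany(
                id => Observable.Amb(
                    // Runs the work on a thread-pool thread; yields true when it finishes.
                    Observable.Start(() => FakeNetworkCall(id)).Select(_ => true),
                    // Yields false if the work has not finished within one second.
                    Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => false)),
                (id, ok) => new { id, ok })
            .Do(
                x => Console.WriteLine("item " + x.id + ": " + (x.ok ? "done" : "timed out")),
                ex => Console.WriteLine("error: " + ex.Message),
                () => Console.WriteLine("all items checked"))
            // Blocks the calling thread until the whole sequence has completed.
            .Wait();
    }
}
```

Two caveats, as far as I know: Wait throws an InvalidOperationException if the sequence completes without producing any element, so an empty source list needs a guard; and when the timer wins the Amb race, the work started by Observable.Start is not cancelled, it simply keeps running in the background with its result ignored.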