Skip to content
Advertisement

Reactive Extension .NetCore Observing on MainThread

I’m trying to do a lot of network operation in parallel, and I want to set a timeout to each operation.

Since Parallel.ForEach doesn’t have an easy timeout option, I’m using System.Reactive.

this is my code:

public void networkOps(List<MacCpe> source, Action<List<Router>, List<Exception>> onDone) {

var routers = new List<Router>();
var exceptions = new List<Exception>();

Observable.Defer(() => source.ToObservable()) 
    .ObserveOn(Scheduler.CurrentThread)
            .SubscribeOn(Scheduler.Default)
            .SelectMany(it => 
                Observable.Amb(
                            Observable.Start(() => {
                                switch(it.type) {
                                    case AntennaType.type1: {
                                        //network stuff
                                    }
                                    break;

                                    case AntennaType.type2: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type3: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type4: {
                                      //network stuff
                                    }
                                    break;

                                    default: throw new NullReferenceException("Nothing");
                                }
                            }).Select(_ => true),
                            Observable.Timer(TimeSpan.FromSeconds(60)).Select(_ => false)
                        ),
                        (it, result) => new { it, result }
                    )
                    .Subscribe (
                        x => {
                            Console.WriteLine("checked item number " + x.it.Id);
                        },
                        ex => {
                            Console.WriteLine("error string");
                        }, () => {
                            onDone(routers, exceptions);
                        }
                    );
}

I’m using the Observable.Amb operator to run in parallel a 60 seconds timer, that works as a timeout.

However when I run this method, the program exits immediately without ever getting to the callback onDone.

I see online that I can use ObserveOnDispatcher to observe on the Ui thread while running the blocking code on a pool of threads, but I’m using this on dotnet core on linux on a terminal application server side.

How would one go to observe on the “main thread” in a console application?

Thanks in advance for the responses.

Advertisement

Answer

As you are replacing Parallel.ForEach it sounds like you are happy to have a blocking operation. Using Rx the way you have set it up it is not a blocking operation, so hence the method ends immediately.

It’s very simple to fix. Just change your .Subscribe to this:

.Do(
    x =>
    {
        Console.WriteLine("checked item number " + x.it.Id);
    },
    ex =>
    {
        Console.WriteLine("error string");
    }, () =>
    {
        onDone(routers, exceptions);
    }
)
.Wait();

I’d also get rid of your .ObserveOn(Scheduler.CurrentThread) and .SubscribeOn(Scheduler.Default) until you are certain that you need those.

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement