# Notifiers
Reactive.Bindings.Notifiers
namespace provides many useful classes which implement IObservable
interface.
# BooleanNotifier
BooleanNotifier
class implements the IObservable<bool>
interface.
And has some methods and property.
TurnOn
method- Change state to true.
TurnOff
method- Change state to false.
SwitchValue
method- Switch state.
Value
property- Set state
The initial state can be set the constructor. The default value is false.
var n = new BooleanNotifier();
n.Subscribe(x => Debug.WriteLine(x));
n.TurnOn(); // true
n.TurnOff(); // false
n.Value = true; // true
n.Value = false; // false
It can use as source of ReactiveCommand
like below:
var n = new BooleanNotifier(); // the default value is false.
// CanExecute method of ReactiveCommand returns true as default. So, you set initialValue explicitly to `n.Value`.
var command = n.ToReactiveCommand(initialValue: n.Value);
// Or if you would like to convert to something using Select and others before calling ToReactiveCommand, you can use StartWith.
var command2 = n.StartWith(n.Value).Select(x => Something(x)).ToReactiveCommand();
# CountNotifier
CountNotifier
class implements the IObservable<CountChangedStates>
interface. It provides increment and decrement features, and raise a CountChangedStates
value when the state changes.
CountChangedStates enum is defined as below.
/// <summary>Event kind of CountNotifier.</summary>
public enum CountChangedStatus
{
/// <summary>Count incremented.</summary>
Increment,
/// <summary>Count decremented.</summary>
Decrement,
/// <summary>Count is zero.</summary>
Empty,
/// <summary>Count arrived max.</summary>
Max
}
CountNotifier
's max value can be set from constructor argument:
var c = new CountNotifier(); // default max value is int.MaxValue
// output status.
c.Subscribe(x => Debug.WriteLine(x));
// output current value.
c.Select(_ => c.Count).Subscribe(x => Debug.WriteLine(x));
// increment
var d = c.Increment(10);
// revert increment
d.Dispose();
// increment and decrement
c.Increment(10);
c.Decrement(5);
// output current value.
Debug.WriteLine(c.Count);
Output is below.
Increment
10
Decrement
0
Empty
0
Increment
10
Decrement
5
5
# ScheduledNotifier
This class raises the value on the scheduler. The default scheduler is Scheduler.Immediate
. Set the scheduler using constructor argument.
var n = new ScheduledNotifier<string>();
n.Subscribe(x => Debug.WriteLine(x));
// output the value immediately
n.Report("Hello world");
// output the value after 2 seconds.
n.Report("After 2 seconds.", TimeSpan.FromSeconds(2));
# BusyNotifier
This class implements the IObservable<bool>
interface.
It raises true
during running the process, raises false
when all processes end.
The StartProcess
method returns an IDisposable
instance. When the process finishes, call the Dispose method.
using Reactive.Bindings.Notifiers;
using System;
using System.Threading.Tasks;
namespace ReactivePropertyEduApp
{
class Program
{
static void Main(string[] args)
{
MainAsync(args).Wait();
}
static async Task MainAsync(string[] args)
{
var b = new BusyNotifier();
b.Subscribe(x => Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")}: OnNext: {x}"));
await Task.WhenAll(
Task.Run(async () =>
{
using (b.ProcessStart())
{
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")}: Process1 started.");
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")}: Process1 finished.");
}
}),
Task.Run(async () =>
{
using (b.ProcessStart())
{
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")}: Process2 started.");
await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss")}: Process2 finished.");
}
}));
}
}
}
Output is below.
15:07:45: OnNext: False
15:07:45: OnNext: True
15:07:45: Process1 started.
15:07:45: Process2 started.
15:07:46: Process1 finished.
15:07:47: Process2 finished.
15:07:47: OnNext: False
# MessageBroker
I suggest creating a new notifier called MessageBroker
: an in-memory pubsub. This is an Rx and async friendly EventAggregator
or MessageBus
, etc. We can use this for the messenger pattern.
If reviewer accept this code, please add to all platforms.
using Reactive.Bindings.Notifiers;
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
public class MyClass
{
public int MyProperty { get; set; }
public override string ToString()
{
return "MP:" + MyProperty;
}
}
class Program
{
static void RunMessageBroker()
{
// global scope pub-sub messaging
MessageBroker.Default.Subscribe<MyClass>(x =>
{
Console.WriteLine("A:" + x);
});
var d = MessageBroker.Default.Subscribe<MyClass>(x =>
{
Console.WriteLine("B:" + x);
});
// support convert to IObservable<T>
MessageBroker.Default.ToObservable<MyClass>().Subscribe(x =>
{
Console.WriteLine("C:" + x);
});
MessageBroker.Default.Publish(new MyClass { MyProperty = 100 });
MessageBroker.Default.Publish(new MyClass { MyProperty = 200 });
MessageBroker.Default.Publish(new MyClass { MyProperty = 300 });
d.Dispose(); // unsubscribe
MessageBroker.Default.Publish(new MyClass { MyProperty = 400 });
}
static async Task RunAsyncMessageBroker()
{
// asynchronous message pub-sub
AsyncMessageBroker.Default.Subscribe<MyClass>(async x =>
{
Console.WriteLine("A:" + x);
await Task.Delay(TimeSpan.FromSeconds(1));
});
var d = AsyncMessageBroker.Default.Subscribe<MyClass>(async x =>
{
Console.WriteLine("B:" + x);
await Task.Delay(TimeSpan.FromSeconds(2));
});
// await all subscriber complete
await AsyncMessageBroker.Default.PublishAsync(new MyClass { MyProperty = 100 });
await AsyncMessageBroker.Default.PublishAsync(new MyClass { MyProperty = 200 });
await AsyncMessageBroker.Default.PublishAsync(new MyClass { MyProperty = 300 });
d.Dispose(); // unsubscribe
await AsyncMessageBroker.Default.PublishAsync(new MyClass { MyProperty = 400 });
}
static void Main(string[] args)
{
Console.WriteLine("MessageBroker");
RunMessageBroker();
Console.WriteLine("AsyncMessageBroker");
RunAsyncMessageBroker().Wait();
}
}
Messenger pattern's multi thread dispatch can be handled easily by Rx.
MessageBroker.Default.ToObservable<MyClass>()
.ObserveOn(Dispatcher) // Rx Magic!
.Subscribe(x =>
{
Console.WriteLine(x);
});