Post on 23-Jan-2018
Introduzione a ReactiveX
Andrea Ceroniandrea.ceroni@elfo.net
@andrekiba6
Sponsored by
28 aprile 2016
Relatore
Introduzione a ReactiveX
Tipo di sessione: Frontale
Durata sessione: 45min
Argomenti
Reactive Programming
observable stream
IObservable e IObserver
Pull model vs Push model
Cold vs Hot observables
ReactiveX
classe Observable
Operatori
1/18
Code Q & AIntro
2/18
3/18Cosa significa
Reactive Programming?
paradigma di programmazione dichiarativain cui l’applicazione reagisce ad eventi
non necessariamente concorrente
4/18What's the difference between an array and events?
Erik Meijer
[ 16, 2, 5, 6, 4, 11, 7 ] Where(x => IsPrime(x)) [ 2, 5, 11, 7 ]
timex =6y= 6
x =22y= 116
x =7y= 91
x =33y= 4
Where(e => e.X > 20)timex =22
y= 116x =33y= 4
5/18
observable = stream di eventi
è possibile creare facilmente data stream di qualsiasi cosa, non solo eventi
gli stream sono poco costosi in termini di risorse e possono stare ovunque
6/18Come recuperiamo i dati?
Single Multiple
Sync T GetData() IEnumerable<T> GetData()
Async Task<T> GetData() IObservable<T> GetData()
e se vogliamo processare in asincrono una collectionsenza attendere di avere tutti i dati?
e se non sappiamo a priori quando dovremo farlo?
7/18IEnumerable<T>Il metodo IEnumerable.GetEnumerator() ritorna un oggetto di tipo IEnumerator che ci permette di iterare su una collection e ha questi metodi e proprietà:
• bool MoveNext() : avanza all’elemento successivo e ritorna true o false (se esiste o no)• T Current{ get; } : ritorna l’elemento corrente• throws Exception : la chiamata a Current può generare eccezione se l’elemento non esiste• void Dispose() : rilascia le risorse utilizzate dall’enumerator
pull model
8/18esempiovar enumerator = new List<int> { 1, 2, 3 }.GetEnumerator();
while (enumerator.MoveNext()){
Console.WriteLine(enumerator.Current);}enumerator.Dispose();
9/18IObserver<T>Ogni metodo di IEnumerator ha il suo duale nell’interfaccia IObserver :
• void OnCompleted(): notifica l’osservatore che il provider ha finito di mandare dati• void OnNext(T) : rende disponibile un nuovo elemnto all’osservatore• void OnError(Exception) : notifica l’osservatore che c’è stato un errore
push model
x time
10/18esempiopublic static IObservable<int> GetData(){
return Observable.Create<int>(o => {o.OnNext(1);o.OnNext(2);o.OnNext(3);o.OnCompleted();return Disposable.Empty;
});}
11/18ConfrontoIEnumerable IObservable
pull push
bool MoveNext() void OnCompleted()
T Current { get; } void OnNext()
throws Exception void OnError(Exception)
12/18IObservable<T> e IObserver<T>
interface IObserver<in T> {
void OnNext(T item);void OnCompleted();void OnError(Exception error);
}
interface IObservable<out T> {
IDisposable Subscribe(IObserver<T> observer); }
13/18
Cold
• inizia ad emettere item solo quando viene sottoscritta• gli item emessi non sono condivisi tra gli osservatori
Hot
• emette item indipendentemente dall’esistenza di un osservatore• gli item sono condivisi tra tutti gli osservatori
Cold vs Hot observables
14/18ReactiveX
Rx is a library for programming withasynchronous data streams.
It is a combination of the best ideas fromthe Observer pattern, the Iterator pattern,
and functional programming
15/18
mette a disposizione la classe Observable che in una singola astrazione ci permettedi gestire qualsiasi stream
possiamo trattare stream di eventi come normali collection
componibile: le query utilizzano diversi operatori e si possono comporre, il flusso è chiaro
dichiarativa: specifica cosa fa il codice, non come lo fa
trasformativa: le query possono trasforamre dati da un tipo ad un altro
dal punto di vista dell’observer l’implementazione non importa à disaccoppiamento
Quali vantaggi?
16/18
Observable.Just(code)
17/18
Riassunto
che cosa è un’ observable
differenza tra push model e pull model
differenza tra cold e hot observables
wrappare un evento in una observable
come concatenare operatori Rx per modificare i dati
18/18questions.ToObservable().Subscribe(q => {
if(iKnowTheAnswer)Answer(q); J
});
Grazie!