Comunicazione nei Sistemi Distribuiti

37
Comunicazione nei Sistemi Distribuiti Parte 2 Università degli Studi di Roma Tor VergataDipartimento di Ingegneria Civile e Ingegneria Informatica Corso di Sistemi Distribuiti e Cloud Computing A.A. 2017/18 Valeria Cardellini Comunicazione orientata ai messaggi RPC migliora la trasparenza della distribuzione Ma non è un meccanismo sempre adatto a supportare la comunicazione in un SD Ad es. quando non si può essere certi che il destinatario sia in esecuzione Alternativa: comunicazione orientata ai messaggi Di tipo transiente Berkeley socket: già esaminata in altri corsi Message Passing Interface (MPI) Di tipo persistente Message Oriented Middleware (MOM) Valeria Cardellini - SDCC 2017/18 1

Transcript of Comunicazione nei Sistemi Distribuiti

Page 1: Comunicazione nei Sistemi Distribuiti

Comunicazione nei Sistemi Distribuiti Parte 2

Università degli Studi di Roma “Tor Vergata” Dipartimento di Ingegneria Civile e Ingegneria Informatica

Corso di Sistemi Distribuiti e Cloud Computing A.A. 2017/18

Valeria Cardellini

Comunicazione orientata ai messaggi

•  RPC migliora la trasparenza della distribuzione •  Ma non è un meccanismo sempre adatto a

supportare la comunicazione in un SD –  Ad es. quando non si può essere certi che il destinatario sia

in esecuzione •  Alternativa: comunicazione orientata ai messaggi

–  Di tipo transiente •  Berkeley socket: già esaminata in altri corsi •  Message Passing Interface (MPI)

–  Di tipo persistente •  Message Oriented Middleware (MOM)

Valeria Cardellini - SDCC 2017/18

1

Page 2: Comunicazione nei Sistemi Distribuiti

Message Passing Interface (MPI)

•  Libreria per lo scambio di messaggi tra processi in esecuzione su nodi diversi –  Specifica della sola interfaccia (http://www.mpi-forum.org/) –  Diverse implementazioni, tra cui Open MPI (http://

www.open-mpi.org/) e MPICH (http://www.mcs.anl.gov/research/projects/mpich2/)

–  Standard de facto per la comunicazione tra i nodi di un sistema che esegue un programma parallelo sviluppato per un’architettura a memoria distribuita

•  MPI definisce una serie di primitive per la comunicazione tra processi; in particolare: –  Primitive per la comunicazione punto-punto: per l’invio e la

ricezione di un messaggio tra due processi diversi –  Primitive per la comunicazione collettiva

Valeria Cardellini - SDCC 2017/18

2

Comunicazione punto-punto in MPI •  Principali primitive per la comunicazione punto-punto:

–  MPI_Send e MPI_Recv: comunicazione bloccante •  MPI_Send con modalità sincrona o bufferizzata a seconda

dell’implementazione –  MPI_Bsend: invio bloccante bufferizzato –  MPI_Ssend: invio sincrono bloccante –  MPI_Isend e MPI_Irecv: comunicazione non bloccante

Primitive MPI Significato MPI_Bsend Aggiunge il messaggio in uscita ad un buffer per l’invio MPI_Send Invia il messaggio e aspetta finché non viene copiato in un

buffer locale o remoto MPI_Ssend Invia il messaggio e aspetta finché non inizia la ricezione MPI_Isend Invia il riferimento al messaggio in uscita e continua MPI_Recv Riceve il messaggio; si blocca se non ce ne sono

Valeria Cardellini - SDCC 2017/18

3

Page 3: Comunicazione nei Sistemi Distribuiti

Esempio di comunicazione in MPI #include <stdio.h> #include <string.h> #include <mpi.h> int main (int argc, char **argv) { int myrank; char message[20]; MPI_Status status; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); printf("Il mio rank e' : %d\n", myrank); if (myrank == 0) { //Invia un messaggio al processo 1 strcpy(message, "PROVA"); MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD); printf("%d) Ho inviato: '%s'\n", myrank, message); } else if (myrank==1) { //Riceve il messaggio dal processo 0 MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status); printf("%d) Ho ricevuto: '%s'\n", myrank, message); } MPI_Finalize(); return 0; }

MPI_Send(buf, count, datatype, dest, tag, comm)

MPI_Recv(buf, count, datatype, source, tag, comm, status)

Valeria Cardellini - SDCC 2017/18 4

Message-oriented middleware •  Communication middleware that supports sending

and receiving messages in a persistent way •  Loose coupling among system/application

components –  Decoupling in time and space –  Can also support synchronization decoupling

•  Two patterns: –  Message queue system (MQS) –  Publish-subscribe system (pub/sub)

Valeria Cardellini - SDCC 2017/18 5

Page 4: Comunicazione nei Sistemi Distribuiti

Queue message pattern

•  Message sent to queue •  Multiple consumers can read from the queue •  Each message is delivered to only one consumer •  Principles

–  Loose coupling –  Service statelessness

•  Services minimize resource consumption by deferring the management of state information when necessary

•  Apps: –  Task scheduling, load balancing, collaboration

Valeria Cardellini - SABD 2016/17

6

Queue message pattern

Valeria Cardellini - SABD 2016/17

7

A sends a message to B B issues a response message back to A

Page 5: Comunicazione nei Sistemi Distribuiti

Message queue API •  Basic interface to a queue in a MQS:

–  put: nonblocking send •  Append a message to a specified queue

–  get: blocking receive •  Block until the specified queue is nonempty and remove the

first message •  Variations: allow searching for a specific message in the

queue, e.g., using a matching pattern

–  poll: nonblocking receive •  Check a specified queue for message and remove the first •  Never block

–  notify: nonblocking receive •  Install a handler (callback function) to be automatically

called when a message is put into the specified queue 8 Valeria Cardellini – SABD 2016/17

Publish/subscribe pattern

Valeria Cardellini - SABD 2016/17

9

•  Application components can publish asynchronous messages (e.g., event notifications), and/or declare their interest in message topics by issuing a subscription

Page 6: Comunicazione nei Sistemi Distribuiti

Publish/subscribe pattern

Valeria Cardellini - SABD 2016/17

10

•  Multiple consumers can subscribe to topic with or without filters

•  Subscriptions are collected by an event dispatcher component, responsible for routing events to all matching subscribers –  For scalability reasons, its implementation can be distributed

•  High degree of decoupling among components –  Easy to add and remove components –  Appropriate for dynamic environments

Publish/subscribe pattern

•  A sibling of message queue pattern but further generalizes it by delivering a message to multiple consumers –  Message queue: delivers messages to only one receiver,

i.e., one-to-one communication –  Pub/sub channel: delivers messages to multiple receivers,

i.e., one-to-many communication

11 Valeria Cardellini - SABD 2016/17

Page 7: Comunicazione nei Sistemi Distribuiti

Publish/subscribe API •  Calls that capture the core of any pub/sub system:

–  publish(event): to publish an event •  Events can be of any data type supported by the given

implementation languages and may also contain meta-data –  subscribe(filter expr, notify_cb, expiry) → sub handle: to

subscribe to an event •  Takes a filter expression, a reference to a notify callback for

event delivery, and an expiry time for the subscription registration.

•  Returns a subscription handle –  unsubscribe(sub handle) –  notify_cb(sub_handle, event): called by the pub/sub system to

deliver a matching event

12 Valeria Cardellini – SABD 2016/17

Architettura di MOM •  I messaggi sono inseriti/letti in/da code

–  Inserimento in coda sorgente e lettura da coda di destinazione

–  La coda appare locale sia al mittente che al destinatario (trasparenza della distribuzione)

–  La coda memorizza il messaggio finché non viene prelevato –  Generalmente coda privata per ogni applicazione (esistono

anche code condivise da più applicazioni) –  Il MOM si occupa del trasferimento dei messaggi

•  Il MOM mantiene la corrispondenza tra ogni coda e la sua posizione sulla rete (servizio di naming)

Valeria Cardellini - SDCC 2017/18

13

Page 8: Comunicazione nei Sistemi Distribuiti

MOM functionalities •  MOM handles the complexity of addressing,

routing, availability of communication partners, and message format transformations

Source: “Cloud Computing Patterns”, http://bit.ly/2hZv6Xs Valeria Cardellini - SDCC 2017/18

14

Semantics delivery in MOM

•  At-least-once delivery –  How can MOM ensure that messages are received

successfully? –  By sending ack for each retrieved message and resending

message if message is not received –  Be careful: app should be tolerant to message duplications

Valeria Cardellini - SDCC 2017/18

15

Page 9: Comunicazione nei Sistemi Distribuiti

Semantics delivery in MOM •  Exactly-once delivery

–  How can MOM ensure that a message is delivered only exactly once to a receiver?

–  By filtering possible message duplicates automatically –  Upon creation, each message is associated with a unique

message ID, which is used to filter message duplicates during their traversal from sender to receiver

–  Messages must also survive MOM components’ crashes

Valeria Cardellini - SDCC 2017/18

16

Semantics delivery in MOM

•  Transaction-based delivery –  How can MOM ensure that messages are only deleted from

a message queue if they have been received successfully? –  MOM and the receiver participate in a transaction: all

operations involved in the reception of a message are performed under one transactional context guaranteeing ACID behavior

Valeria Cardellini - SDCC 2017/18

17

Page 10: Comunicazione nei Sistemi Distribuiti

Semantics delivery in MOM

•  Timeout-based delivery –  How can MOM ensure that messages are only deleted from

a message queue if they have been received successfully at least once?

–  Messages are not deleted immediately from the queue, but marked as being invisible

–  Invisible message cannot be read by another client –  After client ack of message receipt, the message is deleted

from the queue

Valeria Cardellini - SDCC 2017/18

18

Routing nei MOM

•  Il MOM realizza un overlay network –  Occorre un servizio di routing

•  Una sottorete di router conosce la topologia dell’overlay network (spesso statica) e si occupa di far pervenire il messaggio dalla coda del mittente a quella del destinatario

•  Topologie di rete dinamiche richiedono la gestione dinamica delle corrispondenze coda-posizione di rete (analogia con routing in reti IP)

•  L’architettura di un MOM scalabile richiede un insieme di gestori (router) specializzati nel servizio di routing dei messaggi

Valeria Cardellini - SDCC 2017/18

19

Page 11: Comunicazione nei Sistemi Distribuiti

Message broker •  MOM spesso usati per realizzare l’Enterprise

Application Integration (EAI) –  Applicazioni eterogenee devono interpretare il formato dei

messaggi (struttura e rappresentazione dei dati) per poter comunicare

•  Soluzioni per gestire l’eterogeneità dei messaggi (3 su 4 già esaminate): –  Codifica del formato contenuta nel messaggio –  Ogni destinatario è in grado di comprendere ogni formato –  Formato comune dei messaggi –  Gateway di livello applicativo (message broker) in grado di

effettuare le conversioni tra formati diversi

•  Analizziamo la soluzione basata su message broker, spesso adottata nei MOM

Valeria Cardellini - SDCC 2017/18

20

Message broker (2) •  Gestisce l’eterogeneità delle applicazioni

–  Trasforma i messaggi in ingresso nel formato adatto all’applicazione destinataria, fornendo trasparenza di accesso

–  Aggrega o decompone messaggi –  Gestisce un repository di regole e programmi che

consentono la conversione dei messaggi –  Per motivi di scalabilità ed affidabilità, la sua

implementazione può essere distribuita

Valeria Cardellini - SDCC 2017/18

21

Page 12: Comunicazione nei Sistemi Distribuiti

MOM frameworks

•  Examples of MOM middleware –  IBM MQ –  Microsoft Message Queueing (MSMQ) –  Java Message Service (JMS): API MOM for Java –  Open MQ –  RabbitMQ –  Apache ActiveMQ http://activemq.apache.org –  Apache Kafka

•  Also Cloud-based products –  Amazon Simple Queue Service (SQS) –  Google Cloud Pub/Sub

•  Not always a clear distinction between queue message and pub/sub patterns –  Some frameworks (e.g., RabbitMQ, Kafka, NATS) support both –  Others not (e.g., redis is only pub/sub)

Valeria Cardellini - SDCC 2017/18

22

Protocols for MOM •  Also open standard protocols for message queues

–  AMQP (Advanced Message Queueing Protocol) •  https://www.amqp.org •  Binary protocol

–  STOMP (Simple (or Streaming) Text Oriented Messaging Protocol

•  http://stomp.github.io •  Text-based protocol

–  MQTT (Message Queue Telemetry Transport) •  http://mqtt.org •  Binary protocol

•  Goals: –  Platform- and vendor-agnostic –  Provide interoperability between different MOMs

Valeria Cardellini - SDCC 2017/18

23

Page 13: Comunicazione nei Sistemi Distribuiti

Some examples of MOM usage 1.  Accept and forward messages which

are sent by a producer and received by a consumer

2.  Distribute time-consuming tasks among multiple workers

3.  Deliver messages to many consumers at once (pub/sub pattern)

4.  Receive messages selectively

5.  Run a function on a remote node and wait for the result

Valeria Cardellini - SDCC 2017/18

24

Source: RabbitMQ tutorial http://bit.ly/2zPPMJO

IBM MQ •  MOM molto diffuso con architettura distribuita

–  “Integra dati e applicazioni in ambienti cloud, mobile, IoT e on-premise”

•  Messaggi e code gestiti da queue manager (QM) –  Le applicazioni possono inserire/estrarre messaggi solo

nelle/dalle code locali o attraverso un meccanismo RPC •  Trasferimento dei messaggi da una coda all’altra di

QM diversi tramite canali di comunicazione unidirezionali ed affidabili gestiti da message channel agent (MCA) che si occupano di: –  Stabilire canali di comunicazione –  Tipo dei messaggi –  Invio/ricezione dei messaggi

•  Ogni MCA ha una coda di invio

Valeria Cardellini - SDCC 2017/18

25

Page 14: Comunicazione nei Sistemi Distribuiti

IBM MQ (2) •  Il trasferimento sul canale può avvenire solo se

entrambi gli MCA sono attivi –  MQ fornisce meccanismi per avviare automaticamente un

MCA

•  Il routing è statico e poco flessibile –  Durante la fase di configurazione l’amministratore definisce le

opportune interconnessioni tra QM in tabelle di routing

Valeria Cardellini - SDCC 2017/18

26

IBM MQ (3)

•  Per favorire l’integrazione di applicazioni, MQ Broker agisce sui messaggi –  Trasformando formati –  Attuando il content-based routing dei messaggi –  Lavorando su informazioni di applicazione, per specificare

sequenze di azioni

Valeria Cardellini - SDCC 2017/18

27

Page 15: Comunicazione nei Sistemi Distribuiti

Amazon Simple Queue Service (SQS) •  Cloud-based message queue service based on polling

model –  Goal: to decouple the components of cloud applications –  Message queues are hosted within AWS infrastructure –  Messages are stored in queues for a limited period of time

•  Application components using SQS can run independently and asynchronously and be developed with different technologies

•  Provides timeout-based delivery –  Messages are only deleted from a message queue if they have

been received properly –  A received message is locked during processing (visibility

timeout); if processing fails, the lock expires and the message is available again

•  Can be combined with Amazon SNS –  To push a message to multiple SQS queues in parallel Va

leria

Car

delli

ni -

SD

CC

201

7/18

28

Amazon SQS: API •  CreateQueue, ListQueues, DeleteQueue

–  Create, list, delete queues

•  SendMessage, ReceiveMessage –  Add/receive messages to/from a specified queue (message

size up to 256 KB)

•  DeleteMessage –  Remove a received message from a specified queue (the

component must delete the message after receiving and processing it)

•  ChangeMessageVisibility –  Change the visibility timeout of a specified message in a

queue (when received, the message remains in the queue upon it is deleted explicitly by the receiver)

•  SetQueueAttributes, GetQueueAttributes –  Control queue settings, get information about a queue

Valeria Cardellini - SDCC 2017/18

29

Page 16: Comunicazione nei Sistemi Distribuiti

Amazon SQS: example

Valeria Cardellini - SDCC 2017/18

•  Example of application using SQS: online photo processing service http://bit.ly/2gwJFBw

30

Apache Kafka •  General-purpose, distributed pub/sub system

–  Allows to implement either message queue or pub/sub pattern

•  Originally developed in 2010 by LinkedIn to support real-time analytics

•  Written in Scala •  Horizontally scalable •  Fault-tolerant •  Semantics delivery

–  At-least-once –  In 2017, also exactly-once

Source: “Kafka: A Distributed Messaging System for Log Processing”, 2011

31 Valeria Cardellini - SDCC 2017/18

Page 17: Comunicazione nei Sistemi Distribuiti

Kafka at a glance

•  Kafka maintains feeds of messages in categories called topics

•  Producers: publish messages to a Kafka topic •  Consumers: subscribe to topics and process the feed of

published message •  Kafka cluster: distributed log of data over servers known

as brokers –  Brokers rely on Apache Zookeeper for coordination

32 Valeria Cardellini - SDCC 2017/18

Kafka: topics •  Topic: a category to which the message is published •  For each topic, Kafka cluster maintains a partitioned log

–  Log: append-only, totally-ordered sequence of records ordered by time

•  Topics are split into a pre-defined number of partitions •  Each partition is replicated with some replication factor

Valeria Cardellini - SDCC 2017/18

33

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test!

•  CLI command to create a topic

Page 18: Comunicazione nei Sistemi Distribuiti

Kafka: partitions

Valeria Cardellini - SDCC 2017/18

34

Kafka: partitions

•  Each partition is an ordered, numbered, immutable sequence of records that is continually appended to –  Like a commit log

•  Each record is associated with a sequence ID number called offset

•  Partitions are distributed across brokers •  Each partition is replicated for fault tolerance

Valeria Cardellini - SDCC 2017/18

35

Page 19: Comunicazione nei Sistemi Distribuiti

Kafka: partitions

•  Each partition is replicated across a configurable number of brokers

•  Each partition has one leader broker and 0 or more followers

•  The leader handles read and write requests –  Read from leader –  Write to leader

•  A follower replicates the leader and acts as a backup •  Each broker is a leader for some of it partitions and a

follower for others to load balance •  ZooKeeper is used to keep the brokers consistent

Valeria Cardellini - SDCC 2017/18

36

Kafka: producers •  Publish data to topics of their choice •  Also responsible for choosing which record to assign

to which partition within the topic –  Round-robin or partitioned by keys

•  Producers = data sources

Valeria Cardellini - SDCC 2017/18

37

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test!This is a message!This is another message!

•  Run the producer

Page 20: Comunicazione nei Sistemi Distribuiti

Kafka: consumers Va

leria

Car

delli

ni -

SD

CC

201

7/18

38

•  Consumer Group: set of consumers sharing a common group ID –  A Consumer Group maps to a logical subscriber –  Each group consists of multiple consumers for scalability and fault

tolerance •  Consumers use the offset to track which messages have been

consumed –  Messages can be replayed using the offset

•  Run the consumer > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning!

Kafka: ZooKeeper •  Kafka uses ZooKeeper to coordinate between the

producers, consumers and brokers •  ZooKeeper stores metadata

–  List of brokers –  List of consumers and their offsets –  List of producers

•  ZooKeeper runs several algorithms for coordination between consumers and brokers –  Consumer registration algorithm –  Consumer rebalancing algorithm

•  Allows all the consumers in a group to come into consensus on which consumer is consuming which partitions

Valeria Cardellini - SDCC 2017/18

39

Page 21: Comunicazione nei Sistemi Distribuiti

Kafka APIs •  Four core APIs •  Producer API: allows app to

publish streams of records to one or more Kafka topics

•  Consumer API: allows app to subscribe to one or more topics and process the stream of records produced to them

•  Connector API: allows building and running reusable producers or consumers that

Valeria Cardellini - SDCC 2017/18

40

connect Kafka topics to existing applications or data systems so to move large collections of data into and out of Kafka

Kafka APIs •  Streams API: allows app to

act as a stream processor, transforming an input stream from one or more topics to an output stream to one or more output topics

Valeria Cardellini - SDCC 2017/18

41

Page 22: Comunicazione nei Sistemi Distribuiti

Client library

•  JVM internal client •  Plus rich ecosystem of clients, among which:

–  Sarama: Go library for Kafka https://shopify.github.io/sarama/

–  Python library for Kafka https://github.com/confluentinc/confluent-kafka-python/

-  NodeJS client https://github.com/Blizzard/node-rdkafka

Valeria Cardellini - SDCC 2017/18

42

Apache Kafka within Monasca •  Monasca is a monitoring-as-a-service solution

integrated with OpenStack •  Uses Kafka as message queue system

Valeria Cardellini - SDCC 2017/18

43

Page 23: Comunicazione nei Sistemi Distribuiti

Comunicazione multicast •  Comunicazione multicast: schema di comunicazione

in cui i dati sono inviati a molteplici destinatari –  Comunicazione broadcast: caso particolare della multicast,

in cui i dati sono spediti a tutti i destinatari connessi in rete –  Esempi di applicazioni multicast one-to-many: distribuzione

di risorse audio/video, distribuzione di file –  Esempi di applicazioni multicast many-to-many: servizi di

conferenza, giochi multiplayer, simulazioni distribuite interattive

Unicast di un video a 1000 utenti Multicast di un video a 1000 utenti

Valeria Cardellini - SDCC 2017/18

44

Tipologie di multicast

•  Multicast a livello di rete

•  Multicast a livello applicativo

Valeria Cardellini - SDCC 2017/18

45

Page 24: Comunicazione nei Sistemi Distribuiti

Multicasting a livello di rete •  Replicazione dei pacchetti e routing gestiti dai router •  Multicast a livello IP (IPMC) basato sui gruppi

–  Generalizza UDP con trasmissione uno-a-molti –  Gruppo: insieme di host interessati alla stessa applicazione

multicast, identificati da uno stesso indirizzo IP •  Indirizzo IP da 224.0.0.0 a 239.255.255.255 assegnato al

gruppo –  Protocollo IGMP (Internet Group Management Protocol) per

il join al gruppo •  Uso limitato per:

–  Mancanza di uno sviluppo su larga scala (solo circa 5% degli AS supporta il multicast)

–  Problema di tener traccia dell’appartenenza ad un gruppo –  Ad es. disabilitato in tutte le piattaforme Cloud a causa del

problema del broadcast storm (aumento esponenziale del traffico di rete con possibile saturazione)

Valeria Cardellini - SDCC 2017/18

46

Multicasting applicativo

•  La replicazione dei pacchetti e il routing sono gestiti dagli end host

•  Multicasting applicativo di tipo –  Strutturato

•  Creazione di percorsi di comunicazione espliciti –  Non strutturato:

•  Basato su flooding (già esaminato) •  Basato su gossiping

Valeria Cardellini - SDCC 2017/18

47

Page 25: Comunicazione nei Sistemi Distribuiti

Multicasting applicativo strutturato •  Idea di base:

–  Organizzare i nodi in una rete overlay –  Usare la rete overlay per diffondere le

informazioni

•  Come costruire la rete overlay? –  Albero

•  Unico percorso tra ogni coppia di nodi –  Mesh (rete a maglia)

•  Molti percorsi tra ogni coppia di nodi

Valeria Cardellini - SDCC 2017/18

48

Multicasting applicativo tree-based •  Esempio: costruzione di un albero per il multicasting

applicativo in Scribe –  Scribe: sistema pub-sub decentralizzato basato su Pastry –  Pastry: rete P2P strutturata basata su DHT già esaminata

1.  Il nodo che inizia la sessione multicast genera l’identificatore, detto groupId, del gruppo di multicast

2. Cerca (tramite Pastry) il nodo responsabile per groupId 3. Tale nodo diventa la radice dell’albero di multicast 4. Se il nodo P vuole unirsi all’albero di multicast identificato da

groupId invia una richiesta di JOIN 5. Quando la richiesta di JOIN arriva al nodo Q

•  Q non ha mai ricevuto una richiesta di JOIN per groupId ⇒ Q diventa forwarder, P diventa figlio di Q e Q inoltra la richiesta di JOIN verso la radice

•  oppure Q è già un forwarder per groupId ⇒ P diventa figlio di Q; non occorre inoltrare la richiesta di JOIN alla radice

Riferimento: M. Castro et al., “SCRIBE: A large-scale and decentralised application-level multicast infrastructure”, IEEE JSAC, Oct. 2002.

Valeria Cardellini - SDCC 2017/18

49

Page 26: Comunicazione nei Sistemi Distribuiti

Multicasting applicativo tree-based (2)

radice

join()

forwarder forwarder

radice

join()

forwarder

forwarder

radice

join()

forwarder

forwarder

forwarder

Valeria Cardellini - SDCC 2017/18

50

Costi del multicasting applicativo tree-based

•  Stress sui collegamenti: quante volte un messaggio di multicasting applicativo attraversa lo stesso collegamento fisico? –  Esempio: il messaggio da A a D attraversa <Ra,Rb> due volte

•  Stretch: rapporto tra il tempo di trasferimento nell’overlay network e quello nella rete sottostante –  Esempio: i messaggi da B a C seguono un percorso con costo

71 a livello applicativo, ma 47 a livello di rete ⇒ stretch=71/47

Valeria Cardellini - SDCC 2017/18

51

Page 27: Comunicazione nei Sistemi Distribuiti

Protocolli basati su gossiping

•  Protocolli di tipo probabilistico, detti anche di gossiping o epidemici –  Essendo basati sulla teoria del gossip nelle reti sociali o della

diffusione delle epidemie

•  Permettono la rapida diffusione delle informazioni in reti a larghissima scala attraverso la scelta casuale dei destinatari successivi tra quelli noti al mittente –  Ogni nodo invia il messaggio ad un sottoinsieme, scelto

casualmente, di nodi nella rete –  Ogni nodo che lo riceve ne rinvierà una copia ad un altro

sottoinsieme, anch’esso scelto casualmente, e così via

Valeria Cardellini - SDCC 2017/18

52

Le origini •  Protocolli di gossiping definiti nel 1987 da Demers et al.

in un lavoro sulla garanzia di consistenza in database replicati su centinaia di server

•  Idea di base: assumendo che non vi siano conflitti di scrittura (ovvero aggiornamenti indipendenti) –  Le operazioni di aggiornamento sono eseguite inizialmente su

una o alcune repliche –  Una replica comunica il suo stato aggiornato ad un numero

limitato di vicini –  La propagazione dell’aggiornamento è lazy (non immediata) –  Al termine, ogni aggiornamento dovrebbe raggiungere tutte le

repliche

Riferimento: A. Demers et al., “Epidemic Algorithms for Replicated Database Maintenance”, Proc. 6th Symp. on Principles of Distributed Computing, pp. 1-12, Aug. 1987. ACM.

Valeria Cardellini - SDCC 2017/18

53

Page 28: Comunicazione nei Sistemi Distribuiti

Why gossiping in large scale DSs?

•  Several attractive properties of gossip-based information dissemination for large scale distributed systems –  Simplicity of gossiping algorithms –  Lack of centralized control and bottlenecks –  Scalability: each peer sends only a limited number

of messages, independently from the overall size of the system

–  Reliability and robustness: thanks to message redundancy

Valeria Cardellini - SDCC 2017/18

54

Where gossiping is used today?

•  Some examples: –  “Amazon uses a gossip protocol to quickly spread

information throughout the S3 system. This allows Amazon S3 to quickly route around failed or unreachable servers, among other things.” http://amzn.to/1MgDVsl

–  Amazon’s Dynamo uses a gossip-based failure detection service

–  The basic information exchange in BitTorrent is based on gossip

Valeria Cardellini - SDCC 2017/18

55

Page 29: Comunicazione nei Sistemi Distribuiti

Principi dei protocolli di gossiping

•  Principi di funzionamento: gossiping puro e anti-entropia

•  Gossiping puro: un peer che è stato appeno aggiornato (ossia infettato) contatta un certo numero di peer scelti casualmente inviandogli il proprio aggiornamento (infettandoli a loro volta)

•  Anti-entropia: modello di propagazione in cui ciascun

peer sceglie a caso un altro peer e si scambiano gli aggiornamenti, giungendo al termine ad uno stato simile su entrambi i peer

Valeria Cardellini - SDCC 2017/18

56

Gossiping puro •  Un peer P che è stato appena aggiornato, contatta un

peer Q scelto a caso •  Se Q ha già ricevuto l’aggiornamento (è già infetto), P

perde interesse a diffondere il gossip e con probabilità pari a peer 1/k smette di contattare altri nodi

•  Se s è la frazione di peer non ancora aggiornati, si dimostra che s = e−(k+1)(1−s)

•  Per garantire che un ampio numero di peer sia aggiornato, occorre combinare il gossiping puro con l’anti-entropia

Al crescere di k aumenta la probabilità che l’aggiornamento si diffonda

Valeria Cardellini - SDCC 2017/18

57

Page 30: Comunicazione nei Sistemi Distribuiti

Anti-entropia •  Obiettivo: aumentare la similarità tra peer,

aumentando così “l’ordine” (il motivo del nome!) •  Un peer P sceglie casualmente un altro peer Q nel

sistema; come lo aggiorna? •  Tre strategie di aggiornamento:

–  push: P invia soltanto i suoi aggiornamenti a Q –  pull: P si prende soltanto i nuovi aggiornamenti da Q –  push-pull: P e Q si scambiano reciprocamente gli

aggiornamenti (dopodiché possiedono le stesse informazioni)

•  Osservazione: la strategia push-pull è la più veloce ed impiega O(log N) round per propagare un aggiornamento agli N peer del sistema

scelta

dati scelta

dati scelta

dati

Valeria Cardellini - SDCC 2017/18

–  Round (o ciclo) di gossip: intervallo di tempo in cui ogni peer ha preso almeno una volta l’iniziativa di scambiare aggiornamenti

58

Un protocollo di gossiping in generale •  Due peer P e Q, con P che ha scelto Q per lo scambio

di dati; P è eseguito una volta ad ogni round (ogni Δ unità di tempo)

Active thread (peer P): Passive thread (peer Q): (1) selectPeer(&Q); (1) (2) selectToSend(&bufs); (2) (3) sendTo(Q, bufs); -----> (3) receiveFromAny(&P, &bufr); (4) (4) selectToSend(&bufs); (5) receiveFrom(Q, &bufr); <----- (5) sendTo(P, bufs); (6) selectToKeep(cache, bufr); (6) selectToKeep(cache, bufr); (7) processData(cache); (7) processData(cache) •  Quali sono gli aspetti cruciali?

–  La selezione dei peer –  La selezione dei dati scambiati –  Il processamento dei dati ricevuti

Riferimento: A.-M. Kermarrec, M. van Steen, “Gossiping in Distributed Systems”, ACM Operating System Review 41(5), Oct. 2007.

Valeria Cardellini - SDCC 2017/18

59

Page 31: Comunicazione nei Sistemi Distribuiti

Implementare un protocollo di gossiping Quali problemi specifici occorre affrontare per implementare un protocollo di gossiping?

•  Membership: come i peer possono conoscersi tra loro e quanti conoscenti occorre avere

•  Consapevolezza della rete: come far sì che i collegamenti fra peer riflettano la topologia della rete, in modo da ottenere prestazioni soddisfacenti

•  Gestione dei buffer: quali informazioni scartare quando la memoria è piena

•  Filtraggio dei messaggi: come considerare il reale interesse da parte dei peer e ridurre la probabilità che ricevano informazioni a cui non sono interessati

Valeria Cardellini - SDCC 2017/18

60

Gossiping e flooding a confronto •  La diffusione dell’informazione è l’applicazione

classica e più popolare del gossiping nei SD –  Valida alternativa rispetto al flooding

•  Nel caso di flooding –  Ogni peer che riceve il messaggio lo invia a tutti i suoi vicini

(possiamo considerarlo una degenerazione del gossiping) –  Il messaggio viene scartato quando il suo TTL diviene nullo

Round 1 Round 2 Round 3

Messaggi inviati: 18 Peer raggiunti: 8 su 9 Valeria Cardellini - SDCC 2017/18

61

Page 32: Comunicazione nei Sistemi Distribuiti

Gossiping e flooding a confronto (2) •  Nel caso di gossiping semplice

–  Il messaggio viene inviato con una probabilità di gossiping p for each msg m

if random(0,1) < p then send m

p

p

p

p

p

p p

p

p p

p Round 1 Round 2 Round 3

Messaggi inviati: 11 Peer raggiunti: 7 su 9

Valeria Cardellini - SDCC 2017/18

62

Gossiping vs flooding •  Gossiping features

–  Probabilistic –  Takes a localized decision but results in a global state –  Lightweight –  Fault tolerant

•  Flooding has advantages –  Universal coverage and minimal state information

•  … but it floods the networks with redundant messages •  Gossiping goals

–  Reduce the number of redundant transmissions that occur with flooding while trying to retain its advantages

–  … but due to its probabilistic nature, gossiping cannot guarantee that all the peers are reached and it requires more time to complete than flooding

Valeria Cardellini - SDCC 2017/18

63

Page 33: Comunicazione nei Sistemi Distribuiti

Altre applicazioni del gossiping nei SD

•  Oltre alla diffusione dell’informazione…

•  Peer sampling –  Per fornire a ciascun peer una lista di peer da contattare

•  Monitoraggio di risorse in sistemi distribuiti a larga scala

•  Computazioni distribuite per l’aggregazione di dati, in particolare in reti di sensori –  Computazione di valori aggregati (ad es. somma, media,

massimo, quantili) –  Ad es. nel caso di calcolo della media

•  Siano x0,i e x0,j i valori al tempo t=0 posseduti dai nodi i e j •  Dopo il gossiping tra i e j usando strategia push-pull:

x1,i, x1,j ←(x0,i + x0,j)/2 Valeria Cardellini - SDCC 2017/18

64

Two gossiping protocols

•  We now examine two examples of gossiping protocols –  Blind counter rumor mongering –  Bimodal multicast

Valeria Cardellini - SDCC 2017/18

65

Page 34: Comunicazione nei Sistemi Distribuiti

Blind counter rumor mongering

•  Why that name for this gossiping protocol? –  Rumor mongering (def: “the act of spreading rumours”, also

known as gossip): a node with “hot rumor” will periodically infect other nodes

–  Blind: loses interest regardless of the recipient (why) –  Counter: loses interest after F contacts (when)

A node n initiates a broadcast by sending the message m to B of its neighbors, chosen at random. When node p receives a message m from node q If p has received m no more than F times p sends m to B uniformly randomly chosen neighbors that p

knows have not yet seen m. –  Note that p knows if its neighbor r has already seen the

message m only if p has sent it to r previously, or if p received the message from r

Valeria Cardellini - SDCC 2017/18

66

Analysis of blind counter rumor mongering •  Difficult to obtain analytical expressions to describe

the behavior of a gossiping protocol, even for relatively simple topologies simulation analysis

•  Assume Barabási network topology: –  1000 nodes with an average node degree of 6

–  Rumor mongering vs flooding scalability (F=2, B=2)

Source: “The cost of application-level broadcast in a fully decentralized peer-to-peer network”

Valeria Cardellini - SDCC 2017/18

67

Page 35: Comunicazione nei Sistemi Distribuiti

Bimodal multicast

•  Also called pbcast (probabilistic broadcast) •  Composed by two phases:

1.  Message distribution phase: a process sends a multicast with no particular reliability guarantees •  IP multicast if available, otherwise some application-level

multicast (e.g., Scribe trees)

2.  Gossip repair phase: after a process receives a message, it begins to gossip about the message to a set of peers (called fanout) •  Gossip occurs at regular intervals and offers the

processes a chance to compare their states and fill any gaps in the message sequence

Source: K.P. Birman, M. Hayden, O. Ozkasap, Z. Xiao, M. Budiu, and Y. Minsky. Bimodal multicast. ACM Trans. Comput. Syst. 17, 2 (May 1999), 41-88.

Valeria Cardellini - SDCC 2017/18

68

Bimodal multicast: message distribution

• Start by using unreliable multicast to rapidly distribute the message

• But some messages may not get through, and some processes may be faulty

• So initial state involves partial distribution of multicast(s)

Send messages : failed messages

p1

p2

p3

p4

p5

p6 time

Valeria Cardellini - SDCC 2017/18

69

Page 36: Comunicazione nei Sistemi Distribuiti

Bimodal multicast: gossip repair

• Periodically (e.g., every 100 ms) each process sends a digest describing its state to some randomly selected process

• The digest identifies messages: it does not include them

Send digests p1

p2

p3

p4

p5

p6

Valeria Cardellini - SDCC 2017/18

70

Bimodal multicast: gossip repair (2)

• Recipient checks the gossip digest against its own history and solicits a copy of any missing message from the process that sent the gossip

Solicit message copies p1

p2

p3

p4

p5

p6

Valeria Cardellini - SDCC 2017/18

71

Page 37: Comunicazione nei Sistemi Distribuiti

Bimodal multicast: gossip repair (3)

• Processes respond to solicitations received during a round of gossip by retransmitting the requested message

• Various optimizations (not examined)

Send message copies p1

p2

p3

p4

p5

p6

Valeria Cardellini - SDCC 2017/18

72

Why “bimodal”? •  Are there two phases? •  Nope; description of dual “modes” of result

Pbcast bimodal delivery distribution

1.E-30

1.E-25

1.E-20

1.E-15

1.E-10

1.E-05

1.E+00

0 5 10 15 20 25 30 35 40 45 50

number of processes to deliver pbcast

p{#p

roce

sses

=k}

1.  pbcast is almost always delivered to most or to few processes and almost never to some processes Atomicity = almost all or almost none

2.  A second bimodal characteristic is due to delivery latencies, with one distribution of very low latencies (messages that arrive without loss in the first phase) and a second distribution with higher latencies (messages that had to be repaired in the second phase)

Eithersenderfails…

…ordatagetsthroughwith

highprobability

Valeria Cardellini - SDCC 2017/18

73