MapReduce: teoria e implementazione -...

Post on 21-Jul-2020

2 views 0 download

Transcript of MapReduce: teoria e implementazione -...

MapReduce: teoria e implementazione

1

Genesi

•  nasce nel 2004 ❒  [“MapReduce: simplified Data Processing on

Large Clusters”, Dean, Ghemawat. Google Inc.] •  nasce dall’esigenza di eseguire problemi

“semplici” su big data (>1TB) - e.g., ❒  grep ❒  conteggio delle frequenze di accesso a url ❒  inverted index

2

Fino al 2004 •  Large input data => soluzione di Google:

<<abbiamo tante risorse computazionali. Usiamole! Distribuiamo la computazione!>> ❒  Riformulare il programma per calcolo parallelo ❒  distribuire dati ❒  gestire guasti, etc.

•  => il problema non è più semplice! •  Per di più: occorre riformulare ogni singolo

problema (grep, inverted index, etc.) 3

Motivazioni in breve

① Voglio processare grandi moli di dati ② Voglio usare molte (centinaia/

migliaia) di CPU ③ Voglio che tutto questo sia

SEMPLICE

4

Nel 2004 •  Dean e Ghemawat propongono MapReduce, un nuovo

modello di programmazione distribuita. •  Risolve problemi di analisi di grandi insiemi di dati

(stesso insieme di istruzioni su tanti record diversi). •  Si chiede al programmatore di convertire il suo

codice in due funzioni: ❒  map function ❒  reduce function

I programmi scritti secondo questo modello sono intrinsecamente parallelizzabili

5

Map & Reduce

value map key value key value key value key value key value key value key value key

…...

(key1, val)

(key8, val)

(key1, val)

(key8, val)

(key5, val)

(key1, val)

map

map

reduce

reduce

reduce

…... …...

value key value key value key value key value key value key value key value key

input data set intermediate

pairs output

data set sh

uffle

6

Come può esserci utile?

•  Un esempio classico: WordCount

vorrei contare le occorrenze di ogni parola all’interno di una collezione di documenti

7

Map & Reduce

map

most poetry ignores most people

(most,1) (people,1) (ignore,1)

map

reduce

reduce

1 ignore 1 ignores

4most 2people 2poetry

input data set intermediate

pairs output

data set

most people ignore most poetry

(most,1) (poetry,1)

(most,1) (poetry,1) (ignores,1) (most,1) (people,1)

shuf

fle

phas

e

doc1

.txt

do

c2.t

xt

8

pseudo-codice //key:document name. value:document contents !map(String key, String value){!

!for each word w in value!! !EmitIntermediate(w, "1”)!

}!// key: a word. values: a list of counts!reduce(String key, Iterator values){!

!int result = 0;! !for each v in values{! !result += ParseInt(v);! ! !Emit(AsString(result));!

!}!}!!

9

Cosa c’è di bello? •  Abbiamo scomposto il conteggio delle parole in

sotto-programmi che possono essere associate a diversi task. ❒  ogni map task lavora su un sottoinsieme dell’input ❒  ogni reduce task lavora su una chiave (merge) ❒  i reduce usano l’out dei map => R dopo M

•  A parte questo, l’esecuzione può procedere in parallelo!

I programmi scritti secondo il modello MapReduce sono intrinsecamente parallelizzabili

10

Parallizzazione: map phase

macchina 1

macchina 2 macchina 3

macchina 4

doc1.txt! doc2.txt!

doc3.txt!

M M

M coordinatore

11

Parallizzazione: shuffle phase

macchina 1

macchina 2 macchina 3

macchina 4

doc1.txt! doc2.txt!

doc3.txt!

M

M M

coordinatore

R

R

12

Parallizzazione: reduce phase

macchina 1

macchina 2 macchina 3

macchina 4

doc1.txt! doc2.txt!

doc3.txt!

R

coordinatore

R most,4!people,2!ignore,1!

poetry,2!ignores,1!

13

Cos’altro si può fare?

Distributed grep: voglio filtrare le linee di un documento (molto grande) in cui appare una parola X. 1.  Il documento è spezzettato in chunk. 2.  map: filtra le linee del suo chunk che

contengono X 3.  reduce: funzione identità

14

Cos’altro si può fare?

Frequenza di accesso a un URL: dato un log con l’elenco di URL visitati, voglio contare le frequenze di accesso per ogni URL. 1.  Il documento è spezzettato in chunk. 2.  map: analizza il suo chunk e per ogni URL

emette una coppia <URL, 1>!3.  reduce: somma tutti i valori associati allo

stesso URL ed emette una coppia <URL,totalcount>!

15

Cos’altro si può fare? Inverted index: dato un elenco di documenti voglio per ogni parola l’elenco ordinato dei documenti che la contengono. 1.  Ogni documento è affidato ad un map task (o

splittato tra più map task). 2.  map: per ogni parola w nel documento docY

emette <w, docY>!3.  reduce: prende tutte le coppie con chiave w, fa

il sort dei documentID ed emette una coppia <w,list(docID)>!

16

Cos’altro si può fare? Distributed sort: dato un elenco di record, ordinarli secondo un certo criterio. 1.  Il documento è spezzettato in chunk 2.  map: per ogni record estrae la chiave ed

emette delle coppie <key, value>!3.  reduce: funzione identità

il sort funziona grazie ad un comportamento di default dei reduce, la garanzia di ordinamento, che vediamo più avanti...

17

Cos’altro si può fare? •  Reverse Web-link Graph[*] •  Term-Vector per Host[*] •  Iterative MapReduce •  ecc…

[*]“MapReduce: simplified Data Processing on Large Clusters”, Dean, Ghemawat. Google Inc. 6° Symposium on Operating Systems Design & Implementation (OSDI2004)

18

Nella pratica Esistono diversi tool per eseguire un programma MapReduce su architettura distribuita •  Google MapReduce •  Apache Hadoop •  Apache Spark •  Azure Twister

Permettono a programmatori senza alcuna esperienza di sistemi distribuiti di utilizzare facilmente le risorse di un data center per elaborare grandi moli di dati.

19

Apache Hadoop

•  open source •  struttura e interfaccia relativamente

semplici •  fatto di due sotto-componenti:

MapReduce Runtime (coordinatore) Hadoop Distributed File System (HDFS)

•  vediamo com’è fatto Hadoop v1.x.x

20

Namenode

HDFS file system distribuito che viene gestito da dei demoni con un’architettura master/slave

Datanode Datanode

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

il contenuto del file system NON è (in generale) replicato sui vari nodi, ma spezzettato tra i vari nodi

21

copio un file dal file system locale in HDFS

Hadoop si occupa dello split in modo trasparente all’utente.

Namenode

Datanode Datanode

file.txt!

:~$ hadoop dfs !–copyFromLocal !/path/to/file.txt !/path/in/hdfs/file.txt

blk001! blk002!

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

file.txt!

22

Verifico il contenuto di HDFS

•  Il file mi viene mostrato come se fosse tutto intero su un unico supporto

•  i comandi sono shell-like

hadoop@hadoop1:~$ hadoop dfs –ls!Found x items !... !drwxr-xr-x - hadoop root 2010-03-16 11:36 /user/hadoop/file.txt!... !

hadoop@hadoop1:~$ hadoop dfs –mkdir mydir!hadoop@hadoop1:~$ hadoop dfs –rmr mydir!!

23

HDFS Può gestire autonomamente anche la replicazione dei dati. Parametro di configurazione: dfs.replication=2 Utile in caso di failure o per velocizzare i calcoli

Namenode

Datanode Datanode

file.txt!

002!

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

file.txt!

001!002! 001!

24

Lancio un job wordcount (Hadoop v1.x.x.)

:~$ hadoop jar wordcount.jar WordCount indir outdir!

comando di lancio

contiene i .class

nome della classe che contiene il main

cartelle di input e

output nel dfs

Cosa succede in Hadoop quando lancio questo comando? Viene chiamato il componente MapReduce runtime.

25

MapReduce runtime (v1.x.x.) Componente di Hadoop che si occupa di generare e coordinare i map/reduce task. Architettura master/slave

JobTracker

TaskTraker

TaskTraker

Il jobtraker assegna il lavoro da fare (map/reduce tasks) ai tasktraker cercando di garantire il più possibile la località dei dati

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

26

MapReduce runtime: data locality JobTracker Namenode

TaskTraker Datanode

TaskTraker Datanode

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

blk001! blk002!

map blk001 map blk002 reduce key1to5 reduce key6to9

•  Il jobtraker chiede al demone namenode: dov’è blk001?

•  Il namenode risponde: su slave2

•  jobtracker chiede al tasktracker 2: posso assegnarti un map task?

•  tasktracker 2: sì •  jobtracker alloca

map di blk001 a slave2

27

Manca qualcosa…

value map key value key value key value key value key value key value key value key

…...

(key1, val)

(key8, val)

(key1, val)

(key8, val)

(key5, val)

(key1, val)

map

map

reduce

reduce

reduce

…... …...

value key value key value key value key value key value key value key value key

shuf

fle

28

Manca qualcosa…

value map key value key value key value key value key value key value key value key

…...

(key1, val)

(key8, val)

(key1, val)

(key8, val)

(key5, val)

(key1, val)

map

map

reduce

reduce

reduce

…... …...

value key value key value key value key value key value key value key value key Come scelgo

quanti mapper e quanti reducer servono?

Come scelgo quanti map e reduce task servono?

29

Manca qualcosa…

value map key value key value key value key value key value key value key value key

…...

(key1, val)

(key8, val)

(key1, val)

(key8, val)

(key5, val)

(key1, val)

map

map

reduce

reduce

reduce

…... …...

value key value key value key value key value key value key value key value key

shuf

fle

Come scelgo quali chiavi vanno a un reduce task e quali all’altro?

30

Quanti M e R task? Vorremmo il maggior grado di parallelismo possibile => la risposta dipende da: 1.  quale grado di parallelismo ho nella mia

architettura? Quanti core? con hyperthreading (capacità di eseguire 2 thread contemporaneamente sullo stesso core)?

2.  quanti dati devo processare?

31

Quanti M e R task? Hadoop permette di specificare il grado di parallelismo di ogni macchina in configurazione esprimendo il numero di slot. uno slot è un contenitore in cui può finire un map/reduce task in esecuzione su una macchina dual-core con hyperthreding specificherò: •  n° map slots = 4 •  n° reduce slot = 4

poiché la fase di map e la fase di reduce non si sovrappongono (quasi) mai

core1 core2

32

Quanti map task lancio? Supponiamo di voler fare wordcount su un file da 1GB e di aver a 4 slave dual-core senza hyperthreading 8core totali => 8 slot Soluzione di default di Hadoop: •  ogni map processa dei chunk di massimo 64MB •  il file viene spezzato in blocchi di 64MB •  Hadoop lancerà 1GB/64MB = 16 map task tutti insieme? NO, perché ho solo 8 slot!

M M

M M M M M M M M

M M M M M M 33

Quanti map task lancio? è la soluzione migliore? forse no… 1GB/8core = 128MB Se avessi avuto dei chunk da 128MB, avrei fatto tutto in parallelo con soli 8 task e meno cambi di contesto. Hadoop permette di cambiare il valore del chunck di default in configurazione, ma •  occorrono nozioni di computazione distribuita •  effettivi miglioramenti si notano solo con molti TB ⇒  solitamente hadoop decide da solo il numero di map numero di map task = numero di chunck da 64MB in input

M M M M M M M M

34

Quanti reduce task lancio? Questo lo decide l’utente! job.setNumReduceTasks(n);!Gli sviluppatori di hadoop consigliano due valori ottenuti statisticamente: 0.95 * n° tot di reduce slot!se i core sono tutti uguali oppure 1,75 * n° tot di reduce slot!se c’è qualche differenza di velocità tra i core delle varie macchine

R R R R

R R R R

R R R

35

Manca qualcosa…

value map key value key value key value key value key value key value key value key

…...

(key1, val)

(key8, val)

(key1, val)

(key8, val)

(key5, val)

(key1, val)

map

map

reduce

reduce

reduce

…... …...

value key value key value key value key value key value key value key value key

shuf

fle

Come scelgo quali chiavi vanno a un reduce task e quali all’altro?

36

Partitioning/Shuffle phase

map

most poetry ignores most people

(most,1) (people,1) (ignore,1)

map

reduce

reduce

1 ignore 1 ignores

4most 2people 2poetry

most people ignore most poetry

(most,1) (poetry,1)

(most,1) (poetry,1) (ignores,1) (most,1) (people,1)

doc1

.txt

do

c2.t

xt

Come scelgo quali chiavi vanno a un reducer e quali all’altro? ma sopratutto, come faccio a farlo velocemente?

37

Naive partitioning La prima cosa che viene in mente è: •  ordino la lista delle coppie intermedie per chiave:

[ignore,ignores,most,people,poetry] •  divido la lista per il numero di reduce (bilancio il carico):

R=2 => R1=[ignore,ignores] R2=[most,people,poetry] Problemi: •  l’ordinamento è sempre oneroso O(n log2 n) •  posso decidere a che reducer mandare una coppia solo

dopo che è stato definito tutto l’elenco delle chiavi. Infatti se l’elenco fosse stato:

[ignore,ignores,most,people,pippo,poetry] R1=[ignore,ignores,most] R2=[people,pippo,poetry]

38

Hadoop default partitioning Il suggerimento di Dean e Ghemawat è di usare una hash function. Per cui Hadoop implementa questo partitioner di default: public class HashPartitioner<K, V> !! ! ! ! !extends Partitioner<K, V> {!

public int getPartition(K key, V value, !! ! ! ! ! !int numReduceTasks) {!

return (key.hashCode() & Integer.MAX_VALUE) ! ! ! ! ! !% numReduceTasks;!

}!}!

bitwise AND per avere solo valori positivi

modulo per avere sempre un risultato nell’intervallo [0,numReduceTasks-1]

39

Hadoop default partitioning Risultato: getPartition(“ignore”,”1”,2) = 0 getPartition(“ignores”,”1”,2) = 1 getPartition(“most”,”1”,2) = 1 getPartition(“pippo”,”1”,2) = 0 getPartition(“people”,”1”,2) = 1 getPartition(“poetry”,”1”,2) = 1

•  nessun ordinamento •  le coppie possono essere

inviate ai R man mano che vengono prodotte dai M, senza attendere che si sia definita tutta la lista.

40

Hadoop default partitioning

macchina 1

macchina 2 macchina 3

macchina 4

doc1.txt! doc2.txt!

doc3.txt!

R

coordinatore

R ignore,1!pippo,1!

ignores,1!most,4!people,2!poetry,2!

sui grandi numeri questo garantisce anche un certo bilanciamento del carico

NB: garanzia di ordinamento. Il reducer emette sempre i risultati ordinati per chiave

41

Cos’altro si può fare? Distributed sort: dato un elenco di record, ordinarli secondo un certo criterio. 1.  Il documento è spezzettato in chunk 2.  map: per ogni record estrae la chiave ed

emette delle coppie <key, value>!3.  reduce: funzione identità

il sort funziona grazie alla garanzia di ordinamento. Tutte le coppie arrivano ad UN SOLO reduce task, che le trasmette immutate sull’output, ma in ordine.

42

Combiner function In certi casi può essere vantaggioso far fare qualcosa di più ai map task anticipando il lavoro dei reducer.

Es: wordcount classico ❒  map emette per ogni w nel chunk , <w,1> ❒  reduce emette <w,sum(values)> !

Così ho •  un sacco di traffico tra M e R: una coppia <w,1>

per ogni w nel documento. •  Il reducer deve fare tutte le somme

43

Combiner function Dean e Ghemawat osservano che: Se la funzione del reduce è associativa e commutativa, posso parzialmente anticiparla, facendola eseguire sulla stessa macchina del map task.

Es: wordcount ❒  map emette per ogni w nel chunk , <w,1> ❒  combiner (sulla stessa macchina del map): per ogni <w,1> fa la somma ed emette <w,S > = <w,sum(ones)> ❒  reduce emette <w,sum(S)>!

44

Fault tolerance MapReduce si presta a implementare meccanismi di tolleranza ai guasti, INDIPENDENTI dal problema (grep, sort, wordcount,etc.) •  Se il nodo Master si accorge di errori su uno slave,

riesegue Map e Reduce di quello slave da un’altra parte. •  Se il nodo Master si accorge di errori sui blocchi, li

salta e riesegue Può non esser sufficiente per gli scopi del programmatore, ma c’è in tutte le implementazioni MapReduce.

45

M

Elasticità Hadoop implementa anche meccanismi per scalare l’architettura a runtime.

JobTracker Namenode

TaskTraker Datanode

TaskTraker Datanode

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

blk001! blk002! blk003!

M M

46

TaskTraker Datanode

M

Elasticità Hadoop implementa anche meccanismi per scalare l’architettura a runtime.

JobTracker Namenode

TaskTraker Datanode

TaskTraker Datanode

macchina 1 (master)

macchina 2 (slave) macchina 3 (slave)

blk001! blk002!

M M

macchina 4 (slave)

blk003!

47

Elasticità Particolarmente utile se la mia infrastruttura sottostante mi consente di aggiungere/togliere risorse facilmente…

=> il cloud è il supporto migliore consente elasticità nel provisioning e deprovisioning di nodi per la computazione (virtual machines)

48

In laboratorio ogni gruppo avrà a disposizione un cluster di 3 macchine virtuali configurate per eseguire dei job hadoop. •  1 master/slave

(jobtracker, namenode, tasktracker, datanode) •  2 slave

(tasktracker, datanode)

Vedremo •  Creazione •  Compilazione •  Esecuzione di un programma MapReduce

49