Big Data - CRS4dassia.crs4.it/wp-content/uploads/2014/11/Dassia_C1_slides_parte_2… · Big Data...

Post on 20-Aug-2020

9 views 0 download

Transcript of Big Data - CRS4dassia.crs4.it/wp-content/uploads/2014/11/Dassia_C1_slides_parte_2… · Big Data...

Big Data

Big Data

… data that exceeds RDBMS capabilities …

Big Data

Origini

Il termine BigData è stato “coniato” da Google intorno al 2004 per descrivere

una quantità di dati talmente grande da non poter essere gestita ed elaborata

con le tradizionali tecnologie, file-system e RBDMS, a sua disposizione

In queste pubblicazioni Google indicò anche la sua soluzione al problema :

● GFS : un file-system distribuito

● MapReduce: un nuovo approccio alla programmazione ed all'esecuzione del codice

Big Data

Origini

GFS è un file-system distribuito progettato da Google per la memorizzazione e la

gestione di file di dimensioni enormi, dell'ordine di svariati Petabyte = 1.000 Tbyte.

MapReduce è un framework che, traendo spunto da un approccio tipico dei linguaggi

funzionali (LISP), propone un modello di programmazione (DataFlow Programming)

che consente di produrre facilmente applicazioni estremamente scalabili.

Ma prima di entrare nei dettagli della soluzione cerchiamo di capire il problema...

Big Data

Caratteristiche e difetti dei sistemi RDBMS

I sistemi RDBMS sono basati su un rigido formalismo matematico che consente di:

evidenziare le caratteristiche essenziali dei dati e

le relazioni che tra essi intercorrono, eliminando tutte le ridondanze.

Processo che prende il nome di normalizzazione e il cui risultato è un modello

astratto dei dati, lo schema che definisce univocamente la struttura e il tipo dei dati.

Per tale ragione i dati cui è stato applicato uno schema vengono definiti strutturati.

Senza nulla togliere ai suoi innumerevoli pregi, il modello relazionale in ambito BigData

presenta due difetti principali :

● rigidità dello schema

● “perdita di informazione” rispetto ai dati RAW

Big Data

Implementazione dello schema

Il processo di normalizzazione porta a suddividere i dati in tabelle, in cui ogni riga

rappresenta un'istanza di un dato.

I sistemi RDBMS sono progettati per consentire un accesso sicuro e un aggiornamento

continuo di ogni singolo record presente sul database.

Questo rende necessario

● l'uso di lock, per garantire che tutte le informazioni interessate da una modifica

siano protette da modifiche concorrenti

● l'uso di chiavi ed indici per individuare rapidamente la posizione dei record di

interesse

● l'uso di JOIN, per consentire la correlazione di informazioni conservate in tabelle

diverse (eventualmente su DB diversi, anche remoti)

Big Data

Conseguenze delle ottimizzazioni

Alcune delle ottimizzazioni introdotte nel passaggio dal modello teorico a quello

concreto dei sistemi RDBMS, hanno delle conseguenze negative rispetto ai BigData:

● lock → serializzazione degli accessi ai dati e limitazione al parallelismo

● indici ed accesso casuale ai record → random read access verso il disco

● JOIN → perdita di località dei dati ( eventuale attesa per il trasferimento degli

stessi da DB remoti)

Big Data

una nuova prospettiva....

La soluzione proposta da Google per risolvere questi problemi è di invertire la logica :

Se le dimensioni dei dati sono molto più grandi di quelle del codice

è più efficiente spostare il codice dove sono i dati da elaborare

In questo modo è possibile garantire:

● la località dei dati (evitando i tempi di attesa necessari al loro trasferimento)

● l'assenza di lock e quindi il massimo parallelismo possibile

● un accesso sequenziale ai dati ed al disco, con velocità prossime al transfer-rate

massimo consentito dall'hardware e indipendenti dal seek-time

● l'assenza di uno schema rigido che consente maggiore flessibilità e riduce il rischio

di “perdita di dati” (schema-on-read)

● un sistema scalabile e fault-tolerant

Big Data

Random Disk Access VS Sequential Disk Access

RDBMS

GFS + MapReduce

Per farci un'idea dell'impatto reale

dei limiti individuati da Google nei

sistemi RDBMS, confrontiamo le

velocità di accesso al disco in

modalità:

● random-read (tipica dei

sistemi RDBMS)

● sequential read (caratteristica

di GFS + MapReduce)

La differenza di velocità è di

circa 3 ordini di grandezza.

Big Data

Tabella Riassuntiva : MapRed VS RDBMS

RDBMS MapReduce

Dimensione dati Gigabytes Petabytes

Tipo di accesso Interattivo e batch batch

Aggiornamenti Frequenti letture e scritture Una scrittura tante letture

Struttura Schema rigido Schema dinamico

Scalabilità Non lineare lineare

Con scalabilità lineare si intende che:

se raddoppio i dati raddoppia il tempo di esecuzione

se raddoppio i dati e le macchine il tempo di esecuzione è costante

MapReduce non è un sostituto dei tradizionali RDBMS ma è complementare ad essi.

Hadoop

here he comes!

Tra il 2003 e il 2004 Google fece alcune pubblicazioni in cui annunciò al pubblico questi

suoi risultati, mantenendo tuttavia chiuso il codice delle proprie implementazioni.

Esplose così la rincorsa alle emulazioni che vide vincitore Hadoop, una

implementazione open-source in Java sviluppata da Dough Cutting, l'autore di Lucene.

Dopo la prima release Yahoo! che lavorava ad un progetto analogo si rese conto che

Hadoop era più efficiente, abbandonò il suo progetto ed ingaggio Dough Cutting per

proseguire lo sviluppo di Hadoop.

Il progetto rimase open-source per attirare la collaborazione di altre aziende e

consentire di competere con il sistema sviluppato da Google.

Hadoop

tratti distintivi

I principali di Hadoop riflettono, ovviamente, le sue origini e si rifanno alle soluzioni proposte da Google :

Accessibilità: è in grado di funzionare sia su grandi cluster di macchine a basso costo (COTS), sia su infrastrutture cloud come Amazon EC2;

Robustezza: ha funzionalità innate di fault-tolerance; essendo stato pensato, sin dal principio, per gestire eventuali malfunzionamenti dell'hardware (tipici dei sitemi COTS) e quindi ;

Scalabilità: è in grado di scalare linearmente con l'aumentare dei dati da gestire; raddoppiando il numero dei nodi è in grado di processare il doppio dei dati nello stesso tempo;

Semplicità: consente di scrivere facilmente e in maniera affidabile codice che viene automaticamente distribuito ed eseguito su un cluster (indipendentemente dal numero di macchine, da un minimo di 1 fino ad N )

Hadoop

Componenti Fondamentali

Il framework Hadoop si compone di tre elementi:

• Commons : un insieme di utility su cui si basano gli altri componenti

• HDFS : un file-system distribuito utilizzato per la memorizzazione dei dati

• Map-Reduce: il framework vero e proprio per il supporto alla programmazione distribuita

Hadoop

HDFS

HDFS (acronimo di Hadoop Distributed File System ) è un file system

distribuito progettato sul modello del Google File System ed orientato alla

gestione file di grandi dimensioni, dell'ordine dei petabyte

1PB = 1.000 Terabyte = 1.000.000 Gigabyte

HDFS

Un file-system block oriented

E' un file-system minimale, ispirato al Google FS, sviluppato per supportare Hadoop.

Dunque con pochi semplici obiettivi da realizzare:

● possibilità di gestire file di dimensione superiore al petabyte

● alta velocità in lettura/scrittura (prossima al max-transfer rate del disco)

● fault-tolerance

● ridondanza dei dati

Per far questo ogni file è rappresentato come

un insieme non contiguo di blocchi

e a differenza di un file-system tradizionale ciascun blocco non contiene informazioni

relative al file, ai permessi, ai blocchi adiacenti o altri tipi di metadati, ma solo dati RAW.

HDFS

Un file-system block oriented

Un'altra differenza fondamentale, per ridurre l'impatto del seek-time sull'accesso al

disco, è la dimensione dei blocchi.

In un file-system tradizionale la dimensioni tipiche dei blocchi variano tra 512 bytes e

4Kbytes, su HDFS le dimensioni tipiche variano tra 64Mbyte e 256Mbyte.

Inoltre ogni singolo blocco può essere replicato su più macchine diverse, garantendo

così un semplice ma efficace meccanismo di ridondanza che aumenta la sicurezza del

sistema.

HDFS

architettura interna

HDFS è caratterizzato da un'architettura interna di tipo master-slave:

● il master, detto NameNode, ha il compito di conservare e gestire la struttura del

file-system e in generale tutto il sistema di metadati:

● suddivisione in cartelle (e loro gestione)

● posizionamento logico dei file nelle cartelle

● posizionamento fisico dei file sulle macchine (suddivisione in blocchi e

posizionamento di ogni replica di ogni singolo blocco)

● regole di accesso (molto semplificate rispetto a Posix)

● gli slave, detti DataNode, hanno invece il solo compito di memorizzare i blocchi e

consentirne l'accesso in lettura/scrittura alla massima velocità possibile

HDFS

Architettura interna

Hadoop

MapReduce

MapReduce si rifà ad un modello di programmazione tipico dei linguaggi funzionali, ed in particolare del Lisp (ideato nel 1958 da John McCarthy).

In Lisp esistono infatti due funzioni, mapcar e reduce, che si occupano di : mapcar : applicare una generica funzione ad ogni elemento di un insieme; reduce : applicare un operatore di aggregazione agli elementi di un insieme per combinarne i valori ( restituendo solitamente un insieme con un numero ridotto di elementi)

MapReduce

Architettura del Framework

Master

Slave

Slave

Slave

Job Tracker

Task Tracker

Task Tracker

Task Tracker

MapReduce

Architettura del Framework

Come HDFS, il framework MapReduce è costituito da più tool organizzati secondo

un'architettura master-slave:

● il nodo master, o JobTracker, ha il compito di gestire il ciclo di vita dei job :● permanenza in coda

● interazione con il NameNode

● suddivisione in task

● scheduling del job

● riesecuzione di eventuali task falliti

● notifica sul progresso/fine del job

MapReduce

Architettura del Framework

● i nodi slave, o TaskTracker, hanno il compito di gestire l'esecuzione dei singoli task :

● configurazione

● avvio e gestione di un numero limitato di tentativi

● notifica del progredire dell'elaborazione

● notifica del completamento / fallimento del task

MapReduce

MapCar Operation

Facciamo un semplice esempio per capire meglio il funzionamento di queste funzioni.

Date due liste di numeri e vogliamo calcolare la somma dei rispettivi elementi.

X 6 2 4 3 2 4 2 ...

Y 3 8 1 3 7 5 1 ...

X + Y 9 10 5 6 9 9 3 ...

MapReduce

... in stile procedurale

Con i linguaggi di programmazione procedurali diciamo al compilatore cosa deve fare.

Tipicamente per un problema di questo tipo si definisce un ciclo e si indica al compilatore ogni singola operazione da eseguire ad ogni ripetizione

● inizializza un contatore

● prendi la prima coppia di numeri● calcola la somma● scrivi il risultato nella nuova lista

● incrementa il contatore● prendi la seconda coppia di numeri● calcola la somma● scrivi il risultato nella nuova lista

● …● …

38

62

13

43

75

24

9105699

MapReduce

...in stile funzionale: es. mapcar in Lisp

Tipicamente in un problema di questo

tipo si dice al compilatore:

il valore a destra è ottenuto sommando

i due valori a sinistra.

In questo modo il compilatore può

scegliere la strategia con cui realizzare

le operazioni. Ad esempio in parallelo

se ha più processori a disposizione.

3

8

6

2

1

3

4

3

7

5

2

4

9

10

5

6

9

9

Con i linguaggi di programmazione funzionali diciamo al compilatore come deve calcolare determinati risultati ma lasciamo a lui il compito di decidere come farlo.

MapReduce

Reduce

L'operazione di Reduce combina gli elementi di un insieme e, in generale, restituisce un

insieme con un numero di elementi minore.

Ad esempio potremmo utilizzare Reduce per sapere quanti elementi erano dispari e

quanti pari, oppure quanti risultati erano minori di 8:

Reduce(' count(pari/dispari) ', X + Y )

2 5

X + Y 9 10 5 6 9 9 3 ...

Reduce ( ' count(<8 )', X + Y ) 3 4

MapReduce

Job

Come il nome del framework suggerisce, l'unità elementare di programmazione in

Hadoop, denominata JOB, è data dalla combinazione di un operazione di map e di

un'operazione di reduce.

In alcuni casi è possibile/conveniente omettere l'operazione di Reduce.

In tal caso i Job sono detti map-only.

Non è invece possibile omettere l'operazione di Map, anche se non altera in alcun

modo i dati, una funzione di map va sempre specificata.

Quindi un Job hadoop realizza un parallelismo di tipo SIMD (Single Instruction Multiple

Data) in quanto applica in parallelo una stessa trasformazione su tutti i blocchi che

costituiscono il file di input.

MapReduce

map + reduce

(map ' ( ))

( )

(reduce ' )( )

Input

( map <op> ' ( k-v , k-v, k-v ) )

( k-v , k-v, k-v )

( reduce <op> ' ( k-v , k-v, k-v ) )

Result

MapReduce

Ciclo di vita di un Job

MapReduce

Word Count

MapReduce

Definizione formale di un Job

Un job è costituito da un'operazione di map ed una di reduce.

Formalmente queste si definiscono nel seguente modo:

map : (K1, V1) → list(K2, V2)

reduce: (K2, list(V2)) → list(K3, V3)

dove (K,V) indica una generica coppia chiave valore.

Grazie a questa definizione formale e sfruttando i generics di Java il Framework

Hadoop è in grado di gestire ed eseguire qualunque Job indipendentemente dal tipo di

dati reale su cui opererà.

Definizione formale Job

Vincoli sui tipi dei dati

L'analisi del ciclo di vita di un Job ha illustrato una sequenza di operazioni che vengono

effettuate sui dati prima, durante e dopo l'esecuzione delle operazioni di map e reduce.

1. i dati vengono letti dalla sorgente (solitamente un file HDFS) e passati al map

2. i dati emessi dal map vengono ordinati e raggruppati e trasferiti al reduce

3. i dati emessi dal reduce vengono salvati sulla destinazione (solitamente un file

HDFS)

Queste operazioni pongono alcuni vincoli (relativamente leggeri) sui tipi di dati su cui le

operazioni di map e reduce possono operare.

Definizione formale Job

Vincoli sui tipi dei dati

I vincoli imposti sui dati sono:

● deve esistere un meccanismo standardizzato per la lettura/scrittura dei dati

● deve esistere un meccanismo standardizzato per il confronto di due dati in modo

da consentirne l'ordinamento

In termini di linguaggio Java questo si traduce nell'implementazione di due interfacce

● Writable - void write(DataOutput d)

void readFields(DataInput di)

● Comparable - int compareTo(T o)

Che per semplicità vengono raggruppate nell'interfaccia WritableComparable.

Definizione formale Job

Tipi di dati nativi

Il framework fornisce dei tipi che rappresentano i tipi nativi di Java e che implementano

queste interfacce:

● int → IntWritable

● long → LongWritable

● …

● String → Text

Qualora si dovesse aver necessità di tipi personalizzati sarà necessario implementare

queste interfacce per renderli compatibili con il framework.

MapReduce

Dai dati RAW alle coppie Chiave-Valore

Logical

HDFS

InputFormat

Job

Input Format

Input Format

Da RAW a WritableComparable

Un InputFormat svolge due compiti fondamentali

● suddividere i dati di input in un insieme di split (ciascuno dei quali contiene un

certo numero di record)

● fornire un RecordReader che legga i dati RAW presenti in uno split e restituisca

degli oggetti di tipo WritableComparable

Non bisogna confondere gli split con i blocchi HDFS.

Gli split rappresentano un gruppo di record, ovvero una divisione logica dell'input,

mentre i blocchi rappresentano una suddivisione fisica tipica di un particolare formato di

memorizzazione.

Uno split può indicare anche un gruppo di righe in una tabella di un DB.

Input Format

Principali input format forniti dal framework

I principali InputFormat forniti dal framework sono:

● FileInputFormat – rappresenta un file su HDFS ed è in grado di gestire

correttamente la presenza di split a cavallo tra due blocchi.

● TextInputFormat – discende da FileInputFormat e fornisce un RecordReader

che restituisce i dati in formato (LongWritable, Text)

● key [LongWritable] = distanza in byte dall'inizio del file

● value [Text] = una riga di testo

● KeyValueTextInputFormat - discende da FileInputFormat; usato per file testuali

in cui ogni riga rappresenta una coppia chiave valore delimitata da un separatore;

restituisce i dati in formato (Text, Text)

● key [Text] = una stringa rappresentante la chiave

● value [Text] = una stringa rappresentante il valore

Output Format

Da WritableComparable a RAW

Gli OutputFormat sono organizzati in maniera duale agli InputFormat ed eseguono

l'operazione inversa.

Una nota va spesa sul TextOutputFormat che converte i dati in stringhe, chiamando il

metodo toString() e li salva su file.

Nel farlo tuttavia memorizza in ogni riga una coppia chiave valore, per cui il

corrispondente InputFormat è il KeyValueTextInputFormat e non il

TextInputFormat.

MapReduce

Da RAW a Result

MapReduce

Word Count

MapReduce

Word Count : Pseudocodice

InputFormat = KeyValueTextInputFormat

Map input: key = linea di testo value = “”

map(String key, String value): for each word w in key: EmitIntermediate(w, "1");

[..Framework .. Shuffle & Sort ]

Reduce input: key = word values = lista di occorrenze es: {1, 1, 1, ….. }

reduce(String key, Iterator values): int result = 0; for each v in values: result = result + v;

Emit( key, result);

MapReduce

Algoritmi Complessi

In generale non è possibile tradurre un algoritmo complesso in un singolo Job e

quest'ultimo va visto come una singola unità di trasformazione dei dati di ingresso.

L'algoritmo complessivo sarà dunque costituito da un grafo orientato (DAG: Direct

Acyclic Graph) in cui ogni nodo rappresenta un singolo Job.

Input Output

MapReduce

Driver

L'esecuzione di un algoritmo complesso, costituito da un DAG di Job, consente di

realizzare un parallelismo più ampio, indicato con il termine MIMD (Multiple

Instructions Multiple Data ) in quanto applica più operazioni contemporaneamente su

più dati.

Spesso per eseguire un algoritmo complesso e controllare l'intera esecuzione del DAG

viene scritto un tool apposito, che viene denominato Driver per sottolineare il fatto che

pilota l'intera pipeline.

Questo modello di programmazione viene denominato DataFlow Programming, in

quanto descrive il percorso che i dati seguono nell'attraversare le varie operazioni o job.

Interoperabilità

Hadoop ed altri linguaggi di programmazione

Hadoop è scritto in Java ed è quindi naturale l'interazione più diretta avvenga attraverso

questo linguaggio. Tuttavia Hadoop offre due meccanismi che consentono la scrittura di

Job Hadoop in molti altri linguaggi di programmazione.

Questi due meccanismi sono

● Hadoop Streaming – che fa uso degli stream per il passaggio delle coppie chiave

valore

● Hadoop Pipes – che usa dei socket per il passaggio delle coppie chiave valore

Interoperabilità

Hadoop Streaming

Hadoop Streaming uso gli stream di Unix (stdin e stdout) come interfaccia tra il

framework e il Job, che dunque può esser scritto in un qualunque linguaggio che sia in

grado di leggere dallo standard input e di scrivere sullo standard output.

Hadoop Streaming ha una “visione” text-oriented dei dati, per cui

● il Task converte i dati di input in righe di testo e li passa al map attraverso lo

standard input

● il map li elabora e scrive i risultati sullo standard output

● questi vengono poi raccolti e ordinati (come testo)

● un altro Task suddivide i dati provenienti dai vari map in righe di testo e le inoltra al

reduce attraverso lo standard input

● il reduce le elabora e scrive i risultati sullo standard output

● questi vengono raccolti e memorizzati su file

Interoperabilità

Hadoop Pipes

Hadoop Pipes funziona in maniera simile a Streaming ma utilizza dei socket per il

passaggio dei dati ed è principalmente orientata agli sviluppatori C++.

Sia la chiave che il valore vengono forniti come stringhe ( std::string ).

Un Job in chiaveC++ deve sempre fornire due classi Mapper e Reducer ma i metodi

map() e reduce() hanno un solo parametro d'ingresso MapContext e ReduceContext

i cui metodi principali sono

const std::string& getInputKey();

const std::string& getInputValue();

void emit(const std::string& key, const std::string& value);

il ReduceContext ha il metodo addizionale bool nextValue(); per passare al valore

successivo.

Interoperabilità

Python + Hadoop = Pydoop

A partire dal 2009 il CRS4 ha sviluppato Pydoop[1], una API per consentire la scrittura

di Job MapReduce e l'interazione con HDFS anche a chi utilizza il linguaggio Python.

È possibile scaricarlo da:

http://pydoop.sourceforge.net/docs/

Ed è compatibile con le maggiori distribuzioni di Hadoop:

Apache Hadoop 0.20.2, 1.0.4, 1.1.2, 1.2.1 or 2.2.0

CDH 3u{4,5} or 4.{2,3,4,5}.0

[1] S. Leo and G. Zanetti. Pydoop: a Python MapReduce and HDFS API for Hadoop. In Proceedings of the 19th ACM

International Symposium on High Performance Distributed Computing, 819-825, 2010.

Pydoop

Architettura del sistema

Pydoop utilizza Hadoop Pipes e le librerie Boost per realizzare la comunicazione tra i codice Python ed il framework Map Reduce o l'HDFS

Pydoop

Python API

Attraverso le API di Pydoop è possibile avere accesso diretto dal codice Python alla

gran parte delle funzionalità offerte dal framework Hadoop:

pydoop.pipes — MapReduce API

pydoop.hdfs — HDFS API

pydoop.hdfs.path — Path Name Manipulations

pydoop.hdfs.fs — File System Handles

pydoop.hdfs.file — HDFS File Objects

pydoop.utils — Utility Functions

pydoop.jc — Pydoop Script Configuration Access

pydoop.hadut – Run Hadoop Shell Commands

http://pydoop.sourceforge.net/docs/api_docs/index.html

Pydoop

pydoop script

Tuttavia l'accesso diretto alle API Pydoop richiede una profonda conoscenza del sistema

e delle tecnologie utilizzate per integrarlo con Hadoop.

Per semplificare la scrittura di job è stato creato Pydoop Script, che consente di scrivere

un Job Hadoop semplicemente definendo le due funzioni mappre e reducer.

A questo punto per lanciare il Job sarà sufficiente digitare sulla shell il comando :

pydoop script myJob.py hdfs_input hdfs_output

Pydoop Script lancia il Job utilizzando una configurazione di default.

E' possibile modificare la configurazione specificando sulla riga di comando i parametri

nel formato chiave=valore secondo lo standard tipico di Java (opzione -D

property_name=property_value ).

Pydoop

WordCount

Utilizzando Pydoop script, possiamo realizzare il WordCount in Python, un unico script

wordcount.py in cui definiamo le funzioni mapper e reducer

def mapper(key, text, writer):

for word in text.split():

writer.emit(word, "1")

def reducer(word, icounts, writer):

writer.emit(word, sum(map(int, icounts)))

Per eseguire lo script è sufficiente digitare sul terminale

pydoop script wordcount.py INPUT_PATH OUTPUT_PATH

dove INPUT_PATH e OUTPUT_PATH sono dei path all'interno dell'HDFS