RDD mitjançant Spark: el bloc bàsic d’Apache Spark



Aquest bloc sobre RDD que utilitza Spark us proporcionarà un coneixement detallat i exhaustiu de RDD, que és la unitat fonamental de Spark i de la utilitat que té.

, La paraula mateixa és suficient per generar una espurna a la ment de tots els enginyers de Hadoop. A n a la memòria eina de processament que és fulminant en informàtica de clústers. En comparació amb MapReduce, l’ús compartit de dades a la memòria crea RDD 10-100x més ràpid que la compartició de xarxes i discos, i tot això és possible a causa dels RDD (conjunts de dades distribuïts resilients). Els punts clau que ens centrem avui en aquest RDD mitjançant l'article Spark són:

Necessiteu RDD?

Per què necessitem RDD? -RDD amb Spark





El món evoluciona amb i Ciències de les dades a causa de l 'avanç en . Algorismes basat en Regressió , , i que funciona Distribuït Computació iterativa ció moda que inclou la reutilització i l’ús compartit de dades entre diverses unitats informàtiques.

El tradicional les tècniques necessitaven un emmagatzematge intermedi i distribuït estable, com ara HDFS que inclouen càlculs repetitius amb rèpliques de dades i serialització de dades, cosa que va fer que el procés fos molt més lent. Trobar una solució mai va ser fàcil.



què fa .trim a Java

Aquí és on RDD (Conjunts de dades distribuïts elàstics) arriba al panorama general.

RDD S són fàcils d’utilitzar i es creen sense esforç ja que les dades s’importen de fonts de dades i s’emmagatzemen als RDD. A més, s'apliquen les operacions per processar-les. Són un col·lecció distribuïda de memòria amb permisos com Llegeix només i el més important, ho són Tolerant a fallades .



Si n’hi ha partició de dades de el RDD és perdut , es pot regenerar aplicant el mateix transformació operació en aquesta partició perduda a llinatge , en lloc de processar totes les dades des de zero. Aquest tipus d’enfocament en escenaris en temps real pot produir miracles en situacions de pèrdua de dades o quan un sistema no funciona.

Què són els RDD?

RDD o ( Conjunt de dades distribuïdes resistents ) és un element fonamental estructura de dades a Spark. El terme Resistent defineix la capacitat que genera les dades automàticament o dades enrere fins al estat original quan es produeix una calamitat inesperada amb probabilitat de pèrdua de dades.

Les dades escrites als RDD són particionat i emmagatzemat a múltiples nodes executables . Si un node d'execució falla en el temps d'execució, obté instantàniament la còpia de seguretat del fitxer següent node executable . Per això, els RDD es consideren un tipus avançat d’estructures de dades en comparació amb altres estructures de dades tradicionals. Els RDD poden emmagatzemar dades estructurades, no estructurades i semiestructurades.

Seguim endavant amb el nostre RDD mitjançant el bloc Spark i coneixem les característiques úniques dels RDD que li aporten un avantatge respecte a altres tipus d’estructures de dades.

Característiques de RDD

  • En memòria (RAM) Càlculs : El concepte de càlcul In-Memory porta el processament de dades a una etapa més ràpida i eficaç en què es troba el conjunt rendiment del sistema és actualitzat.
  • L la seva avaluació : El terme avaluació mandrosa diu el transformacions s'apliquen a les dades de RDD, però la sortida no es genera. En canvi, les transformacions aplicades són registrat.
  • Persistència : Els RDD resultants sempre són reutilitzable.
  • Operacions de gra gruixut : L'usuari pot aplicar transformacions a tots els elements dels conjunts de dades mapa, filtre o bé agrupar per operacions.
  • Tolerant a fallades : Si hi ha una pèrdua de dades, el sistema pot enrere a la seva estat original mitjançant el registre transformacions .
  • Immutabilitat : Les dades definides, recuperades o creades no poden ser canviat un cop iniciada la sessió al sistema. En cas que necessiteu accedir i modificar el RDD existent, heu de crear un RDD nou aplicant un conjunt de Transformació funciona al RDD actual o anterior.
  • Particionament : Això és el unitat crucial de paral·lelisme a Spark RDD. Per defecte, el nombre de particions creades es basa en la vostra font de dades. Fins i tot podeu decidir el nombre de particions que voleu fer servir partició personalitzada funcions.

Creació de RDD mitjançant Spark

Els RDD es poden crear a tres maneres:

  1. Lectura de dades de col·leccions paral·leles
val PCRDD = spark.sparkContext.parallelize (Array ('dilluns', 'dim', 'dim', 'dij', 'divendres', 'ds'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Sol·licitud transformació en RDD anteriors
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Lectura de dades de emmagatzematge extern o camins de fitxers com HDFS o bé Base HB
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operacions realitzades en RDD:

Hi ha principalment dos tipus d’operacions que es realitzen en RDD, a saber:

  • Transformacions
  • Accions

Transformacions : El operacions apliquem a RDD a filtre, accés i modificar les dades del RDD pare per generar un fitxer RDD successiu es diu transformació . El nou RDD retorna un punter al RDD anterior assegurant la dependència entre ells.

Les transformacions són Avaluacions mandroses, en altres paraules, les operacions aplicades al RDD que esteu treballant es registraran, però no executat. El sistema genera un resultat o excepció després d 'activar el fitxer Acció .

Podem dividir les transformacions en dos tipus de la següent manera:

  • Transformacions estretes
  • Àmplies transformacions

Transformacions estretes Apliquem transformacions estretes a a partició única del RDD pare per generar un RDD nou ja que les dades necessàries per processar el RDD estan disponibles en una sola partició del fitxer TEA pare . Els exemples de transformacions estretes són:

  • mapa ()
  • filter ()
  • flatMap ()
  • partició ()
  • mapPartitions ()

Àmplies transformacions: Apliquem l’àmplia transformació particions múltiples per generar un nou RDD. Les dades necessàries per processar el RDD estan disponibles a les múltiples particions del fitxer TEA pare . Els exemples de transformacions àmplies són:

  • reduceBy ()
  • Unió()

Accions : Les accions indiquen que s'apliqui a Apache Spark càlcul i torneu a passar el resultat o una excepció al controlador RDD. Poques de les accions inclouen:

  • recollir ()
  • count ()
  • prendre ()
  • primer()

Apliquem pràcticament les operacions als RDD:

IPL (Premier League de l'Índia) és un torneig de cricket amb un nivell màxim. Així, doncs, posem les mans al conjunt de dades IPL i executem el nostre RDD mitjançant Spark.

  • En primer lloc, baixem les dades de concordança CSV d’IPL. Després de descarregar-lo, comença a semblar un fitxer EXCEL amb files i columnes.

En el següent pas, activem l'espurna i carreguem el fitxer matches.csv des de la seva ubicació, en el meu cas el meucsvla ubicació del fitxer és '/User/edureka_566977/test/matches.csv'

Comencem ara per Transformació primera part:

  • map ():

Fem servir Transformació de mapes per aplicar una operació de transformació específica a cada element d'un RDD. Aquí creem un RDD per nom CKfile on emmagatzemem el nostre fitxercsvdossier. Crearem un altre RDD anomenat States to emmagatzemar els detalls de la ciutat .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filter ():

Transformació del filtre, el propi nom en descriu l’ús. Utilitzem aquesta operació de transformació per filtrar les dades selectives d’una col·lecció de dades donada. Sol·licitem operació de filtre aquí per obtenir els registres dels partits IPL de l'any 2017 i emmagatzemeu-lo al fitxer RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Apliquem que FlatMap és una operació de transformació a cadascun dels elements d’un RDD per crear un nou RDD. És similar a la transformació de mapes. aquí apliquemMapa plaa escopir els partits de la ciutat d'Hyderabad i emmagatzemar les dades afilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partició ():

Totes les dades que escrivim en un RDD es divideixen en un nombre determinat de particions. Utilitzem aquesta transformació per trobar el nombre de particions les dades es divideixen en realitat.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Considerem MapPatitions com una alternativa de Map () iper cadascú() junts. Utilitzem mapPartitions aquí per trobar el fitxer nombre de files tenim al nostre fitxer RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

Fem servirReduceBy() encès Parells clau-valor . Hem utilitzat aquesta transformació en el nostrecsvfitxer per trobar el reproductor amb el fitxer Home més alt dels partits .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • Unió():

El nom ho explica tot, fem servir la transformació sindical clavar dos RDD junts . Aquí estem creant dos RDD, és a dir, fil i fil2. fil RDD conté els registres de coincidències IPL de 2017 i fil2 RDD conté registres de coincidències IPL de 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Comencem per la Acció part on mostrem la sortida real:

  • collect ():

Recollir és l’acció que solem fer mostrar el contingut al RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • count ():

Comptaés una acció que fem servir per comptar el nombre de registres present al RDD.Aquíutilitzem aquesta operació per comptar el nombre total de registres del nostre fitxer matches.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • take ():

Take és una operació d'acció similar a la de recollir, però l'única diferència és que pot imprimir-ne qualsevol nombre selectiu de files segons la sol·licitud de l'usuari. Aquí apliquem el següent codi per imprimir el fitxer deu informes principals.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • primer():

First () és una operació d'acció similar a collect () i take ()aixòs'utilitza per imprimir l'informe superior s la sortida Aquí fem servir la primera operació () per trobar el fitxer nombre màxim de partits jugats en una ciutat concreta i aconseguim Mumbai com a sortida.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => { Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Per fer que el nostre procés RDD d’aprenentatge amb Spark sigui encara més interessant, se m’ha acudit un cas d’ús interessant.

RDD amb Spark: cas d'ús de Pokémon

  • En primer lloc, Baixem-nos un fitxer Pokemon.csv i el carreguem al spark-shell tal com hem fet al fitxer Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Els Pokemons estan disponibles en una gran varietat. Anem a trobar algunes varietats.

  • S'està suprimint l'esquema del fitxer Pokemon.csv

És possible que no necessitem el Esquema del fitxer Pokemon.csv. Per tant, l’eliminem.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Trobar el nombre de particions es distribueix el nostre pokemon.csv.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokémon d’aigua

Trobant el nombre de pokemon d'aigua

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Foc Pokémon

Trobant el nombre de Pokémon Fire

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • També podem detectar població d’un tipus diferent de pokemon que utilitza la funció de recompte
WaterRDD.count () FireRDD.count ()

  • Ja que m'agrada el joc de estratègia defensiva anem a trobar el pokemon amb màxima defensa.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Sabem el màxim valor de força de defensa però no sabem quin pokemon és. doncs, anem a trobar quin és això Pokémon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Comanda [Doble] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Ara anem a ordenar els Pokémon amb menys defensa
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Ara vegem els Pokémon amb un estratègia menys defensiva.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val def2 = NoP2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Així, amb això, arribem al final d’aquest RDD mitjançant l’article Spark. Espero que hem donat una mica de llum al vostre coneixement sobre els RDD, les seves característiques i els diversos tipus d’operacions que es poden realitzar amb ells.

Aquest article es basa en està dissenyat per preparar-vos per a l'examen de certificació Cloudera Hadoop and Spark Developer (CCA175). Obteniu un coneixement profund sobre Apache Spark i l’ecosistema Spark, que inclou Spark RDD, Spark SQL, Spark MLlib i Spark Streaming. Obteniu un coneixement complet sobre el llenguatge de programació Scala, HDFS, Sqoop, Flume, Spark GraphX ​​i el sistema de missatgeria com Kafka.