How to Become a Spark Expert

Posted by a user, 2022-04-22 06:43


2 answers

Answered by 懂视网, 2022-05-03 00:18

# fields are separated by a tab character

zs 10 30.0
li 12 32.0

# spark-shell -i start.scala

scala> help

Follow the prompts and execute step by step. The start.scala script below builds a small query DSL on top of the Spark 1.x SQL API (SchemaRDD): quoted string commands can create a table from a delimited text file, run SQL over it, print the result, and save it to a local file or to HDFS.

import org.apache.spark.sql.SchemaRDD 
 
var FIELD_SEPERATOR = "\t" 
var RECORD_SEPERATOR = "\n" 
var lastrdd : SchemaRDD = null 
 
object MyFileUtil extends java.io.Serializable { 
 import org.apache.hadoop.fs.Path 
 import org.apache.hadoop.fs.FileSystem 
 import org.apache.hadoop.fs.FileStatus 
 import scala.collection.mutable.ListBuffer 
 
 def regularFile(filepath:String):String = { 
 if(filepath == "") { 
  filepath; 
 } else if(filepath.startsWith("hdfs:")) { 
  filepath 
 } else if(filepath.startsWith("file:")) { 
  filepath 
 } else if(filepath.startsWith("/")) { 
  "file://" + filepath 
 } else { 
  val workdir = System.getProperty("user.dir") 
  "file://" + workdir + "/" + filepath 
 } 
 } 
 
 var SAFEMINPATH_LENGTH : Int = 24 
 
 def getFileSystem(filepath:String) = { 
 if(filepath.startsWith("hdfs:")) { 
  FileSystem.get(new org.apache.hadoop.conf.Configuration()); 
 } else if(filepath.startsWith("file:")) { 
  FileSystem.getLocal(new org.apache.hadoop.conf.Configuration()); 
 } else { 
  throw new Exception("file path invalid") 
 } 
 } 
 
 def deletePath(filepath:String) = { 
 if(filepath.length < SAFEMINPATH_LENGTH) 
  throw new Exception("file path is too short") 
 var fs : FileSystem = getFileSystem(filepath) 
 if (fs.exists(new Path(filepath))) { 
  fs.delete(new Path(filepath), true); 
 } 
 } 
 
 def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) { 
 if ( fs.exists(path) ) { 
  val substatuslist = fs.listStatus(path); 
  for(substatus <- substatuslist){ 
  if(statuslist != null) 
   statuslist.append(substatus) 
  if(substatus.isDir()){ 
   listFile(fs,substatus.getPath(),pathlist,statuslist); // pass statuslist through so nested files are counted too 
  }else{ 
   pathlist.append(substatus.getPath()); 
  } 
  } 
 } 
 } 
 
 def hasContext(filepath:String) = { 
 val realpath = regularFile(filepath) 
 val fs = getFileSystem(realpath) 
 val pathlist = ListBuffer[Path]() 
 val statuslist = ListBuffer[FileStatus]() 
 listFile(fs,new Path(realpath),pathlist,statuslist) // use the normalized path so relative local paths resolve 
 var length:Long = 0 
 for( status <- statuslist ) 
  length += status.getLen() 
 length > 0 
 } 
} 
 
org.apache.spark.repl.Main.interp.command(""" 
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable { 
 
 def go() = { 
 var startstr = "" 
 var endstr = RECORD_SEPERATOR 
 val result = rdd.collect 
 result.foreach( x => 
  print(x.mkString(startstr,FIELD_SEPERATOR,endstr)) 
  ) 
 } 
 
 def result() = { 
 rdd.collect 
 } 
 
 def saveto(output: String) = { 
 import org.apache.hadoop.io.{NullWritable,Text} 
 var startstr = "" 
 var endstr = RECORD_SEPERATOR 
 if(output.startsWith("hdfs:")) { 
  val outputpath = MyFileUtil.regularFile(output) 
  MyFileUtil.deletePath(outputpath) 
  rdd.map(x => 
   (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR))) 
  ).saveAsHadoopFile[ 
   org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text] 
  ](outputpath) 
 } else { 
  val outputpath = MyFileUtil.regularFile(output) 
  MyFileUtil.deletePath(outputpath) 
  val result = rdd.collect() 
  val writer = new java.io.FileWriter(output) 
  result.foreach(x => 
  writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr)) 
  ) 
  writer.close() 
 } 
 } 
} 
object MySchemaRDD { 
 implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd) 
} 
""") 
 
val ssc = new org.apache.spark.sql.SQLContext(sc) 
import ssc._ 
import MySchemaRDD._ 

def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = { 
 val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(x => (x(0).trim,x(1).trim.head.toString.toUpperCase+x(1).trim.tail)) 
 val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",") 
 val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",") 
 return s""" 
 case class ${classname}(${classmemberdef}) 
 val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr})) 
 ssc.registerRDDAsTable(schemardd,"${tablename}") 
 """ 
} 
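
For illustration, with the testperson table from the help example at the end of this script, getRegisterString("testperson","TESTPERSON","testperson","name string,age int,weight double") generates roughly the snippet below (the tab held in FIELD_SEPERATOR is interpolated into the split pattern; shown as comments so it is not executed here):

// case class TESTPERSON(name:String,age:Int,weight:Double)
// val schemardd = testperson.map(_.split("\t")).map(t=>TESTPERSON(t(0).toString,t(1).toInt,t(2).toDouble))
// ssc.registerRDDAsTable(schemardd,"testperson")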

org.apache.spark.repl.Main.interp.command(""" 
class MyCommandTranslator(cmd:String) extends java.io.Serializable { 
 
 def go()(implicit f: SchemaRDD => MySchemaRDD) = { 
 lastrdd = sql(cmd) 
 lastrdd.go() 
 } 
 
 def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = { 
 lastrdd = sql(cmd) 
 lastrdd.saveto(output) 
 } 
 
 def result()(implicit f: SchemaRDD => MySchemaRDD) = { 
 lastrdd = sql(cmd) 
 lastrdd.result() 
 } 
 
// def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = { 
// lastrdd = hql(cmd) 
// lastrdd.go() 
// } 
// 
// def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = { 
// lastrdd = hql(cmd) 
// lastrdd.saveto(output) 
// } 
// 
// def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = { 
// lastrdd = hql(cmd) 
// lastrdd.result() 
// } 
 
 def defineas(tabledef:String) = { 
 if( tabledef != "" ) { 
  org.apache.spark.repl.Main.interp.command( 
  getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef) 
  ) 
 } else { 
  org.apache.spark.repl.Main.interp.command( 
  "ssc.registerRDDAsTable(${cmd},"${cmd}")" 
  ) 
 } 
 } 
 
 def from(filepath:String) { 
 if( cmd.trim.startsWith("create table ") ) { 
  val tablename = cmd.trim.substring(13).trim().split(" ")(0) 
  val leftstr = cmd.substring(13).trim().substring(tablename.length).trim() 
  val tabledef = leftstr.substring(1,leftstr.length-1).trim() 
  val realfile = MyFileUtil.regularFile(filepath) 
  org.apache.spark.repl.Main.interp.command( 
  "val "+tablename+" = sc.textFile(""+realfile+"")" 
  ) 
  new MyCommandTranslator(tablename).defineas(tabledef) 
 } else { 
  println("usage:") 
  println(""create table sometablename (field1 string,field2 int...)" from "somefile or hdfs:somepath"") 
 } 
 } 
 
 def isok() = { 
 if(cmd.contains(".") || cmd.contains("/")) { 
  MyFileUtil.hasContext(cmd) 
 } else { 
  val res = sql(s"select count(*) from ${cmd}").result() 
  val count = res(0).getLong(0) 
  count > 0 
 } 
 } 
} 
object MyCommandTranslator { 
 implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd) 
 
 def show(tabledata:Array[org.apache.spark.sql.Row]) = { 
 tabledata.foreach( x => println(x.mkString("\t"))) 
 } 
} 
""") 
 
def to = MyCommandTranslator 
import MyCommandTranslator._ 
 
val onetable = sql("select 1 as id") 
ssc.registerRDDAsTable(onetable,"onetable") 
 
def help = { 
 println("""example: 
 "create table testperson (name string,age int,weight double)" from "testperson.txt" 
 "select * from testperson" go 
 "select * from testperson" saveto "somelocalfile.txt" 
 "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson" 
 "testperson" isok 
 "somelocalfile.txt" isok 
 "hdfs:/basedir/parentdir/testperson" isok 
 val data = "select * from testperson" result 
 to show data 
 val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson") 
 "somerdddata" defineas "name string,age int,weight double" 
 "select * from somerdddata" go 
 if you want to see the help of the REPL environment, please type :help 
 """) 
}


Answered by 热心网友, 2022-05-02 21:26

1. Master the Scala language
  The Spark framework itself is written in Scala, and the code is refined and elegant. To become a Spark expert you must read Spark's source code, and for that you must know Scala.
  Although Spark applications can now be developed in several languages such as Java and Python, the fastest and best-supported development API is still (and will remain) the Scala API, so you need Scala to write complex, high-performance distributed Spark programs.
  In particular, become fluent with Scala traits, apply methods, functional programming, generics, and covariance/contravariance; a small sketch of these features follows this item.
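
A minimal, self-contained sketch of those Scala features (all names here are invented for illustration and are not part of Spark): a trait with a concrete method, a companion-object apply, a function value, and a covariant type parameter.

trait Greeter {                                    // trait: an interface that can carry concrete members
  def name: String
  def greet(): String = s"hello, $name"
}

class Box[+A](val value: A)                        // +A makes Box covariant in A

object Person {
  def apply(n: String): Greeter = new Greeter { val name = n }  // companion apply: Person("x") instead of new
}

object ScalaFeatureSketch {
  def main(args: Array[String]): Unit = {
    println(Person("spark").greet())               // apply sugar
    val twice: Int => Int = x => x * 2             // functions as values (functional style)
    println(List(1, 2, 3).map(twice))              // => List(2, 4, 6)
    val anyBox: Box[Any] = new Box[String]("ok")   // legal only because Box is covariant
    println(anyBox.value)
  }
}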
  2. Become proficient with the Spark platform API
  Master the RDD-oriented programming model and the use of the various transformation and action functions (see the short sketch after this item);
  Understand wide and narrow dependencies and the lineage mechanism;
  Understand how an RDD computation proceeds, e.g. how stages are divided, the basic flow of submitting a Spark application to the cluster, and how a Worker node does its work.
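
A small sketch of the transformation/action model in spark-shell terms, assuming an existing SparkContext named sc; the input path and the tab-separated layout are only assumptions borrowed from the sample data above:

// transformations are lazy; nothing runs until an action is called
val lines  = sc.textFile("hdfs:/some/input")     // hypothetical path
val words  = lines.flatMap(_.split("\t"))        // transformation, narrow dependency
val pairs  = words.map(w => (w, 1))              // transformation, narrow dependency
val counts = pairs.reduceByKey(_ + _)            // wide dependency: shuffle boundary, new stage
counts.take(10).foreach(println)                 // action: this is what actually launches the job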

  3. Go deep into the Spark internals
  This stage is mainly about studying the Spark source code to understand the core of the framework:
  Learn from the source how Spark submits jobs;
  Learn from the source how the Spark cluster schedules tasks;
  In particular, master every step of what happens inside DAGScheduler, TaskScheduler and the Worker nodes.
  4. Master the core frameworks built on top of Spark
  As the all-in-one platform of the cloud-computing and big-data era, Spark has clear strengths in real-time stream processing, graph processing, machine learning, NoSQL-style querying and so on; most of the time we spend with Spark is actually spent in the frameworks built on top of it, such as Shark and Spark Streaming:
  Spark Streaming is an excellent real-time stream-processing framework; master DStreams, transformations and checkpointing (a minimal sketch follows this item);
  For offline statistical analysis, Spark 1.0.0 introduced Spark SQL on the basis of Shark, with a marked improvement in efficiency; it deserves particular attention;
  For Spark's machine learning and GraphX components, understand both the principles and the usage.
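
A minimal Spark Streaming sketch in the Spark 1.x API, assuming an existing SparkContext named sc; the socket host/port and the checkpoint directory are hypothetical:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val streamingContext = new StreamingContext(sc, Seconds(5))        // 5-second micro-batches
streamingContext.checkpoint("hdfs:/tmp/streaming-checkpoint")      // hypothetical checkpoint directory
val lines = streamingContext.socketTextStream("localhost", 9999)   // DStream from a hypothetical socket source
val counts = lines.flatMap(_.split("\t"))                          // DStream transformations mirror RDD ones
                  .map(w => (w, 1))
                  .reduceByKey(_ + _)
counts.print()                                                     // output operation
streamingContext.start()                                           // start the streaming computation
streamingContext.awaitTermination()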

  5. Work on a commercial-grade Spark project
  Use one complete, representative Spark project to tie together every aspect of Spark, including the architecture design, the analysis of the technologies involved, the development and implementation, and the operations; master every stage and every detail of it, and you will afterwards be able to handle the great majority of Spark projects with confidence.

Follow-up answer: 6. Provide Spark solutions
  Thoroughly master every detail of the Spark framework's source code;
  Provide Spark solutions tailored to the needs of different business scenarios;
  Where the real requirements call for it, do secondary development on top of the Spark framework and build your own Spark-based framework.
  Of the six stages to becoming a Spark expert described above, the first and second can be completed step by step through self-study; the following three are best completed under the guidance of an expert; the last stage is basically the period of "winning without set techniques", where much can only be achieved through careful, personal insight.
