Posted by a forum user on 2022-04-22 06:43
2 answers in total
Answer from 懂视网, posted 2022-05-03 00:18
# Sample data file (testperson.txt); fields are separated by tabs:
zs	10	30.0
li	12	32.0
# Start an interactive session with the script loaded:
# spark-shell -i start.scala
scala> help
# Then follow the printed usage examples step by step.
import org.apache.spark.sql.SchemaRDD
var FIELD_SEPERATOR = "\t"   // fields in the data files are tab-separated (see sample above)
var RECORD_SEPERATOR = "\n"
var lastrdd : SchemaRDD = null
// Helper for normalizing file paths and doing basic local/HDFS file-system operations.
object MyFileUtil extends java.io.Serializable {
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileStatus
import scala.collection.mutable.ListBuffer
def regularFile(filepath:String):String = {
if(filepath == "") {
filepath;
} else if(filepath.startsWith("hdfs:")) {
filepath
} else if(filepath.startsWith("file:")) {
filepath
} else if(filepath.startsWith("/")) {
"file://" + filepath
} else {
val workdir = System.getProperty("user.dir")
"file://" + workdir + "/" + filepath
}
}
var SAFEMINPATH_LENGTH : Int = 24
def getFileSystem(filepath:String) = {
if(filepath.startsWith("hdfs:")) {
FileSystem.get(new org.apache.hadoop.conf.Configuration());
} else if(filepath.startsWith("file:")) {
FileSystem.getLocal(new org.apache.hadoop.conf.Configuration());
} else {
throw new Exception("file path invalid")
}
}
def deletePath(filepath:String) = {
if(filepath.length < SAFEMINPATH_LENGTH)
throw new Exception("file path is too short")
var fs : FileSystem = getFileSystem(filepath)
if (fs.exists(new Path(filepath))) {
fs.delete(new Path(filepath), true);
}
}
def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
if ( fs.exists(path) ) {
val substatuslist = fs.listStatus(path);
for(substatus <- substatuslist){
if(statuslist != null)
statuslist.append(substatus)
if(substatus.isDir()){
listFile(fs,substatus.getPath(),pathlist,statuslist);
}else{
pathlist.append(substatus.getPath());
}
}
}
}
def hasContext(filepath:String) = {
val realpath = regularFile(filepath)
val fs = getFileSystem(realpath)
val pathlist = ListBuffer[Path]()
val statuslist = ListBuffer[FileStatus]()
listFile(fs,new Path(realpath),pathlist,statuslist)
var length:Long = 0
for( status <- statuslist )
length += status.getLen()
length > 0
}
}
org.apache.spark.repl.Main.interp.command("""
// Wraps a SchemaRDD with convenience methods: go() prints the rows, result() collects
// them, and saveto() writes them either to a local file or to HDFS.
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {
def go() = {
var startstr = ""
var endstr = RECORD_SEPERATOR
val result = rdd.collect
result.foreach( x =>
print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
)
}
def result() = {
rdd.collect
}
def saveto(output: String) = {
import org.apache.hadoop.io.{NullWritable,Text}
var startstr = ""
var endstr = RECORD_SEPERATOR
if(output.startsWith("hdfs:")) {
val outputpath = MyFileUtil.regularFile(output)
MyFileUtil.deletePath(outputpath)
rdd.map(x =>
(NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
).saveAsHadoopFile[
org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
](outputpath)
} else {
val outputpath = MyFileUtil.regularFile(output)
MyFileUtil.deletePath(outputpath)
val result = rdd.collect()
val writer = new java.io.FileWriter(output)
result.foreach(x =>
writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
)
writer.close()
}
}
}
object MySchemaRDD {
implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
}
""")
val ssc = new org.apache.spark.sql.SQLContext(sc)
import ssc._
import MySchemaRDD._
// Build a snippet of Scala source that defines a case class matching the table
// definition and registers the parsed RDD as a SQL table; the snippet is later
// fed to the REPL interpreter.
def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(x => (x(0).trim,x(1).trim.head.toString.toUpperCase+x(1).trim.tail))
val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",")
val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
return s"""
case class ${classname}(${classmemberdef})
val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
ssc.registerRDDAsTable(schemardd,"${tablename}")
"""
}
org.apache.spark.repl.Main.interp.command("""
// Wraps a command string (a SQL statement, a table name, or a file path) so that it can
// be executed, saved, registered as a table, or checked directly via implicit conversion.
class MyCommandTranslator(cmd:String) extends java.io.Serializable {
def go()(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.go()
}
def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.saveto(output)
}
def result()(implicit f: SchemaRDD => MySchemaRDD) = {
lastrdd = sql(cmd)
lastrdd.result()
}
// def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.go()
// }
//
// def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.saveto(output)
// }
//
// def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
// lastrdd = hql(cmd)
// lastrdd.result()
// }
def defineas(tabledef:String) = {
if( tabledef != "" ) {
org.apache.spark.repl.Main.interp.command(
getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
)
} else {
org.apache.spark.repl.Main.interp.command(
"ssc.registerRDDAsTable(${cmd},"${cmd}")"
)
}
}
def from(filepath:String) {
if( cmd.trim.startsWith("create table ") ) {
val tablename = cmd.trim.substring(13).trim().split(" ")(0)
val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
val tabledef = leftstr.substring(1,leftstr.length-1).trim()
val realfile = MyFileUtil.regularFile(filepath)
org.apache.spark.repl.Main.interp.command(
"val "+tablename+" = sc.textFile(""+realfile+"")"
)
new MyCommandTranslator(tablename).defineas(tabledef)
} else {
println("usage:")
println(""create table sometablename (field1 string,field2 int...)" from "somefile or hdfs:somepath"")
}
}
def isok() = {
if(cmd.contains(".") || cmd.contains("/")) {
MyFileUtil.hasContext(cmd)
} else {
val res = sql(s"select count(*) from ${cmd}").result()
val count = res(0).getLong(0)
count > 0
}
}
}
object MyCommandTranslator {
implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)
def show(tabledata:Array[org.apache.spark.sql.Row]) = {
tabledata.foreach( x => println(x.mkString(" ")))
}
}
""")
def to = MyCommandTranslator
import MyCommandTranslator._
val onetable = sql("select 1 as id")
ssc.registerRDDAsTable(onetable,"onetable")
def help = {
println("""example:
"create table testperson (name string,age int,weight double)" from "testperson.txt"
"select * from testperson" go
"select * from testperson" saveto "somelocalfile.txt"
"select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
"testperson" isok
"somelocalfile.txt" isok
"hdfs:/basedir/parentdir/testperson" isok
val data = "select * from testperson" result
to show data
val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")
"somerdddata" defineas "name string,age int,weight double"
"select * from somerdddata" go
if you want to see the help of the Scala REPL environment, please type :help
""")
}
Answer from an enthusiastic user, posted 2022-05-02 21:26
1. Master the Scala language
The Spark framework is written in Scala, and the code is concise and elegant. To become a Spark expert you must read Spark's source code, and for that you must master Scala;
Although Spark applications can now be developed in several languages such as Java and Python, the fastest and best-supported development API is, and will remain, the Scala API, so you must master Scala to write complex, high-performance distributed Spark programs;
In particular, become fluent with Scala traits, apply methods, functional programming, generics, and covariance/contravariance (a short sketch follows below);
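A minimal Scala sketch of these language features; the names (Greeter, Person, Box, ScalaFeaturesDemo) are illustrative only and are not part of the original answer:

// A trait with a concrete member, an apply factory in a companion object,
// and a generic, covariant container.
trait Greeter {
  def name: String
  def greet(): String = s"Hello, I am $name"          // traits may carry implemented methods
}

class Person(val name: String) extends Greeter

object Person {
  def apply(name: String): Person = new Person(name)  // enables Person("Tom")
}

// +A makes Box covariant: a Box[Person] can be used where a Box[Greeter] is expected.
class Box[+A](val value: A)

object ScalaFeaturesDemo {
  def main(args: Array[String]): Unit = {
    val p = Person("Tom")                              // apply() in action
    println(p.greet())

    // Functional style: higher-order functions over immutable collections.
    println(List("spark", "scala").map(_.length))

    val box: Box[Greeter] = new Box[Person](p)         // covariance
    println(box.value.greet())
  }
}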
2. Be proficient with the Spark platform API
Master the RDD-oriented programming model and the use of the various transformation and action functions (see the sketch after this list);
Master wide and narrow dependencies and the lineage mechanism;
Master how an RDD computation proceeds, for example how stages are split, the basic process by which a Spark application is submitted to the cluster, and how Worker nodes operate.
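A minimal sketch of transformations versus actions and of a narrow versus a wide dependency; it assumes it is run inside spark-shell, where sc is already defined, and the sample numbers are illustrative:

// Transformations are lazy and only record the lineage; actions trigger execution.
val nums     = sc.parallelize(1 to 10, 4)         // base RDD with 4 partitions
val squared  = nums.map(n => n * n)               // narrow dependency: each output partition
                                                  // depends on exactly one parent partition
val byParity = squared.map(n => (n % 2, n))
val sums     = byParity.reduceByKey(_ + _)        // wide dependency: a shuffle is required,
                                                  // so a new stage begins here

// Nothing has executed yet; collect() is an action and materializes the whole lineage.
println(sums.collect().mkString(", "))
// toDebugString prints the lineage, including the stage boundary introduced by the shuffle.
println(sums.toDebugString)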
3. Go deep into the Spark internals
At this stage you dig into the Spark internals mainly by studying the framework's source code:
Learn from the source code how Spark submits jobs;
Learn from the source code how the Spark cluster schedules tasks;
In particular, master every step of what happens inside the DAGScheduler, the TaskScheduler, and the Worker nodes;
4. Master the core frameworks built on top of Spark
Spark, as the unifying platform of the cloud-computing and big-data era, has clear advantages in real-time stream processing, graph processing, machine learning, NoSQL queries, and so on; most of the time we use Spark through the frameworks built on top of it, such as Shark and Spark Streaming:
Spark Streaming is an excellent real-time stream-processing framework; master DStream, its transformations, checkpointing, and so on (a short sketch follows after this list);
For offline statistical analysis, Spark 1.0.0 introduced Spark SQL on top of Shark, which significantly improves the efficiency of offline analysis and deserves particular attention;
Also understand the principles and usage of Spark's machine learning (MLlib) and GraphX libraries;
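A minimal Spark Streaming sketch; the socket source on localhost:9999, the batch interval, and the checkpoint directory are illustrative assumptions, not part of the original answer:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(5))      // 5-second micro-batches
    ssc.checkpoint("checkpoint")                           // checkpoint dir (needed for stateful/windowed ops)

    val lines  = ssc.socketTextStream("localhost", 9999)   // DStream of text lines from a socket
    val counts = lines.flatMap(_.split("\\s+"))            // DStream transformations are lazy,
                      .map(word => (word, 1))              // just like RDD transformations
                      .reduceByKey(_ + _)
    counts.print()                                         // output operation triggers execution

    ssc.start()                                            // start receiving and processing data
    ssc.awaitTermination()
  }
}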
5. Do a commercial-grade Spark project
Work through one complete, representative Spark project that covers every aspect of Spark, including the architecture design, analysis of the technologies used, development and implementation, and operations; fully mastering every phase and detail of such a project will let you handle the vast majority of future Spark projects with confidence.
Follow-up answer:
6. Provide Spark solutions
Master every detail of the Spark framework's source code;
Provide Spark solutions tailored to different business scenarios as needed;
Where the actual requirements call for it, do secondary development on top of the Spark framework and build a Spark framework of your own;
Of the six stages toward becoming a Spark expert described above, the first two can be completed step by step through self-study; the following three are best completed under the guidance of an expert; and the final stage is essentially the period of "winning without fixed moves", where much can only be attained through careful, deep understanding.