Scalding是对Cascading框架的Scala封装,或者更确切地说是一种函数式封装。看到Cascading的时候你可能会觉得这么麻烦的东西有必要学吗?但是再看看Scalding就会发现,这好像跟写一般的Scala代码也没什么区别……小小的封装带来巨大的改变。MapReduce/Spark这一遍玩下来最大的体会就是——用命令式语言表达函数式的东西真是太费劲了(map/reduce是函数式语言的基本操作)!
写惯了Spark再来看java的MapReduce的感觉就是再也回不去了。MapReduce是初恋又如何,函数式语言天生表达map、reduce就是比java这种命令式的要强得多得多得多。在用Spark用了一年之后基本上还是能总结出相对于MapReduce来说Spark的缺点:在单轮任务(典型的的ETL)上基本上没有什么优势,对于集群的利用来说与MapReduce还是有差距,具体表现在资源使用模式固定,无法像MR一样伸缩式的起大量map,这在资源空闲时是巨大的浪费。
1. 为什么是Scalding
网上找到一篇文章列举了列举了Pig, Scalding, Scoobi, Hive, Spark, Scrunch, Cascalog相关介绍的文章列表1。对于我自己而言:
1. 我需要一个简单框架来写MapReduce,并且能够很好利用现有库,hive、pig、spark出局;
2. 需要函数式使用API,Scala封装是很好的选择,还剩下Scoobi, Scalding, Scrunch
3. 易用,满足我司常见的三个场景的需求(列举在下文)。Scoobi, Scrunch对Thrift Parquet支持不好,出局。
从实际使用上来看,用Scala封装得比较好的MapReduce一样可以和写Spark一样简单。赞!
2. 实际用例
Scalding有两种API:Type-safe API2和 Fields Based API3。Fields Based API有点SQL的意思,喜欢的话还是不错的,专门有一本书叫《programming mapreduce with scalding4》,里面全部都是用Fields Based API讲的。全书总共也就100来页,很快就可以学完。
个人是先接触的Spark,已经被Typesafe洗脑,还是更倾向于接受Type-safe API。下面所有的例子都 是基于Type-safe API的。下面是针对一种是针对我司常见的三种应用场景分别介绍,
直接上的例子,具体地有兴趣可以自行查询API,一些相关的参考资料已经标在脚注中了。
2.1 读写纯文本文件
import com.twitter.scalding._ class TextFileExample(args: Args) extends Job(args) {
TypedPipe.from(TextLine(args("input"))) .flatMap(line => line.split("\\s")) .map { word => (word, 1L) } .sumByKey // reduce num not set .write(TypedTsv[(String, Long)](args("output"))) }
讯享网
2.2 读写SequenceFile
讯享网import org.apache.hadoop.io.{BytesWritable, Text} import com.twitter.scalding._ class SequenceFileExample(args: Args) extends Job(args) {
object Agg extends Aggregator[(String, Int), Int, Int] {
def prepare(v: (String, Int)) = v._2 val semigroup = Semigroup.from { (l: Int, r: Int) => l + r } def present(v: Int) = v } TypedPipe.from(WritableSequenceFile[BytesWritable, BytesWritable](args("input"))) .flatMap { case (k, v) => val line = new String(v.getBytes, 0, v.getLength) line.split("\\s").map(w => (w, 2)) } .groupBy(_._1).aggregate(Agg) // aggregate by key .toTypedPipe .map { case (k, v) => (new Text(k), new io.IntWritable(v)) } .reduce((a, b) => (a._1, a._2 + b._2)) .write(WritableSequenceFile(args("output"))) }
2.3. 读写Thrift+Paruqet文件
import parquet.thrift.test.Name // thrift对象 import com.twitter.scalding._ import com.twitter.scalding.parquet.thrift.FixedPathParquetThrift class ThriftParquetFileExample(args: Args) extends Job(args) {
val source = new FixedPathParquetThrift[Name](args("input")) val sink = new FixedPathParquetThrift[Name](args("output")) TypedPipe.from(source) .map { name => println(name); name } .write(sink) }
3. 更多
看起来还不错,但是其实与实际应用还是有些差距的,要解决更多的问题。
3.1. MapReduce配置
重载config方法可以完成对MapReduce任务的配置,包括压缩、队列等等,用-Dmapred.map.output.compress=true这样的配在启动命令里面也是可以的。
- 设置压缩(太不友好了)
讯享网 override def config: Map[AnyRef, AnyRef] = super.config ++ Map( // JOB OUTPUT "mapred.output.fileoutputformat.compress" -> "true", "mapred.output.fileoutputformat.compress.codec" -> "parquet.hadoop.codec.SnappyCodec", "mapred.output.fileoutputformat.compress.type" -> "BLOCK", "mapred.output.compression.type" -> "BLOCK", "mapred.output.compress" -> "true", "mapred.output.compression.codec" -> "parquet.hadoop.codec.SnappyCodec", // MAP OUTPUT "mapred.map.output.compress" -> "true", "mapred.map.output.compress.codec" -> "parquet.hadoop.codec.SnappyCodec" )
- 设置队列
override def config: Map[AnyRef,AnyRef] = { super.config ++ Map ("mapreduce.job.queuename" -> args("queue")) }
3.2. 命令行参数解析
这部分不用担心,Args类实现了一些基本的,可以满足大部分需求。
讯享网// class Xyyy(args: Args) extends Job(args) val input: String = args("input") // --input后面接了单个的字符串 val inputs: List[String] = args.list("input") // --input后面接了多个字符串 val numReduce: Int = args.int("num-reduce") // --num-reduce 后面接一个整数
3.3. 在IDE中运行与调试
- 在IDE中复制一个已有的运行时的配置,在主类填
com.twitter.scalding.Tool,然后参数ThriftParquetFileExample --hdfs --input testdata/name.parquet --output output - 加伴生类
class TextFileExample(args: Args) extends Job(args) {
... } object TextFileExample extends App {
Tool.main(getClass.getCanonicalName.stripSuffix("$") +: args) }
3.4. Counter
讯享网import cascading.flow.{Flow, FlowListener} import com.twitter.scalding._ class TextFileExample(args: Args) extends Job(args) {
val key = StatKey("word", "udc") val stat = Stat(key) TypedPipe.from(TextLine(args("input"))) .flatMap(line => line.split("\\s")) .map { word => stat.inc; (word, 1L) } .sumByKey // reduce num not set .write(TypedTsv[(String, Long)](args("output"))) / * 任务结束后,自定义的Counter会打印出来,如果不需要获取Counter做更多操作,可以不重载这里 * "copy" from https://itellity.wordpress.com/2014/10/29/counters-using-cascading-flow-listeners-in-scalding/ */ override def listeners = super.listeners ++ List(new FlowListener { override def onStarting(flow: Flow[_]): Unit = {} override def onCompleted(flow: Flow[_]) { try { val fs = flow.getFlowStats println(key.group, key.counter, fs.getCounterValue(key.group, key.counter)) } catch { case e: Exception => e.printStackTrace() } } override def onThrowable(flow: Flow[_], e: Throwable): Boolean = { e.printStackTrace() true } override def onStopping(flow: Flow[_]): Unit = {} }) }
4. GitHub上的Scalding-Demo工程
- 输出默认是覆盖,可以配置为存在的时候写出错
- 比reduce更通用的是aggregator,需要注意使用方法
- 能够用于提交执行的jar包需要打进去一些相关依赖,项目的pom中给出了一个可用例子
- 以及一些提交任务的脚本
不过还有一种需求需要找一下怎么实现——MapReduce的Setup阶段的公共数据初始化,实现了这个就基本上完美了。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/66057.html