这篇文章主要讲解了“Flink中keyBy有哪些方式指定key”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink中keyBy有哪些方式指定key”吧!
创新互联公司服务项目包括清河门网站建设、清河门网站制作、清河门网页制作以及清河门网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,清河门网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到清河门省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
keyBy 如何指定key
不管是stream还是batch处理,都有一个keyBy(stream)和groupBy(batch)操作。那么该如何指定key?
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.
一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定义一个key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允许数据按照key进行分组。
DataSet
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
类似于MySQL中的join操作:select a.* , b.* from a join b on a.id=b.id
这里的keyBy就是a.id=b.id
有哪几种方式定义Key?
方式一:Tuple
DataStream> input = // [...] KeyedStream ,Tuple> keyed = input.keyBy(0)
可以传字段的位置
DataStream> input = // [...] KeyedStream ,Tuple> keyed = input.keyBy(0,1)
可以传字段位置的组合
这对于简单的使用时没问题的。但是对于内嵌的Tuple,如下所示:
DataStream,String,Long>> ds;
如果使用keyBy(0),那么他就会使用整个Tuple2
方式二:字段表达式
我们可以使用基于字符串字段表达式来引用内嵌字段去定义key。
之前我们的算子写法是这样的:
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction
public static class WC { private String word; private int count; public WC() { } public WC(String word, int count) { this.word = word; this.count = count; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } }
修改算子的写法:
text.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String[] tokens = value.toLowerCase().split(","); for (String token : tokens) { if (token.length() > 0) { out.collect(new WC(token, 1)); } } } }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
将原来的输出Tuple2
因此,在这个例子中我们有一个POJO类,有两个字段分别是"word"和"count",可以传递字段名到keyBy("")中。
语法:
字段名一定要与POJO类中的字段名一致。一定要提供默认的构造函数,和get与set方法。
使用Tuple时,0表示第一个字段
可以使用嵌套方式,举例如下:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3word; public IntWritable hadoopCitizen; }
"count",指向的是WC中的字段count
"complex",指向的是复杂数据类型,会递归选择所有ComplexNestedClass的字段
"complex.word.f2",指向的是Tuple3中的最后一个字段。
"complex.hadoopCitizen",指向的是Hadoop
IntWritable
type
scala写法:
object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隐式转换 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map(x => WC(x,1)) .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } case class WC(word: String, count: Int) }
方式三:key选择器函数
.keyBy(new KeySelector() { @Override public Object getKey(WC value) throws Exception { return value.word; } })
感谢各位的阅读,以上就是“Flink中keyBy有哪些方式指定key”的内容了,经过本文的学习后,相信大家对Flink中keyBy有哪些方式指定key这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
当前标题:Flink中keyBy有哪些方式指定key
链接URL:http://lswzjz.com/article/psddjg.html