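<p>A DataStream program in Flink is a regular program that implements transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). Data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned through sinks, which may, for example, write the data to files or to standard output (e.g., the command-line terminal). Flink programs run in a variety of contexts, either standalone or embedded in other programs. Execution can take place in a local JVM or on a cluster of many machines.</p>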
<p>For an introduction to the basic concepts of the Flink API, see <a href="https://blog.csdn.net/zpf_940810653842/article/details/99636978">Flink Basics: API, DataSet, DataStream, Batch, and Streaming</a>.</p>
<p>The following is a simple Flink DataStream program.</p>
<h3>Example Program</h3>
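<p>The program below is a complete, working streaming word count example. It reads lines of text from the file given by <code>--input</code>, or from built-in sample data when no input is given, splits them into words, and keeps a running count for each word. Note that it uses neither a socket source nor a 5-second window. As written, the test passes no arguments, so it uses the sample data and prints the results to standard output. You can copy and paste the code to run it locally.</p>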
<pre class="blockcode"><code class="language-java">import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class CommonDemo1Test {

    @Test
    public void demo1() throws Exception {
        // No command-line arguments are passed here, so the defaults below apply.
        ParameterTool params = ParameterTool.fromArgs(new String[0]);
        StreamExecutionEnvironment ev = StreamExecutionEnvironment.getExecutionEnvironment();
        ev.getConfig().setGlobalJobParameters(params);

        // Source: a text file if --input is given, otherwise the built-in sample lines.
        DataStream<String> text;
        if (params.has("input")) {
            text = ev.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = ev.fromElements(WordCountData.WORDS);
        }
        // Prints the stream object itself, not its records; nothing runs until execute().
        System.out.println(text);

        // Transformation: split each line into (word, 1) pairs, then sum per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // Lower-case the line and split on runs of non-word characters.
                        String[] tokens = s.toLowerCase().split("\\W+");
                        for (String token : tokens) {
                            // Skip empty and single-character tokens.
                            if (token.length() > 1) {
                                collector.collect(Tuple2.of(token, 1));
                            }
                        }
                    }
                })
                // Key by tuple field 0 (the word) and keep a running sum of field 1.
                // Index-based keyBy is deprecated in newer Flink releases in favor of a KeySelector.
                .keyBy(0)
                .sum(1);

        // Sink: a text file if --output is given, otherwise standard output.
        if (params.has("output")) {
            res.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            res.print();
        }

        // execute program
        ev.execute("Streaming WordCount");
    }

    @Test
    public void demo2() {
    }
}</code></pre>
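<p>To make clear what the <code>flatMap</code>, <code>keyBy</code>, and <code>sum</code> steps compute, here is a minimal plain-Java sketch of the same logic without Flink. The class <code>RunningWordCount</code> and its methods are hypothetical names introduced only for illustration. The sketch assumes Flink's default streaming behaviour, where a keyed <code>sum</code> emits an updated total for every incoming record rather than a single final count.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this class is hypothetical and not part of Flink.
final class RunningWordCount {

    // Mirrors the flatMap step: lower-case the line, split on runs of
    // non-word characters, and keep only tokens longer than one character.
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 1) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    // Mirrors keyBy(word) followed by sum(count): every token updates the
    // total for its word, and the updated total is emitted immediately.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String token : tokenize(line)) {
                int total = totals.merge(token, 1, Integer::sum);
                emitted.add("(" + token + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("To be, or not to be,--that is the question:--");
        for (String record : runningCounts(lines)) {
            System.out.println(record);
        }
    }
}
```

<p>For the first sample line, this sketch emits <code>(to,1)</code>, <code>(be,1)</code>, <code>(or,1)</code>, <code>(not,1)</code>, then <code>(to,2)</code> and <code>(be,2)</code>, and so on. In a real Flink job the records for different words may appear in a different order, and <code>print()</code> may prefix each line with a subtask index when the parallelism is greater than one.</p>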
<pre class="blockcode"><code class="language-java">public class WordCountData {
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despis'd love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscover'd country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with th |
|