1、Flink流处理(DataStream API)概览(Data Source、Data Sink)

论坛 期权论坛     
选择匿名的用户   2021-5-30 21:03   248   0
<p>Flink中的DataStream程序是在数据流上实现转换的常规程序(例如,filtering, updating state, defining windows, aggregating)。数据流最初是由不同的源创建的(例如,message queues, socket streams, files)。结果通过接收器返回,例如,接收器可以将数据写入文件或标准输出(例如the command line terminal)。Flink程序在各种上下文中运行,独立运行或嵌入到其他程序中。执行可以在本地JVM中进行,也可以在许多机器的集群上进行。</p>
<p>有关Flink API的基本概念的介绍,请参阅<a href="https://blog.csdn.net/zpf_940810653842/article/details/99636978">Flink基础之API,DataSet、DataStream、批、流</a></p>
<p>以下为简易的Flink DataStream程序。</p>
<h3>样例程序</h3>
<p>下面的程序是一个完整的、可工作的流窗口单词计数应用程序示例,它在5秒内计算来自web套接字的单词。您可以复制并粘贴代码以在本地运行它。</p>
<pre class="blockcode"><code class="language-java">import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class CommonDemo1Test {
    &#64;Test
    public void demo1() throws Exception {
        ParameterTool params &#61; ParameterTool.fromArgs(new String[0]);
        StreamExecutionEnvironment ev &#61; StreamExecutionEnvironment.getExecutionEnvironment();
        ev.getConfig().setGlobalJobParameters(params);
        DataStream&lt;String&gt; text &#61; null;
        if (params.has(&#34;input&#34;)) {
            text &#61; ev.readTextFile(params.get(&#34;input&#34;));
        } else {
            System.out.println(&#34;Executing WordCount example with default input data set.&#34;);
            System.out.println(&#34;Use --input to specify file input.&#34;);
            text &#61; ev.fromElements(WordCountData.WORDS);
        }
        System.out.println(text);
        SingleOutputStreamOperator&lt;Tuple2&lt;String, Integer&gt;&gt; res &#61; text.flatMap(new FlatMapFunction&lt;String, Tuple2&lt;String, Integer&gt;&gt;() {
            &#64;Override
            public void flatMap(String s, Collector&lt;Tuple2&lt;String, Integer&gt;&gt; collector) throws Exception {
                String[] tokens &#61; s.toLowerCase().split(&#34;\\W&#43;&#34;);
                for (String token : tokens) {
                    if (token.length() &gt; 1) {
                        collector.collect(Tuple2.of(token, 1));
                    }
                }
            }
        }).keyBy(0).sum(1);


        if (params.has(&#34;output&#34;)) {
            res.writeAsText(params.get(&#34;output&#34;));
        } else {
            System.out.println(&#34;Printing result to stdout. Use --output to specify output path.&#34;);
            res.print();
        }

        // execute program
        ev.execute(&#34;Streaming WordCount&#34;);
    }

    &#64;Test
    public void demo2() {

    }
}</code></pre>
<pre class="blockcode"><code class="language-java">public class WordCountData {
    public static final String[] WORDS &#61; new String[]{
            &#34;To be, or not to be,--that is the question:--&#34;,
            &#34;Whether &#39;tis nobler in the mind to suffer&#34;,
            &#34;The slings and arrows of outrageous fortune&#34;,
            &#34;Or to take arms against a sea of troubles,&#34;,
            &#34;And by opposing end them?--To die,--to sleep,--&#34;,
            &#34;No more; and by a sleep to say we end&#34;,
            &#34;The heartache, and the thousand natural shocks&#34;,
            &#34;That flesh is heir to,--&#39;tis a consummation&#34;,
            &#34;Devoutly to be wish&#39;d. To die,--to sleep;--&#34;,
            &#34;To sleep! perchance to dream:--ay, there&#39;s the rub;&#34;,
            &#34;For in that sleep of death what dreams may come,&#34;,
            &#34;When we have shuffled off this mortal coil,&#34;,
            &#34;Must give us pause: there&#39;s the respect&#34;,
            &#34;That makes calamity of so long life;&#34;,
            &#34;For who would bear the whips and scorns of time,&#34;,
            &#34;The oppressor&#39;s wrong, the proud man&#39;s contumely,&#34;,
            &#34;The pangs of despis&#39;d love, the law&#39;s delay,&#34;,
            &#34;The insolence of office, and the spurns&#34;,
            &#34;That patient merit of the unworthy takes,&#34;,
            &#34;When he himself might his quietus make&#34;,
            &#34;With a bare bodkin? who would these fardels bear,&#34;,
            &#34;To grunt and sweat under a weary life,&#34;,
            &#34;But that the dread of something after death,--&#34;,
            &#34;The undiscover&#39;d country, from whose bourn&#34;,
            &#34;No traveller returns,--puzzles the will,&#34;,
            &#34;And makes us rather bear those ills we have&#34;,
            &#34;Than fly to others that we know not of?&#34;,
            &#34;Thus conscience does make cowards of us all;&#34;,
            &#34;And thus the native hue of resolution&#34;,
            &#34;Is sicklied o&#39;er with th
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP