• Twitter 连接器
    • 认证
    • 获取认证信息
    • 使用

    Twitter 连接器

    Twitter Streaming API 提供了访问 Twitter 的 tweets 流的能力。Flink Streaming 通过一个内置的 TwitterSource 类来创建到 tweets 流的连接。使用 Twitter 连接器,需要在工程中添加下面的依赖:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-twitter_2.11</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    注意:当前的二进制发行版还没有这些连接器。集群执行请参考这里.

    认证

    使用 Twitter 流,用户需要先注册自己的程序,获取认证相关的必要信息。过程如下:

    获取认证信息

    首先,需要一个 Twitter 账号。可以通过 twitter.com/signup 免费注册,或者在 Twitter 的 Application Management 登录,然后点击 “Create New App” 按钮来注册应用,填写应用程序相关表格并且接受条款。选择应用程序之后,可以在 “API Keys” 标签页看到 API key 和 API secret(对应于TwitterSource中的twitter-source.consumerKeytwitter-source.consumerSecret )。请保管好这些信息并且不要将其发布到public的仓库。

    使用

    和其他的连接器不同的是,TwitterSource 没有任何其他依赖。下面的示例代码就可以优雅的运行:

    1. Properties props = new Properties();
    2. props.setProperty(TwitterSource.CONSUMER_KEY, "");
    3. props.setProperty(TwitterSource.CONSUMER_SECRET, "");
    4. props.setProperty(TwitterSource.TOKEN, "");
    5. props.setProperty(TwitterSource.TOKEN_SECRET, "");
    6. DataStream<String> streamSource = env.addSource(new TwitterSource(props));
    1. val props = new Properties()
    2. props.setProperty(TwitterSource.CONSUMER_KEY, "")
    3. props.setProperty(TwitterSource.CONSUMER_SECRET, "")
    4. props.setProperty(TwitterSource.TOKEN, "")
    5. props.setProperty(TwitterSource.TOKEN_SECRET, "")
    6. val streamSource = env.addSource(new TwitterSource(props))

    TwitterSource 会发出包含了JSON object的字符串,这样的字符串表示一个Tweet.

    flink-examples-streaming 中的 TwitterExample 类是使用 TwitterSource 的完整示范。

    TwitterSource 默认使用 StatusesSampleEndpointStatusesSampleEndpoint 会返回一个 Tweets 的随机抽样。用户可以通过实现 TwitterSource.EndpointInitializer 接口来自定义 endpoint。