发布和丰富实时金融数据流,使用 Amazon MSK 和 Amazon Managed Servic
  • 23

使用 Amazon MSK 和 Amazon 管理的 Apache Flink 发布和丰富实时金融数据

关键要点

实时金融数据流,包括股票报价和商品价格,被广泛用于投资决策。客户日益希望通过 AWS 云直接接收数据流,以实现高效和低延迟的使用体验。本文展示了如何使用 Amazon Managed Streaming for KafkaAmazon MSK和 Amazon Managed Service for Apache Flink 在 AWS 上发布丰富的实时数据流。

金融数据流是实时的股票报价、商品价格、期权交易或其他金融数据流。涉及资本市场的公司,例如对冲基金、投资银行和经纪公司,利用这些数据流来指导投资决策。

金融数据提供商越来越多地被客户要求通过 AWS 云直接交付数据流。这是因为客户已经在 AWS 上建立了基础设施来存储和处理数据,并希望以最低的努力和延迟进行消费。此外,AWS 云的高性价比使得中小型企业也能够成为金融数据提供商。它们可以交付并通过富有价值信息丰富的数据流进行盈利。

富裕的数据流可以结合来自多个来源的数据,包括金融新闻流,以添加信息,如股票拆分、公司并购、成交量警报和移动平均交叉。

在本文中,我们展示了如何使用 Amazon Managed Streaming for KafkaAmazon MSK和 Amazon Managed Service for Apache Flink 在 AWS 上发布富有的数据流。这种架构模式可以用于资本市场行业的各种用例;我们将在本文中讨论一些用例。

Apache Kafka 是一个高吞吐量、低延迟的分布式事件流平台。纳斯达克Nasdaq和纽约证券交易所NYSE等金融交易所正越来越多地转向 Kafka 来传递它们的数据流,因为 Kafka 能够处理高流量和高速度的数据流。

Amazon MSK 是一个完全托管的服务,使您能够轻松地在 AWS 上构建和运行使用 Kafka 处理流数据的应用程序。

Apache Flink 是一个开源的分布式处理引擎,提供强大的编程接口支持流处理和批处理,具有优先支持状态处理、事件时间语义、检查点、快照和回滚。Apache Flink 支持多种编程语言,包括 Java、Python、Scala 和 SQL,支持多种不同层级抽象的 API,这些 API 可以在同一应用程序中交替使用。

Amazon Managed Service for Apache Flink 提供了一个完全托管的无服务器体验,简化了运行 Apache Flink 应用程序的过程。客户可以使用 Flink 的任何语言和 API 容易地构建实时 Flink 应用程序。

在这篇文章中,我们使用来自金融数据提供商 Alpaca 的实时股票报价流,并在价格超过或低于某一阈值时添加指示器。提供的代码在 GitHub 仓库 中,允许您将解决方案部署到您的 AWS 账户。该解决方案由 AWS 合作伙伴 NETSOL Technologies 构建。

解决方案概述

在这个解决方案中,我们部署了一个 Apache Flink 应用程序,它丰富了原始数据流,一个包含原始和丰富流消息的 MSK 集群,以及一个 Amazon OpenSearch 服务 集群,它作为查询数据的持久化数据存储。在一个作为客户 VPC 的单独虚拟私有云VPC中,我们还部署了一个运行 Kafka 客户端的 Amazon EC2 实例,消费丰富的数据流。以下图示说明了此架构。

发布和丰富实时金融数据流,使用 Amazon MSK 和 Amazon Managed Servic

图一 解决方案架构

以下是解决方案的逐步细分:

您的 VPC 中的 EC2 实例运行着一个 Python 应用程序,通过 API 从数据提供商获取股票报价。在这里,我们使用 Alpaca 的 API。应用程序通过 Kafka 客户端库将这些报价发送到您在 MSK 集群上的 Kafka 主题,Kafka 主题存储原始报价。Apache Flink 应用程序获取 Kafka 消息流,并通过在股票价格从前一个交易日收盘价上涨或下跌 5 或以上时添加指示器来丰富流。Apache Flink 应用程序将丰富的数据发送到您在 MSK 集群上的单独 Kafka 主题。Apache Flink 应用程序还使用 Flink 连接器将丰富的数据流发送到 Amazon OpenSearch。Amazon OpenSearch 存储数据,OpenSearch Dashboards 允许应用程序在将来任何时刻查询该数据。您的客户在其 AWS 账户中的单独 VPC 上运行 Kafka 消费者应用程序。该应用程序使用 AWS PrivateLink 安全实时消费丰富的数据流。所有 Kafka 用户名和密码都被加密并存储在 AWS Secrets Manager 中。使用的 SASL/SCRAM 身份验证 协议确保所有进出 MSK 集群的数据在传输中都被加密。默认情况下,Amazon MSK 会加密集群内所有静态数据。

部署过程包括以下高层次步骤:

在生产者 AWS 账户中,启动 Amazon MSK 集群、Apache Flink 应用程序、Amazon OpenSearch 服务域和 Kafka 生产者 EC2 实例。此步骤通常在 45 分钟内完成。设置 多 VPC 连接 和 MSK 集群的 SASL/SCRAM 身份验证。此步骤可能会持续长达 30 分钟。在消费账户中启动 VPC 和 Kafka 消费者 EC2 实例。此步骤约需 10 分钟。

先决条件

要部署此解决方案,请完成以下先决步骤:

如果您尚未拥有 AWS 账户,请创建一个并登录。我们将其称为生产者账户。创建一个具有完全管理员权限的 AWS 身份与访问管理IAM用户。具体说明请参见 创建 IAM 用户。注销并重新使用该 IAM 管理用户登录 AWS 管理控制台。在生产者账户中创建一个名为 myec2keypair 的 EC2 密钥对。如果您已经有 EC2 密钥对,可以跳过此步骤。根据 ALPACAREADME 中的说明,在 Alpaca 上注册一个免费的基本账户,以获取您的 Alpaca API 密钥 和秘密密钥。Alpaca 将为我们的输入数据流提供实时股票报价。在本地开发机器上安装 AWS 命令行界面AWS CLI,并为管理员用户创建一个配置文件。有关详细说明,请参阅 设置 AWS 命令行界面AWS CLI。

全局安装最新版本的 AWS 云开发工具包AWS CDK:

bashnpm install g awscdk@latest

部署 Amazon MSK 集群

以下步骤将在新的提供者 VPC 中创建 Amazon MSK 集群。同样,您将部署 Apache Flink 应用程序并启动一个新的 EC2 实例以运行获取原始股票报价的应用程序。

在开发机器上,克隆 GitHub 仓库并安装 Python 包:

bashgit clone https//githubcom/awssamples/mskpoweredfinancialdatafeedgitcd mskpoweredfinancialdatafeedpip install r requirementstxt

设置以下环境变量,以指定您的生产 AWS 账户编号和 AWS 区域:

bashexport CDKDEFAULTACCOUNT={你的AWS账户号}export CDKDEFAULTREGION=useast1

运行以下命令以创建 configpy 文件:

bashecho mskCrossAccountId = lt你的生产 AWS 账户IDgt gt configpyecho producerEc2KeyPairName = gtgt configpyecho consumerEc2KeyPairName = gtgt configpyecho mskConsumerPwdParamStoreValue= gtgt configpyecho mskClusterArn = gtgt configpy

运行以下命令以创建 alpacaconf 文件:

bashecho [alpaca] gt dataFeedMsk/alpacaconfecho ALPACAAPIKEY=yourapikey gtgt dataFeedMsk/alpacaconfecho ALPACASECRETKEY=yoursecretkey gtgt dataFeedMsk/alpacaconf

编辑 alpacaconf 文件,将 yourapikey 和 yoursecretkey 替换为您的 Alpaca API 密钥。

为生产者账户引导环境:

bashcdk bootstrap aws//{你的AWS账户号}/{你的aws区域}

使用编辑器或集成开发环境IDE编辑 configpy 文件:

更新 mskCrossAccountId 参数,使用您的 AWS 生产账户编号。如果您有现有的 EC2 密钥对,将分配给 producerEc2KeyPairName 参数的值设置为您的密钥对的名称。查看 dataFeedMsk/parameterspy 文件:如果您在不同于 useast1 的区域上部署,请相应更新可用性区 ID az1 和 az2。例如,uswest2 的可用性区将是 uswest2a 和 uswest2b。确保 parameterspy 文件中的 enableSaslScramClientAuth、enableClusterConfig 和 enableClusterPolicy 参数设置为 False。

确保您在 app1py 文件所在的目录中。然后执行部署:

bashcdk deploy all app python app1py profile {你的用户配置文件名}

加速器一小时

检查您现在是否拥有以 awsblogdevartifacts 开头的 Amazon Simple Storage ServiceAmazon S3存储桶,其中包含一些 Python 脚本和 Apache Flink 应用程序的 JAR 文件。

部署多 VPC 连接和 SASL/SCRAM

完成以下步骤,以部署多 VPC 连接和 MSK 集群的 SASL/SCRAM 身份验证:

将 configpy 文件中的 enableSaslScramClientAuth、enableClusterConfig 和 enableClusterPolicy 参数设置为 True。

在与 configpy 文件所在目录中,部署 MSK 集群的多 VPC 连接和 SASL/SCRAM 身份验证:

bashcdk deploy all app python app1py profile {你的用户配置文件名}

此步骤最多可能需 30 分钟。

为检查结果,导航到您的 MSK 集群,选择 属性。

您应该看到 PrivateLink 被打开,身份验证类型为 SASL/SCRAM。

复制 MSK 集群的 ARN。编辑您的 configpy 文件,并将 ARN 输入到 mskClusterArn 参数的值中,然后保存已更新的文件。

部署数据流消费者

完成本节中的步骤,在新的消费者账户中创建一个 EC2 实例以运行 Kafka 消费者应用程序。该应用程序将通过 PrivateLink 和 SASL/SCRAM 连接到 MSK 集群。

在您的生产者账户中,导航到 参数存储,这是 AWS 系统管理器 的一个功能。复制 blogAwsdevmskConsumerPwdssmParamStore 参数的值,并更新 configpy 文件中的 mskConsumerPwdParamStoreValue 参数。检查名为 blogAwsdevgetAzIdsParamStore 的参数的值,并记录这两个值。如果您还没有 Kafka 消费者账户,请创建另一个 AWS 账户并登录。创建一个具有管理员权限的 IAM 用户。注销并使用该 IAM 管理用户重新登录控制台。确保您处于与生产者账户相同的区域。然后在此消费者账户中创建一个新的 EC2 密钥对,命名为例如 myec2consumerkeypair。 使用刚刚创建的密钥对的名称更新 configpy 文件中的 consumerEc2KeyPairName 值。在消费者账户中打开 AWS 资源访问管理 控制台。将来自系统管理器参数存储的可用性区 ID 与 AWS RAM 控制台显示的可用性区 ID 进行比较。确定相应的可用性区名称与匹配的可用性区 ID。在 dataFeedMsk 文件夹中的 parameterspy 文件中插入这些可用性区名称到变量 crossAccountAz1 和 crossAccountAz2 中。例如,如果在参数存储中,值为 “use1az4” 和 “use1az6”,那么,您在消费者账户的 AWS RAM 控制台中可能发现这些值对应于可用性区名称 “useast1a” 和 “useast1b”。在这种情况下,您需要通过将 crossAccountAz1 设置为 “useast1a” 和 crossAccountAz2 设置为 “useast1b” 来更新参数。

设置以下环境变量,指定您的消费者 AWS 账户 ID:

bashexport CDKDEFAULTACCOUNT={你的aws账户id}export CDKDEFAULTREGION=useast1

引导消费者账户环境。您需要为 AWS CDK 角色添加特定的策略。

bashcdk bootstrap aws//{你的aws账户id}/{你的aws区域} cloudformationexecutionpolicies arnawsiamawspolicy/AmazonMSKFullAccessarnawsiamawspolicy/AdministratorAccess profile lt你的用户配置gt

现在,您需要授予消费者账户访问 MSK 集群的权限。

在控制台中,将消费者 AWS 账户编号复制到剪贴板。注销并重新登录您的生产者 AWS 账户。在 Amazon MSK 控制台中,导航到您的 MSK 集群。选择 属性 并滚动到 安全设置。

选择 编辑集群策略,并将消费者账户根添加到 Principal 部分,格式如下,然后保存更改:

jsonPrincipal { AWS [arnawsiamlt生产者账户编号gtroot arnawsiamlt消费者账户编号gtroot]}

创建需要附加到 EC2 消费者实例的 IAM 角色:

bashaws iam createrole rolename awsblogdevappconsumerEc2Role assumerolepolicydocument file//dataFeedMsk/ec2ConsumerPolicyjson profile lt你的用户配置gt

部署消费者账户基础设施,包括 VPC、消费者 EC2 实例、安全组和与 MSK 集群的连接:

bashcdk deploy all app python app2py profile {你的用户配置文件名}

运行应用程序并查看数据

现在,我们已经建立了基础设施,我们可以从生产者 EC2 实例生成原始股票报价流到 MSK 集群,使用 Apache Flink 应用程序进行丰富,并通过 PrivateLink 从消费者应用程序消费丰富的数据流。在此文中,我们使用 Flink DataStream Java API 来处理和丰富股票数据流。我们还使用 Flink 的聚合和窗口功能来识别特定时间窗口内的见解。

运行托管的 Flink 应用程序

完成以下步骤以运行托管的 Flink 应用程序:

1