入门指引:订阅时序数据


时序数据指资产上送或者计算产生的带时间戳的测点数据,包括资产实时数据。本文介绍如何快速使用EnOS数据订阅配置界面进行数据订阅任务配置,并使用数据订阅 SDK(Java或Python)消费订阅数据。

前提条件

  • 已被授权访问数据订阅模块
  • 已接入设备并且设备已经在发送数据
  • 订阅数据的服务账号(SA)已被授权获取资产数据
  • 已安装好IDE(Java或Python)的电脑

目标及数据准备

目标

实现的场景:订阅AI原始数据采集点test_raw的数据。


数据准备

  • 模型配置:使用的模型(test_Model)配置如下:
功能类型 名称 标识符 测点类型 数据类型
测点 test_raw test_raw AI DOUBLE
  • 数据接入:请参考 设备连接 来完成设备连接,采集 test_raw 测点的数据。

操作步骤

订阅并消费资产实时数据的步骤如下:

  • 创建并配置实时数据订阅任务
  • 保存并启动数据订阅任务
  • 使用订阅SDK消费订阅数据
  • 查看数据消费结果

第一步:创建并配置实时数据订阅任务

  • EnOS Cloud:进入 EnOS 管理控制台的 数据订阅 模块,可查看当前 Cloud 组织创建的所有订阅任务。
  • EnOS Edge:进入 EnOS 管理控制台的 Edge 管理 模块,点击进入Edge 详情 > 数据订阅,可查看当前 Edge 设备创建的所有订阅任务。


请参考如下步骤完成告警数据订阅任务的创建及配置:

  1. 创建订阅:点击 添加订阅 按钮,可添加订阅任务,本教程中选择 时序数据订阅

  2. 选择系统自动生成订阅ID。

  3. 选择订阅关联的服务账号 (SA) :每个订阅实例都必须关联一个服务账号 (SA) ,目前EnOS平台上只有创建应用才能生成一个服务账号 (SA) ,具体SA生成方法,请参考 管理应用。EnOS Edge 的服务账号 (SA) 显示为已在 EnOS Edge 上线的应用对应账号。

    注解

    数据订阅关联的服务账号 (SA) 必须已被授权获取资产的数据,否则订阅任务会认证失败,而不能成功订阅数据。授权服务账号 (SA) 的详细信息,请参考 管理服务账号

  4. 选择订阅数据的消息通道,本教程中选择 实时通道

  5. 输入对订阅任务的描述。

  6. 选择需要订阅的客户(仅 EnOS Cloud):每个SA可访问多个客户的数据(通过应用购买建立授权),订阅可根据需要进行客户选择。

  7. 选择模型点过滤条件:本教程中选择 test_Modeltest_raw 测点,订阅系统则会过滤出符合模型测点条件的数据。

第二步:保存并启动订阅任务

配置好订阅任务之后,点击 保存,返回至订阅任务列表页面,启动创建的订阅任务,开始数据的生产。

第三步:线下使用订阅 SDK 消费订阅数据

EnOS数据订阅服务提供对应的数据订阅SDK,帮助你快速进行线下应用开发并消费订阅数据。以下示例以Java SDK为例,介绍线下应用开发的步骤:

EnOS Cloud 消费示例

  • 在IDE中引用 EnOS Cloud 对应的 Maven 地址。IDE中Maven引用示例如下:

    <dependency>
      <groupId>com.envisioniot</groupId>
      <artifactId>subscription-client</artifactId>
      <version>5.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.envisioniot</groupId>
      <artifactId>enos-subscribe-impl</artifactId>
      <version>5.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    
  • 使用IDE进行线下开发;本教程示例代码如下:

    String sub_server_host ="sub_server_host";
    int sub_server_port ="sub_server_port";
    String accessKey ="access_Key";
    String secretKey ="secret_Key";
    String subId = "subscriptionId";
    String consumerGroup = "consumerGroup";   /*可选*/
    
    /*根据订阅类型获取相应的服务*/
    EosClient eosClient = new EosClient(sub_server_host, sub_server_port, accessKey, secretKey);
    IDataService dataService = eosClient.getDataService();
    
    /* 在启动订阅之前需要创建订阅数据处理函数 */
    IDataHandler dataHandler = new IDataHandler(){
        @Override
        public void dataRead(StreamMessage message) {
            System.out.println(message);
        }
    };
    /* 调用subscribe函数创建订阅连接,调用后订阅连接被创建 */
    dataService.subscribe(dataHandler, subId);
    
    /* (可选)同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明 */
    dataService.subscribe(dataHandler, subId, consumerGroup);
    

EnOS Edge 消费示例

  • 在IDE中引用 EnOS Edge 对应的 Maven 仓库。IDE中Maven引用示例如下:

    <dependency>
      <groupId>com.envisioniot.sub</groupId>
      <artifactId>enos-subscribe-impl</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    
  • 使用IDE进行线下开发;本教程示例代码如下:

    String sub_server_host ="sub_server_host";
    int sub_server_port ="sub_server_port";
    String accessKey ="access_Key";
    String secretKey ="secret_Key";
    String subId = "subscriptionId";
    String consumerGroup = "consumerGroup";   /*可选*/
    
    /*根据订阅类型获取相应的 service*/
    EosClient eosClient = new EosClient(sub_server_host, sub_server_port, accessKey, secretKey, true);
    IDataService dataService = eosClient.getDataService();
    
    /* 在启动订阅之前需要创建订阅数据处理函数 */
    IDataHandler dataHandler = new IDataHandler(){
        @Override
        public void dataRead(StreamMessage message) {
            System.out.println(message);
        }
    };
    /* 调用subscribe函数创建订阅连接,调用后订阅连接被创建 */
    dataService.subscribe(dataHandler, subId);
    
    /* (可选)同时指定订阅分组,关于订阅分组的概念见订阅SDK参考说明 */
    dataService.subscribe(dataHandler, subId, consumerGroup);
    

注解

在以上示例中,subscriptionId 需要被替换为在EnOS 管理控制台创建的订阅任务的订阅ID; sub_server_hostsub_server_port 需要被替换为EnOS订阅服务的地址和端口号。由于不同的EnOS服务和实例的地址和端口号不同,请联系远景智能项目经理或技术支持获取对应的服务和端口信息。


有关数据订阅SDK的详细信息,请参考 数据订阅SDK参考

第四步:查看数据消费结果

运行订阅消费程序,通过运行日志查看应用是否已消费实时数据。