Start Pipeline

Start a specific stream processing pipeline.

Prerequisites

The stream processing pipeline is released online, and the pipeline ID is available.

Request Format

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

Request Parameters (URI)

Name Location (Path/Query) Mandatory/Optional Data Type Description
pipelineId Path Mandatory String The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page.
orgId Query Mandatory String The organization ID. How to get the orgId>>

Request Parameters (Body)

Name Mandatory/Optional Data Type Description
executionMode Mandatory Integer Specify the running mode of the stream processing pipeline (0: Standalone Mode; 1: Cluster Mode).
kafkaRate Mandatory Integer Specify the Kafka data read rate, by the unit of record/s. Default is 1000.
resourceConfig Optional JSONObject Specify the resources for running the pipeline. For Standalone Mode, specify the amount of CUs required by the pipeline directly (for example, 2). For Cluster Mode, specify the resourses in JSON format. For details, see Resource Config

Resource Config

Name Mandatory/Optional Data Type Description
server Optional String Specify the Server resource for running the pipeline, for example 1 (with cu as the unit by default).
yarn Optional JSONObject

Specify the Yarn resource for running the pipeline. The JSON body contains the following parameters:

  • workerCount: Count of workers, for example: 2.
  • master:Count of master workers, for example: 1 (with cu as the unit by default).
  • slave:Count of slave workers, for example: 2 (with cu as the unit by default).
advanced Optional JSONObject Specify the advanced parameters for running the pipeline by Cluster Mode.

Response Parameters

Name Data Type Description
data String Returns an empty string upon success.

Error Code

Code Error Information Description
61100 The Stream configuration JSON is invalid. The pipeline configuration JSON is not valid. Please check the syntax.
61108 stream processing pipeline does not exit. Stream processing pipeline does not exist. Please check the pipeline ID.

Sample

Request Sample

For Standalone Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 0,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1
    }
}

For Cluster Mode

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=start

method: POST

requestBody:
{
    "executionMode": 1,
    "kafkaRate": 1000,
    "resourceConfig": {
        "server": 1,
        "yarn": {
            "workerCount": 2,
            "master": 1,
            "slave": 2
        },
        "advanced": [
            {
                "key": "spark.streaming.backpressure.enabled",
                "value": "true"
            },
            {
                "key": "spark.driver.memoryOverhead",
                "value": "1000"
            },
            {
                "key": "spark.executor.memoryOverhead",
                "value": "800"
            },
            {
                "key": "spark.memory.fraction",
                "value": "0.8"
            },
            {
                "key": "spark.memory.storageFraction",
                "value": "0.7"
            }
        ]
    }
}

Return Sample

{
  "code": 0,
  "msg": "OK",
}

Java SDK Sample

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void StartPipeline() {
        Request request = new Request();
        request.setBodyParams("executionMode", "0");
        request.setBodyParams("kafkaRate", "1000");
        Map<String, String> resourceConfig=new HashMap<>();
        resourceConfig.put("server","1");

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "start")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}