Pipeline Explained

Easegress offers a rich set of filters tailored for diverse use cases, including validation, request building, and more. At its core, the Pipeline serves as an integrated framework within Easegress, designed specifically to coordinate and sequence filters for processing requests.

  • It supports calling multiple backend services for one input request.
  • It supports conditional forward direction jumping.
  • The HTTPServer receives incoming traffic and routes to one dedicated Pipeline in Easegress according to the HTTP header, path matching, etc.

Details

Sequences executing

  • The basic model of Pipeline execution is a sequence. Filters are executed one by one in the order described by the flow field in Pipeline’s spec.
        Request
           │
┌──────────┼──────────┐
│ Pipeline │          │
│          │          │
│  ┌───────┴───────┐  │
│  │requestAdaptor │  │
│  └───────┬───────┘  │
│          │          │
│          │          │
│  ┌───────▼───────┐  │
│  │     Proxy     │  │
│  └───────┬───────┘  │
│          │          │
└──────────┼──────────┘
           │
           ▼
        Response
$ echo '
name: pipeline-demo
kind: Pipeline
flow:
- filter: requestAdaptor
- filter: proxy
filters:
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl create -f -
  • The pipeline-demo above will execute the requestAdaptor filter first, then proxy. So the proxy filter can forward the request with the header X-Adapt-Key which was set by the requestAdaptor.

JumpIf

  • Easegress’ filter returns a string message as the execution result. If it’s empty, that means these filters handle the request without failure. Otherwise, Easegress will use these no-empty result strings as the basis for the JumpIf mechanism.
  • The Pipeline supports JumpIf mechanism[2]. We use it to avoid some chosen filters’ execution when something goes wrong.
            Request
               │
               │
┌──────────────┼─────────────────┐
│ Pipeline     │                 │
│              ▼                 │
│     ┌──────────────────┐       │
│     │    Validator     ├────┐  │
│     └────────┬─────────┘    │  │
│              │              │  │
│              ▼              │  │
│     ┌──────────────────┐    │  │
│     │  RequestAdaptor  │    │  │
│     └────────┬─────────┘    │  │
│              │          Invalid│
│              ▼              │  │
│     ┌──────────────────┐    │  │
│     │     Proxy        │    │  │
│     └────────┬─────────┘    │  │
│              │              │  │
│              ◄──────────────┘  │
└──────────────┼─────────────────┘
               │
               ▼
            Response
$ echo '
name: pipeline-demo
kind: Pipeline

flow:
- filter: validator
  jumpIf: { invalid: END }
- filter: requestAdaptor
- filter: proxy

filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
' | egctl apply -f -
  • As we can see above, pipeline-demo will jump to the end of pipeline execution when validator’s execution result is invalid.

Built-in Filter END

  • END is a built-in filter, that’s we can use without defining it. From the above example, we already see that END can be used as the target of jumpIf, and we can also use it directly in the flow.
$ echo '
name: pipeline-demo
kind: Pipeline

flow:
- filter: validator
  jumpIf: { invalid: buildFailureResponse }
- filter: requestAdaptor
- filter: proxy
- filter: END
- filter: buildFailureResponse

filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: requestAdaptor
  kind: RequestAdaptor
  header:
    set:
      X-Adapt-Key: goodplan
- name: proxy
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: responseBuilder
  kind: ResponseBuilder
  template: |
    statusCode: 400
    body: the request is invalid.
' | egctl apply -f -
  • By using the END filter, we are now possible to build a custom failure response.

Alias

  • We have assigned a name to a filter when we define it, but a filter can be used more than once in the flow, in this case, we can assign an alias to each appearance.
$ echo '
name: pipeline-demo
kind: Pipeline

flow:
- filter: validator
  jumpIf:
    invalid: mockRequestForProxy2
- filter: mockRequest
- filter: proxy1
- filter: END
- filter: mockRequest
  alias: mockRequestForProxy2
- filter: proxy2

filters:
- name: validator
  kind: Validator
  headers:
    Content-Type:
      values:
      - application/json
- name: mockRequest
  kind: RequestBuilder
  template: |
    method: GET
    url: /hello
- name: proxy1
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.1:9095
- name: proxy2
  kind: Proxy
  pools:
  - servers:
    - url: http://127.0.0.2:9095
' | egctl apply -f -

Namespace

  • The pipeline can handle multiple requests/responses, requests/responses are grouped by namespaces, and each namespace can have at most one request and one response.
  • We can set the namespace attribute of filters to let the filter know which request/response it should use.
  • RequestBuilder and ResponseBuilder can access all requests and responses, they output the result request/response to the namespace they are assigned.
  • The default namespace is DEFAULT.
echo '
name: pipeline-api
kind: Pipeline

flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse

filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: [{{.responses.demo1.Body}}, {{.response.demo2.Body}}, {{.response.demo3.Body}}]
' | egctl create -f -

Usage

GlobalFilter

GlobalFilter is a special pipeline that can be executed before or/and after all pipelines in a server. For example:

name: globalFilter-example
kind: GlobalFilter
beforePipeline:
  flow:
  - filter: validator

  filters:
  - name: validator
    kind: Validator
    ...

afterPipeline:
  ...

---
name: server-example
kind: HTTPServer
globalFilter: globalFilter-example
...

In this case, all requests in HTTPServer server-example go through GlobalFilter globalFilter-example before and after executing any other pipelines.

Load Balancer

The reverse proxy is the common middleware that is accessed by clients, forwards them to backend servers. The reverse proxy is a very core role played by Easegress.

The filter Proxy is the filter to fire requests to backend servers. It contains servers group under load balance, whose policy support roundRobin, random, weightedRandom, ipHash, headerHash. For more information, please check load balance.

name: pipeline-reverse-proxy
kind: Pipeline
flow:
  - filter: proxy
filters:
  - name: proxy
    kind: Proxy
    pools:
    - servers:
      - url: http://127.0.0.1:9095
      - url: http://127.0.0.1:9096
      - url: http://127.0.0.1:9097
      loadBalance:
        policy: roundRobin

Traffic Adaptor: Change Something of Two-Way Traffic

Sometimes backend applications can’t adapt to quick changes of requirements of traffic. Easegress could be an adaptor between new traffic and old applications. There are 2 phases of adaption in reverse proxy: request adaption, response adaption. RequestAdaptor supports the adaption of a method, path, header, and body. ResponseAdaptor supports the adaption of header and body. As you can see, the flow in spec plays a critical role.

name: pipeline-reverse-proxy
kind: Pipeline
flow:
  - filter: requestAdaptor
  - filter: proxy
  - filter: responseAdaptor
filters:
  - name: requestAdaptor
    kind: RequestAdaptor
    host: easegress.megaease.com
    method: POST
    path:
      trimPrefix: /apis/v2
    header:
      set:
        X-Api-Version: v2

  - name: responseAdaptor
    kind: ResponseAdaptor
    header:
      set:
        Server: Easegress v1.0.0
      add:
        X-Easegress-Pipeline: pipeline-reverse-proxy

  - name: proxy
    kind: Proxy
    pools:
    - servers:
      - url: http://127.0.0.1:9095
      - url: http://127.0.0.1:9096
      - url: http://127.0.0.1:9097
      loadBalance:
        policy: roundRobin

API Aggregation

API aggregation is a pattern to aggregate multiple individual requests into a single request. This pattern is useful when a client must make multiple calls to different backend systems to operate. Easegress provides filters RequestBuilder & ResponseBuilder for this powerful feature.

Example 1: Simple aggregation

  1. Suppose we have three backend services called demo1, demo2, and demo3. We want to call these three services and combine their response to one. demo1 returns {"mega":"ease"} as the HTTP response body, demo2 returns {"hello":"world"}, and demo3 returns {"hello":"new world"}.
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse

filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      [{{.responses.demo1.Body}}, {{.responses.demo2.Body}}, {{.responses.demo3.Body}}]
' | egctl create -f -
  1. Creating an HTTPServer for forwarding the traffic to this pipeline.
echo '
kind: HTTPServer
name: server-demo
port: 10080
keepAlive: true
https: false
rules:
  - paths:
    - pathPrefix: /api
      backend: pipeline-api ' | egctl create -f -
  1. Send a request to the pipeline

$ curl  -X GET  http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"},{"mega":"ease"}]

Example 2: Merge response body

  • In #Example 1, demo2 and demo3’s responses share the same JSON key, we want to merge their response body by the JSON key together. If there are duplicated keys, we will use the last value.
  1. Update the pipeline spec
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse

filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: 200
    body: |
      {{mergeObject .responses.demo1.JSONBody .responses.demo2.JSONBody .responses.demo3.JSONBody | toRawJson}}
' | egctl apply -f -
  1. Send a request to the pipeline

$ curl  -X GET  http://127.0.0.1:10080/api -v
{"hello":"new world","mega":"ease"}

Example 3: Handle Failures

In #Example 1, if one of the backend services encounters a failure, the pipeline will result a wrong result. We can improve the pipeline to handle this kind of failures.

  1. Update the pipeline spec
echo '
name: pipeline-api
kind: Pipeline
flow:
- filter: copyRequest
  namespace: demo1
- filter: copyRequest
  namespace: demo2
- filter: copyRequest
  namespace: demo3
- filter: proxy-demo1
  namespace: demo1
  jumpIf: { serverError: proxy-demo2 }
- filter: proxy-demo2
  namespace: demo2
- filter: proxy-demo3
  namespace: demo3
- filter: buildResponse

filters:
- name: copyRequest
  kind: RequestBuilder
  sourceNamespace: DEFAULT
- name: proxy-demo1
  kind: Proxy
  pools:
  - servers:
    - url: https://demo1
- name: proxy-demo2
  kind: Proxy
  pools:
  - servers:
    - url: https://demo2
- name: proxy-demo3
  kind: Proxy
  pools:
  - servers:
    - url: https://demo3
- name: buildResponse
  kind: ResponseBuilder
  template: |
    statusCode: {{$code := 503}}{{range $resp := .responses}}{{if eq $resp.StatusCode 200}}{{$code = 200}}{{break}}{{end}}{{end}}{{$code}}
    body: |
      {{$first := true -}}
      [{{range $resp := .responses -}}
        {{if eq $resp.StatusCode 200 -}}
          {{if not $first}},{{end}}{{$resp.Body}}{{$first = false -}}
        {{- end}}
      {{- end}}]
' | egctl apply -f -
  1. Stop service demo1 and send a request to the pipeline
$ curl  -X GET  http://127.0.0.1:10080/api -v
[{"hello":"world"},{"hello":"new world"}]

References

  1. https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
  2. Jumpif mechanism in pipeline