什么是 Rbd-Gateway???

Posted by 黄润豪 on Monday, January 28, 2019

TOC

本文将会介绍 Rbd-Gateway 的架构设计以及原理分析, 希望可以帮助读者更快地了解 Rbd-Gateway.

什么是 Rbd-Gateway?

Rbd-Gateway(应用网关)是以应用为核心抽象的关键环节, 网关一词大家不会陌生, 不管是传统的流量网关还是 API 网关, 都是业务服务的外层屏障. Rbd-Gateway 基于 Openresty 进行功能扩展, 其核心功能是应用访问的负载路由安全控制. 具体功能如下:

  • HTTP/HTTPs 七层负载均衡
  • TCP/UDP 四层负载均衡
  • 泛域名访问策略
  • 多种负载均衡算法
  • 高级路由(A/B 测试, 灰度发布)

Ingress controllers

在介绍 Ingress controllers 前, 首先需要介绍 Ingress. Kubernetes 官方对 Ingress 的介绍是: 在 Kubernetes v1.1 中添加的 Ingress, 暴露了从集群外部到集群内服务的 HTTP 和 HTTPs 路由. 流量路由由 Ingress 定义的策略控制.

以下是 Kubernetes 官网的原话: Ingress, added in Kubernetes v1.1, exposes HTTP and HTTPS routes from outside the cluster to services within the cluster. Traffic routing is controlled by rules defined on the ingress resource.

Ingress 只是定义了访问的策略, 并没有提供具体的实现. 而 Ingress controllers 的作用就是实现 Ingress 定义的策略. Ingress controllers 只是一个统称, 网上有许多它的实现, 比如:

设计思路

Rbd-Gateway 很多的实现都参考了 NGINX Ingress Controller, Nginx Ingress Controller 是 Kubernetes 官方开源的一款 Ingress Controller, 它支持功能非常的多, 非常强大, 用户数量比较多, 并且非常地活跃(频繁地提交代码).

既然 Nginx Ingress Controller 的优点这么多, 那么为什么我们不直接使用它, 而是花大量的时间和人力成本去实现 Rbd-Gateway 呢? 这里面考虑的因素会比较多, 下面给大家一一道来:

首先, 在时间成本上, 深入去学习 Nginx Ingress Controller 的时间成本不会比实现 Rbd-Gateway 的时间成本低.

然后, 在迭代方向上, Nginx Ingress Controller 的迭代是跟着社区走的, 不是跟着 Rainbond 走的, 虽然它的功能既非富又强大, 但它无法完全满足 Rainbond 的需求, 也有很多功能是 Rainbond 不需要的.

另外, Rbd-Gateway 不会是一个完完全全的 Ingress Controller. 之所以把它命名为Rbd-Gateway, 是因为它的主要功能还是作为一个网关, 当它脱离了 k8s, 脱离了 Ingress 时, 还是能正常地工作的. Rbd-Gateway 除了对接 k8s 进行服务外, 还可以对接 ETCD, ZooKeeper, Eureka等服务注册发现中心. 因此, Rbd-gateway 可以轻松且无侵入地对接 Java 中用的比较多的 Spring Cloud 和 Dubbo.

目前, Rbd-Gateway 还没有实现对 Zookeeper, Eureka 的服务发现.

架构设计

Rbd-Gateway 利用 Kubernetes Informers 或者 ETCD 的事件驱动, 在启动时, 会将所有感兴趣的资源(Ingress, Service, Endpoint, Secret, ConfigMap)缓存在 Store 中; 当 k8s 或 ETCD 中的资源发生变化时, 会主动地将变化的资源 Push 给 Rbd-Gateway, 更新 Store 中的内容, 从而保证数据的一致性. 在 k8s 或 ETCD 将变化的资源 Push 到 Stroe 的同时, 还会触发一些回调函数, Rbd-Gateway 会在这些回调函数中操作 Nginx 的配置, 使用 Ingress 中定义的策略生效.

原理分析

与 Kubernetes 的交互

与 Kubernetes 的交互是利用 Informers 来实现的. 通过 informers 包, 可以轻松地把 k8s 的资源保存在本地缓存 store 中, Rbd-Gateway 在 store 中 List/Get 资源, 不直接与 k8s 交互, 从而减轻 k8s 的压力; 通过 informers 包, 还可以轻松地添加 k8s 资源变化时触发的回调函数. 看一下下面的代码:

初始化 informer factory

// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll,
	func(options *metav1.ListOptions) {
		options.LabelSelector = "creater=Rainbond"
	})

这个方法返回一个 SharedInformerFactory 类型的结构体. 第一个参数 client, 是用于与 Kubernetes API 连接和交互的客户端. 第二个参数 defaultResync, 表示 store 与 Kubernetes 同步数据的频率, 这可以保证 store 中数据的可靠性. 第三个参数 namespace, 表示 informers 将监听的 namespace, corev1.NamespaceAll 表示监听所有的 namespace. 第四个参数 tweakListOptions, 用于设置查询选项 ListOptions, 这里设置了只查询包含标签”creater=Rainbond”的资源.

store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

这两行代码创建了一个 Ingress Informer, 和 Ingress Store.

回调函数

ingEventHandler := cache.ResourceEventHandlerFuncs{
    AddFunc: ...
    DeleteFunc: ...
    UpdateFunc: func(old, cur interface{}) {
        oldIng := old.(*extensions.Ingress)
        curIng := cur.(*extensions.Ingress)
        // ignore the same secret as the old one
        if oldIng.ResourceVersion == curIng.ResourceVersion || reflect.DeepEqual(oldIng, curIng) {
            return
        }
        logrus.Debugf("Received ingress: %v", curIng)

        store.extractAnnotations(curIng)
        store.secretIngressMap.update(curIng)
        store.syncSecrets(curIng)

        updateCh.In() <- Event{
            Type: UpdateEvent,
            Obj:  cur,
        }
    },
}
store.informers.Ingress.AddEventHandler(ingEventHandler)

ResourceEventHandlerFuncs 是一个适配器, 可以在里面写 Add, Update, Delete 对应的回调函数. 这里重点介绍 UpdateFunc, 它有两个参数 old 和 new, 分别代表旧 Ingress 和新 Ingress. 由于 informer 会每隔一个时间间隔就会与 k8s 同步一次资源, 其中有一些或全部的资源根本没有发生变化, 所心需要判断新旧两个 Ingress 有没有变化; 如果有变化, 才解析新 Ingress 中的 annotations 和 secret, 才将新 Ingress 压入环形队列.

因为代码太多了, 所心没有贴出 Service, Secret, Endpoint, ConfigMap 相应的代码. 完整的代码: store.go

将 Kubernetes 资源转化成 Openresty 配置

静态配置

首先, 将 Kubernetes 资源模型转换成 Nginx 模型(model). 然后, 利用 text/template 这个包, 将 Nginx 模型转换成 Nginx 配置. 最后, 执行 nginx -c nginx.conf -s reload 使配置生效.

以 Nginx upstream 为例:

1.将 Endpoint 模型转换成 Upstream 模型.

2.将 Upstream 模型转换成 upstream.conf

使用 text/template, 需要一个模板文件 upstream.tmpl, 如下:

{{ range $upstream := . }}
upstream {{$upstream.Name}} {
    {{ if $upstream.UseLeastConn }} least_conn; {{ end }}
    {{ range $server := $upstream.Servers }}
    {{ $params := $server.Params }}
    server {{ $server.Address }}
      {{if gt $params.Weight 0}}weight={{$params.Weight}}{{end}}
      {{if gt $params.MaxFails 0}}max_fails={{$params.MaxFails}}{{end}}
      {{if $params.FailTimeout }}fail_timeout={{$params.FailTimeout}}{{end}}
      ;
    {{ end }}
}
{{ end }}

将 Upstream 模型转换成 upstream.conf 的工作由 Persist 方法实现, 以下是 Persist 方法的片段:

tpl, err := NewTemplate(tmplName)

参数 tmplName, 代表模板文件的路径, 也就是 upstream.tmpl 的路径. NewTemplate 方法读取模板文件的内容, 解析成 tpl(text/template).

rt, err := tpl.Write(dat)

将 dat 应用在 tpl 上, 得到 rt. (i.e. dat 是 Upstream 模型, rt 是二进制形式数组形式的 upstream.conf.)

if e := ioutil.WriteFile(p+"/"+f, rt, 0666); e != nil {
    return e
}

利用 ioutil.WriteFile 方法, 将 rt 持久化在磁盘上. 这就完成了从 Upstream 模型到 upstream.conf 的转化. 完整代码: template.go

3.reload nginx 配置文件

下面是 reload nginx 配置文件的代码片段:

// check nginx configuration
if out, err := nginxExecCommand("-t").CombinedOutput(); err != nil {
    return fmt.Errorf("%v\n%v", err, string(out))
}

// reload nginx
if out, err := nginxExecCommand("-s", "reload").CombinedOutput(); err != nil {
    return fmt.Errorf("%v\n%v", err, string(out))
}

这段代码的做了两件事情: 1) 执行 nginx -c nginx.conf -t 校验 nginx 配置文件; 2) 执行 nginx -c nginx.conf -s reload reload nginx 的配置文件, 使修改生效.

动态配置

Rbd-Gateway 利用 Openresty 的共享内存, 保存应用的 endpoints 信息, 并利用 lua-resty-balancer 实现各种负载均衡算法. 详细介绍如下:

如图所示, 动态配置工作的阶段有两个, 分别是: init worker 和 balancer.

Rbd-Gateway 会在 Openresty 中开辟一块共享内存(lua_shared_dict):

lua_shared_dict configuration_data {{$h.UpstreamsDict.Num}}{{$h.UpstreamsDict.Unit}};

对外暴露一个用于更新和获取共享内存中的 endpionts 的 API:

location /config {
    access_log off;

    allow 127.0.0.1;
    deny all;

    # this should be equals to configuration_data dict
    client_max_body_size                    {{$h.UpstreamsDict.Num}}{{$h.UpstreamsDict.Unit}};
    proxy_buffering                         off;

    content_by_lua_block {
        config.call()
    }
}

再看 config.call() 中的内容:

function _M.call()
  ...
  if ngx.var.request_method == "GET" then
    ngx.status = ngx.HTTP_OK
    ngx.print(_M.get_backends_data())
    return
  end

  local backends = fetch_request_body()
  if not backends then
    ngx.log(ngx.ERR, "dynamic-configuration: unable to read valid request body")
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  local success, err = configuration_data:set("backends", backends)
  if not success then
    ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  ngx.log(ngx.INFO, "successfully read or update backend data ")
  ngx.status = ngx.HTTP_CREATED
end

当 HTTP 方法为 Get 时, 代表获取 endpoints; 若没有获取到任何的 endpoints, 说明 RBD-Gateway 处于不健康的状态; 当 HTTP 方法为 Post 时, 代表更新 endpoints, Rbd-Gateway 会利用这个 API 把 k8s 中的 Endpoints 更新到共享内存中.

另外, 定时器(Timer) 每隔1秒就会将共享内存中的 endpoints 同步到负载均衡算法(balancer)中, 负载均衡算法包括轮循(round robin), 一致性哈希(chash), 粘性会话(sticky).

function _M.init_worker()
  sync_backends() -- when worker starts, sync backends without delay
  local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if err then
    ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
  end
end

-- sync_backends sync all backend every 1 seconds
local function sync_backends()
  local backends_data = config.get_backends_data()
  ...

  local ok, new_backends = pcall(json.decode, backends_data)
  ...

  local balancers_to_keep = {}
  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
    balancers_to_keep[new_backend.name] = balancers[new_backend.name]
  end

  for backend_name, _ in pairs(balancers) do
    if not balancers_to_keep[backend_name] then
      balancers[backend_name] = nil
    end
  end
end

在 balancer 阶段, 根据配置选择相应的负载均衡算法, 执行负载均衡算法中的 balancer 方法(balancer.balance()), 实现对 endpoints 的负载均衡:

balancer_by_lua_block {
    balancer.balance()
}

function _M.balance()
  local balancer = get_balancer()
  if not balancer then
    return
  end

  local peer = balancer:balance()
  if not peer then
    ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
    return
  end

  ngx_balancer.set_more_tries(1)

  local ok, err = ngx_balancer.set_current_peer(peer)
  if not ok then
    ngx.log(ngx.ERR, string.format("error while setting current upstream peer %s: %s", peer, err))
  end
  ngx.log(ngx.INFO, string.format("successfully set current upstream peer %s: %s", peer, err))
end

动态配置的好处是显而易见的: 无需 reload nginx 的配置文件, 即可使应用的水平伸缩(Pod 的数量的变化)生效.

高级路由

只有 HTTP 或 HTTPs 策略才支持高级路由, TCP/UDP 策略不支持高级路由. 在应用网关中的高级路由是指, 通过设置策略中的 path, cookie, header 和 weight, 让同一个域名可以访问不同的应用.

高级路由主要是为 A/B 测试(A/B testing) 和 灰度发布(金丝雀部署, canary deployments) 服务.

以 A/B 测试为例:

A/B 测试的本质是一个实验, 它将应用的两个或多个版本(变体)随机地显示给用户, 通过统计分析确认能够在给定指标中胜出的版本(变体).

请看下面 nginx 配置的 lua 代码(access_by_lua_block):

server {
    listen    80;
    server_name    www.test.com;

	location / {
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        set $target 'default';
        access_by_lua_block {
            ngx.var.target = "www_test_com_slash_default"

			local version = ngx.var.http_version
			if version then
                if version == "B" then
                    ngx.var.target = "www_test_com_slash_header"
                else
                    ngx.exit(404)
                end
			end
		}
        proxy_pass http://upstream_balancer;
    }
}

这段代码应用了两条域名相同, Header不同的访问策略. ngx.var.target 设置了一个叫 target 的变量, 它代表的是反向代理中, 上游服务的名称.

ngx.var.target = "www_test_com_slash_default"

这行代码对应的是没有 Header 的访问策略, 它会去寻找默认的上游服务 www_test_com_slash_default.

local version = ngx.var.http_version
if version then
	if version == "B" then
		ngx.var.target = "www_test_com_slash_header"
	else
		ngx.exit(404)
	end
end

如果请求中设置了 Header(version: B), 那么 默认的上游服务就会被覆盖成 www_test_com_slash_header.

使用上面的代码, 可以让用户通过同一个域名(www.test.com)访问不同版本的应用. 然后对数据进行统计分析, 找出在给定指标下的胜者, 从而实现 A/B 测试.

总结

本文从功能介绍, 设计思想, 架构设计, 原理分析等多方面对 Rbd-Gateway 进行了详细的介绍, 希望能对你有所帮助.

「真诚赞赏,手留余香」

AbeWang's Blog

真诚赞赏,手留余香

使用微信扫描二维码完成支付


comments powered by Disqus