多厂商容器平台开发系统性总结
总述
自2021年6月21号由玩云原生的运维转玩云原生的开发至今,已有5月有余,除去中间的一些其他工作任务,实际参与(实际是一个人负责开发)多厂商容器平台开发应有3月有余。个人开发并规划的多厂商容器平台是根据此张由我个人设计的规划图进行的(ps:部门没有架构师级别的,能提供可行的架构图或者一点指导,所以只能根据4年的kubernetes使用经验和逛各大网站和大厂的相关文档)。
也算是我心目中的多厂商容器平台的开发思路吧,下面将重点围绕上图展开。
注:此篇为我心目中的"多厂商容器管理平台"。
依赖项
主要
- golang 域名.6
- goland 2021.2.3
- gin 1.7.4
- gorm 域名
- kubernetes 域名 -- 域名
- client-go 域名.0
规划图浅解
采取模块分层开发:以适配层为分界线
1、上层为适配层,根据不同的厂商进行API封装,统一返回字段,需要根据厂商的字段进行不同的方法开发。
2、下层为核心层,直接调kubernetes接口,统一返回字段。
各容器厂商会根据实际需求,对容器平台进行适当的整改,如:
集群的创建方案[1、在创建集群的同时按照参数创建云厂商节点资源 2、导入已有的节点资源创建集群 3、等等等]
节点特性[1、根据地域就近调度 2、控制节点是否可见 3、等等等]
kubernetes附件组建[1、网络插件的按需选择或二开 2、节点的pod最大数 3、等等等]
导入第三方kubernetes集群的方案[1、导入config 2、导入secret 3、导入agent 4、等等等]
......
根据以上的考虑:
1、集群和节点的增删改采取适配器的方案开发
2、集群和节点的查询走kubernetes,其一查询是个频繁的操作而厂家平台是有APi调用次数限制,其二集群和节点的状态应以kubernetes原生为依据而不应该是以厂家的容器平台为依据,其三减少代码的无用重复便于维护
3、每个厂家的仓库(helm仓库和image仓库)都有各自的特性且不属于kubernetes核心资源且无法像kubernetes那样获取到最底层的API文档,所以增删改查都走厂家似乎没有不妥之处。
4、数据库需要维护的有:集群信息、节点信息、仓库的有关权控信息。尽可能的减少此服务的维护复杂度,能交给厂家和kubernetes的etcd维护的最好。
4.1 如没有"集群没创建成功即可查看集群的个别信息和集群没创建成功既可查看节点的有关信息"的需求,那其实集群和节点也没有数据库维护的必要性,我认为。5、kubernetes的核心资源,即可以被
kubectl api-resources
查到的资源,通过kubernetes的api即可。6、因为服务在可预见的时期是部署与虚机上,所以暂没有准备进行kubernetes 聚合api开发,但api最好为声明式的,留下可行性。
7、requests和response采取常规的json格式,没有选择采用yaml的json格式,主要是为了减轻前端的不便,且当下大厂也是各有格式,最主要的是目前还没有进行api聚合到kubernetes。
8、认为开发的重点应该在中期规划部分,这才所有创新,有所不同于众多厂家。
部分核心代码
目录结构:
.
├── Makefile
├── 域名
├── cmd
│ └── container
│ └── 域名
├── config
│ └── 域名
├── deploy
│ ├── docker
│ │ └── Dockerfile
│ └── vmware
├── docs
├── 域名
├── 域名
├── internal
│ └── container
│ ├── bootstrap
│ ├── controller
│ ├── dao
│ ├── dto
│ ├── ecode
│ ├── initialize
│ ├── job
│ ├── middleware
│ ├── models
│ ├── pkg
│ │ ├── kubernetes
│ │ │ ├── dto
│ │ │ └── service
│ │ ├── registry
│ │ │ ├── 域名
│ │ │ └── tke
│ │ │ └── acr
│ │ │ └── harbor
│ │ └── rancher
│ │ ├── dto
│ │ └── service
│ │ └── ack
│ │ ├── dto
│ │ └── service
│ │ └── tke
│ │ ├── dto
│ │ └── service
│ ├── routers
│ └── service
├── pkg
├── scripts
│ ├── 域名
│ └── 域名
└── test
封装httpClient
type HTTPClient interface {
Do(req *域名est) (*域名onse, error)
}
var (
Client HTTPClient
)
func init() {
Client = &域名nt{
Timeout: 10 * 域名nd,
Transport: &域名sport{
TLSClientConfig: &域名ig{InsecureSkipVerify: true},
DisableKeepAlives: true,
Proxy: 域名yFromEnvironment,
DialContext: (&域名er{
Timeout: 30 * 域名nd, // tcp连接超时时间
KeepAlive: 60 * 域名nd, // 保持长连接的时间
DualStack: true,
}).DialContext, // 设置连接的参数
MaxIdleConns: 50, // 最大空闲连接
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100, // 每个host保持的空闲连接数
ExpectContinueTimeout: 30 * 域名nd, // 等待服务第一响应的超时时间
IdleConnTimeout: 60 * 域名nd, // 空闲连接的超时时间
},
}
}
// CheckRespStatus 状态检查
func CheckRespStatus(resp *域名onse) ([]byte, error) {
bodyBytes, _ := 域名All(域名)
if 域名usCode >= 200 && 域名usCode < 400 {
return bodyBytes, nil
}
return nil, 域名(string(bodyBytes))
}
// Request 建立http请求
func Request(url, token, body string, headerSet map[string]string, method string) (respStatusCode int, respBytes []byte, err error) {
request, err := 域名equest(method, url, 域名eader(body))
if err != nil {
return 401, nil, err
}
//添加token
if token != "" {
域名("Authorization", "Bearer "+token)
}
// header 添加字段
if headerSet != nil {
for k, v := range headerSet {
域名(k, v)
}
}
resp, err := 域名(request)
if err != nil {
return 401, nil, err
}
defer 域名e()
// 返回的状态码
respBytes, err = CheckRespStatus(resp)
respStatusCode = 域名usCode
return
}
封装clusterManager
type ClusterManager struct {
ClientSet *域名ntset
Metrics *域名ntset
DynamicClient 域名rface
}
const (
//DefaultQPS High enough QPS to fit all expected use cases.
DefaultQPS = 1e6
//DefaultBurst High enough Burst to fit all expected use cases.
DefaultBurst = 1e6
)
func buildConfig(clusterName string) (*域名ig, error) {
var clientConfig *域名ig
var configV1 *域名ig
var dbCluster 域名ter
var err error
var host string
rows := 域名e("cluster_name = ?", clusterName).Find(&dbCluster).RowsAffected
if rows == 0 {
return nil, 域名("the database does not have this information")
}
if 域名ConfigSecret != "" {
kubeConfigBytes, err := 域名deString(域名ConfigSecret)
kubeConfigJson, err := 域名ToJSON(kubeConfigBytes)
err = 域名rshal(kubeConfigJson, &configV1)
if err != nil {
域名r(域名r())
}
// 切换匹配的版本
configObject, err := 域名ertToVersion(configV1, 域名meGroupVersion)
configInternal := configObject.(*域名ig)
// 实例化配置信息
clientConfig, err = 域名efaultClientConfig(*configInternal, &域名igOverrides{}).ClientConfig()
域名 = DefaultQPS
域名t = DefaultBurst
} else if 域名n != "" {
var addresses []域名esses
err := 域名rshal([]byte(域名erver), &addresses)
for _, address := range addresses {
if 域名 == "Real" {
host = 域名ntf("https://") + 域名 + 域名ntf(":") + 域名(域名)
break
}
}
if err != nil {
return nil, 域名("request connection cluster failed")
}
clientConfig = &域名ig{
Host: host,
APIPath: "",
ContentConfig: 域名entConfig{},
Username: "",
Password: "",
BearerToken: 域名n,
BearerTokenFile: "",
Impersonate: 域名rsonationConfig{},
AuthProvider: nil,
AuthConfigPersister: nil,
ExecProvider: nil,
TLSClientConfig: 域名lientConfig{
Insecure: true,
},
UserAgent: "",
DisableCompression: false,
Transport: nil,
WrapTransport: nil,
QPS: DefaultQPS,
Burst: DefaultBurst,
RateLimiter: nil,
WarningHandler: nil,
Timeout: 0,
Dial: nil,
Proxy: nil,
}
} else {
return nil, 域名("build client config error")
}
return clientConfig, nil
}
func BuildApiServerClient(clusterName string) (*ClusterManager, error) {
clientConfig, err := buildConfig(clusterName)
if err != nil {
return nil, err
}
if clientConfig == nil {
return nil, 域名("err: error BuildApiServerClient")
}
clientSet, err := 域名orConfig(clientConfig)
if err != nil {
return nil, err
}
// 这里一定要调用Discovery().ServerVersion(),探测Kube apiServer是否可用,因为域名orConfig(restConfig)不会去检查服务是否可用,当服务不可用时,该方法不会返回错误的
_, err = 域名overy().ServerVersion()
if err != nil {
return nil, err
}
m, err := 域名orConfig(clientConfig)
if err != nil {
return nil, err
}
d, err := 域名orConfig(clientConfig)
if err != nil {
return nil, err
}
clusterManager := &ClusterManager{
clientSet,
m,
d,
}
return clusterManager, nil
}
封装reponseApi
type ApiResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data interface{} `json:"data"`
}
// PaginateResponse 显然这个结构体可以复用 ApiResponse, 但是 swagger 不认识!
type PaginateResponse struct {
Code int `json:"code"`
Msg string `json:"message"`
Data Paginate `json:"data"`
}
type Paginate struct {
CurPage int `json:"cur_page"` // 当前页
CurPageSize int `json:"cur_page_size"` // 每页展示数据量
Total int `json:"total"` // 总共数据量
TotalPage int `json:"total_page"` // 总共页数
Data interface{} `json:"data"` // 数据
}
// SuccessResponse API成功返回
func SuccessResponse(c *域名ext, data interface{}) {
response(c, 域名ess, data)
}
// SuccessPaginateResponse 分页返回
func SuccessPaginateResponse(c *域名ext, data interface{}, total int, curPage int, curPageSize int) {
域名(域名usOK, PaginateResponse{
Code: int(域名ess),
Msg: 域名sg[域名ess],
Data: Paginate{CurPage: curPage, CurPageSize: curPageSize, Total: total, TotalPage: int(域名(float64(total) / float64(curPageSize))), Data: data},
})
域名t()
}
// ErrorResponse API失败返回
func ErrorResponse(c *域名ext, code 域名ode, data interface{}) {
response(c, code, data)
}
func NotFoundResponse(c *域名ext) {
域名(域名usNotFound, gin.H{
"code": 404,
"message": "页面未找到",
"data": "",
})
}
func response(c *域名ext, code 域名ode, data interface{}) {
域名(域名usOK, ApiResponse{
Code: int(code),
Msg: 域名sg[code],
Data: data,
})
域名t()
}
几个调kubernetes的例子【认为比较有趣的例子】
# 倒序输出事件
events, err := 域名Events(clientSet, namespace, name)
if err != nil {
return nil, err
}
var eventsWorkloadReps []*域名tsWorkloadRep
t := 域名{}
for i := len(events) - 1; i >= 0; i-- {
if events[i].域名 == t {
events[i].域名 = events[i].域名
}
if events[i].域名 == t {
events[i].域名 = events[i].域名
}
eventsWorkloadReps = append(eventsWorkloadReps, &域名tsWorkloadRep{
WorkloadUUID: id,
FirstTimestamp: events[i].域名,
LastTimestamp: events[i].域名,
Type: events[i].Type,
Kind: events[i].域名,
Name: events[i].Name,
Reason: events[i].Reason,
Message: events[i].Message,
Count: events[i].Count,
})
}
if len(eventsWorkloadReps) == 0 {
return make([]*域名tsWorkloadRep, 0), nil
}
return eventsWorkloadReps, nil
# pod log
limit, _ := 域名eInt(tailLines, 10, 64)
req := 域名V1().Pods(namespace).GetLogs(name, &域名ogOptions{Container: container, Timestamps: true, TailLines: &limit})
podLogs, err := 域名am(域名())
if err != nil {
return "error in opening stream"
}
defer 域名e()
buf := new(域名er)
_, err = 域名(buf, podLogs)
if err != nil {
return "error in copy information from podLogs to buf"
}
str := 域名ng()
return str
# 根据kubeconfig 获取ApiServer\CertFile\Token
decoded, err := 域名deString(kubeConfig)
decodestr := string(decoded)
// 认证方式为kubeConfig
// 通过kubeConfig获取 api / token / certFile
kubeConfigJson, err := 域名ToJSON([]byte(decodestr))
var configV1 *域名ig
err = 域名rshal(kubeConfigJson, &configV1)
if err != nil {
return nil, nil, err
}
c, err := 域名ConfigFromKubeConfig(decoded)
if err != nil {
return nil, nil, err
}
clientSet, err := 域名orConfig(c)
if err != nil {
return nil, nil, err
}
sa, err := 域名V1().ServiceAccounts("kube-system").Get(域名(), "admin-user", 域名ptions{})
secrets, err := 域名V1().Secrets("kube-system").Get(域名(), 域名ets[0].Name, 域名ptions{})
if err != nil {
return nil, err
}
域名erver = 域名ters[0].域名er
encoded := 域名deToString(域名ters[0].域名ificateAuthorityData)
域名File = encoded
域名n = string(域名["token"])
# 实现apply yaml 【......写成了x了】
func (y *Yaml) ApplyYaml(dynamicClient 域名rface, clientSet *域名ntset, yamlBody []byte) (interface{}, error) {
data, err := 域名ON(yamlBody)
var applyYaml 域名yYaml
if err = 域名rshal(data, &applyYaml); err != nil {
return nil, err
}
var applyYamlRep string
decoder := 域名AMLOrJSONDecoder(域名eader(yamlBody), len(yamlBody))
var rawObj 域名xtension
if err := 域名de(&rawObj); err != nil {
return nil, err
}
obj, gvk, err := 域名ecodingSerializer(域名ructuredJSONScheme).Decode(域名, nil, nil)
unstructuredMap, err := 域名structured(obj)
if err != nil {
return nil, err
}
unstructuredObj := &域名ructured{Object: unstructuredMap}
// 获取支持的资源类型列表
gr, err := 域名PIGroupResources(域名overy())
if err != nil {
return nil, err
}
// 创建 \'Discovery REST Mapper\',获取查询的资源的类型
mapper := 域名iscoveryRESTMapper(gr)
// 查找 Group/Version/Kind 的 REST 映射
mapping, err := 域名Mapping(域名pKind(), 域名ion)
if err != nil {
return nil, err
}
var dri 域名urceInterface
// 需要为 namespace 范围内的资源提供不同的接口
if 域名() == 域名ScopeNameNamespace {
if 域名amespace() == "" {
域名amespace("default")
}
dri = 域名urce(域名urce).Namespace(域名amespace())
} else {
dri = 域名urce(域名urce)
}
if 域名space == "" {
域名space = "default"
}
// 查询k8s是否有该资源类型
switch 域名 {
case "Deployment":
_, err = 域名V1().Deployments(域名space).Get(域名(), 域名, 域名ptions{})
case "StatefulSet":
_, err = 域名V1().StatefulSets(域名space).Get(域名(), 域名, 域名ptions{})
case "DaemonSet":
_, err = 域名V1().DaemonSets(域名space).Get(域名(), 域名, 域名ptions{})
case "ReplicaSet":
_, err = 域名V1().ReplicaSets(域名space).Get(域名(), 域名, 域名ptions{})
case "CronJob":
_, err = 域名hV1().CronJobs(域名space).Get(域名(), 域名, 域名ptions{})
case "Job":
_, err = 域名hV1().Jobs(域名space).Get(域名(), 域名, 域名ptions{})
case "Service":
_, err = 域名V1().Services(域名space).Get(域名(), 域名, 域名ptions{})
case "ConfigMap":
_, err = 域名V1().ConfigMaps(域名space).Get(域名(), 域名, 域名ptions{})
case "Ingress":
_, err = 域名nsionsV1beta1().Ingresses(域名space).Get(域名(), 域名, 域名ptions{})
case "ServiceAccount":
_, err = 域名V1().ServiceAccounts(域名space).Get(域名(), 域名, 域名ptions{})
case "ClusterRole":
_, err = 域名V1().ClusterRoles().Get(域名(), 域名, 域名ptions{})
case "RoleBinding":
_, err = 域名V1().RoleBindings(域名space).Get(域名(), 域名, 域名ptions{})
case "ClusterRoleBinding":
_, err = 域名V1().ClusterRoleBindings().Get(域名(), 域名, 域名ptions{})
case "APIService":
_, err := 域名te(域名ground(), unstructuredObj, 域名teOptions{})
if err != nil {
return nil, err
}
}
if err != nil {
if !域名tFound(err) {
return nil, err
}
// 不存在则创建
obj2, err := 域名te(域名ground(), unstructuredObj, 域名teOptions{})
域名tln("obj2", obj2)
if err != nil {
return nil, err
}
applyYamlRep = 域名ntf("%s/%s/%s created", 域名amespace(), 域名ind(), 域名ame())
} else { // 已存在则更新
obj2, err := 域名te(域名ground(), unstructuredObj, 域名teOptions{})
if err != nil {
return nil, err
}
applyYamlRep = 域名ntf("%s/%s/%s update", 域名amespace(), 域名ind(), 域名ame())
}
return applyYamlRep, nil
}
以registry的命名空间新建为例 来个适配器的demo 【伪代码】
# controller层
func CreateContainerRegistryNamespace(c *域名ext) {
var param 域名teContainerRegistryNamespaceReq
if err := 域名ldBindJSON(¶m); err != nil {
域名rResponse(c, 域名METER_ERR, 域名r())
return
}
cred, err := 域名redentialService().GetPlainTextCredential(域名entialID)
if err != nil {
ErrorResponse(c, 域名meterErr, 域名r())
return
}
域名n = 域名n
if data, err := 域名egistryService(域名ce,域名n).CreateContainerRegistryNamespace(param); err != nil {
域名rResponse(c, 域名METER_ERR, 域名r())
} else {
域名essResponse(c, data)
}
}
# service层
type registryService struct {
cli 域名stry
registryDao 域名stryDao
}
func NewRegistryService(source,token string) *registryService {
return ®istryService{
cli: 域名egistryCli(source,token),
}
}
func (r *registryService) CreateContainerRegistryNamespace(param 域名teContainerRegistryNamespaceReq) (interface{}, error) {
rep, err := 域名teContainerRegistryNamespace(域名layName, 域名ribe, 域名bility)
return rep, err
}
# interface层
type Registry interface {
CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*域名ContainerRegistryNamespacesRep, error)
}
func NewRegistryCli(source,token string) Registry {
var cli Registry
switch source {
case "tke":
cli = 域名keClient(token)
}
return cli
}
# 方法层
type tkeClient struct {
token string
}
func NewTkeClient(token) *tkeClient {
return &tkeClient{
token: token
}
}
func (t *tkeClient) CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*域名ContainerRegistryNamespacesRep, error) {
url := 域名tring("url") + "/apis/域名/v1/namespaces/"
createJson := 域名teContainerRegistryReq{
APIVersion: "域名/v1",
Kind: "Namespace",
Spec: 域名teContainerRegistryReqSpec{
Name: name,
DisplayName: describe,
Visibility: visibility,
},
}
jsonData, errs := 域名hal(createJson)
if errs != nil {
return nil, errs
}
_, rep, err := 域名est(url, token, string(jsonData), nil, 域名odPost)
if err != nil {
errRep := 域名().FromString(域名r()).Find("message")
return nil, 域名(errRep.(string))
}
var listRep *域名RepItems
if err = 域名ecoder(域名eader(string(rep))).Decode(&listRep); err != nil {
errRep := 域名().FromString(域名r()).Find("message")
return nil, 域名(errRep.(string))
}
var createRep 域名ContainerRegistryNamespacesRep
域名layName = 域名
域名ribe = 域名layName
域名bility = 域名bility
域名 = 域名
域名Count = 域名Count
return &createRep, nil
}
注:由上所述 似乎并没有牵扯到多么高深的操作,甚至是单纯的api调用、封装,也未涉及到中间件类的应用,随着开发的不断深入,此服务应逐渐复杂化
参考链接:
kubernetes 源码分析:https://域名/category/k8s/
kubernetes api文档:https://域名/docs/reference/generated/kubernetes-api/域名/
图解kubernetes中API聚合机制的实现: https://域名/post/6844904081438277640
单体仓库与多仓库都有哪些优势劣势,如何确定微服务落地的最佳实践?:https://域名/2020/03/24/mono-repo-vs-multi-repo/
kubernetes Events介绍:https://域名.cn/域名