apiserver

date
Oct 16, 2022
slug
notionic-example
status
Published
tags
Website
Notion
Design
summary
type
Post

How to Dev

notion imagenotion image
提到了需要支持了比如鉴权/授权(authn/authz)、准入控制(admission)、请求代理(proxy)、持久化(storage)、审计(aduit)、流量控制(flow control)、在 Github Repo: k8s.io/apiserver 库中都给留出了Hook点或者现成的函数,我们可以通过实现接口来插入自定义的能力。
为了实现更规范化的APIServer服务,建议多参考下k8s-apiserver的项目结构和代码实现

资源对象(gvk/gvr)

根据后续我们依赖的能力,除了工作负载和配置资源,我们还要暴露fs、pcl、sc、role、rolebind等资源

rest.Storage

Github Dir: kubernetes/pkg/registry 包括所有内置资源的 storage 实现
apps的代码是需要参考的,这样我们自定义的gvk,都能参考这个模型来实现
Github File: k8s.io/apiserver/pkg/registry/rest/rest.go 自定义资源要想通过REST风格对外暴露服务,需要满足下面几个条件:
必须要实现 Github Interface: Storage 接口
按需实现 Github Interface: StandardStorage 接口中的对象,例:GET->Getter POST/PUT-> CreaterUpdater(细节请看 Github Method: registerResourceHandlers),不过一般通过内嵌 genericregiser.Store 对象实现
// StandardStorage is an interface covering the common verbs. Provided for testing whether a // resource satisfies the normal storage methods. Use Storage when passing opaque storage objects. type StandardStorage interface { Getter Lister CreaterUpdater GracefulDeleter CollectionDeleter Watcher }
自定义CRD需要实现 Github Interface: RESTStorageProvider 接口,这样下面的 restStorageProviders 就可以直接注册使用
Github File: kubernetes/pkg/controlplane/instance.go 中的 restStorageProviders 列表会统一注册非core的资源的hanlder

scheme

Github File: pkg/apis/core/install/install.go 记录了如何安装 core api 到注册表,对于自定义资源,我们要额外提供内部类型的实现,转换函数可以自动生成,同时需要把转换函数 Github CodeLine: localSchemeBuilder.Register(RegisterConversions) 注册到Scheme
func init() { Install(legacyscheme.Scheme) } // Install registers the API group and adds types to a scheme func Install(scheme *runtime.Scheme) { // 内部类型的注册 utilruntime.Must(core.AddToScheme(scheme)) // 外部类型的注册 utilruntime.Must(v1.AddToScheme(scheme)) utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion)) }
Github File: kubernetes/pkg/controlplane/import_known_versions.go 会统一install所有的资源到Scheme 我们需要参考这个实现

认证/授权(authn/authz)

Github Func: DefaultBuildHandlerChain 函数实现了 HTTP Middleware 的能力,用户请求需要经过层层Handler的执行,鉴权、授权的操作就放在这里实现,不仅仅是鉴权授权,下面的审计、准入控制、流量管理都是通过HTTP Middleware嵌套Handler来完成。

认证

k8s有多种认证器 BasicAuth、ClientCA、TokenAuth、ServiceAccountAuth等,Github Interface: authenticator.Request 定义了认证器的接口,在初始化 RecommendedOptions.Authentication 的时候会把这些认证器注册进去,我们主要关心 x509 的认证器 Github Type: x509.Authenticator。如果新增或者修改认证器也在这里操作

授权

k8s有多种授权器 AlwaysAllow、ABAC、Webhook、Node、RBAC等,Github Interface: authorizer.AuthorizerGithub Interface: authorizer.RuleResolver 两个接口定义了授权器,其中:
Github Interface: authorizer.Authorizer 用于从请求中获取授权信息,如果这一步没成功那么决策为失败
// Authorizer makes an authorization decision based on information gained by making // zero or more calls to methods of the Attributes interface. It returns nil when an action is // authorized, otherwise it returns an error. type Authorizer interface { Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error) }
Github Interface: authorizer.RuleResolver 用于解析规则,看是否可以对资源进行操作
// RuleResolver provides a mechanism for resolving the list of rules that apply to a given user within a namespace. type RuleResolver interface { // RulesFor get the list of cluster wide rules, the list of rules in the specific namespace, incomplete status and errors. RulesFor(user user.Info, namespace string) ([]ResourceRuleInfo, []NonResourceRuleInfo, bool, error) }
在初始化 RecommendedOptions.Authorization 的时候注册进入,我们主要关心 Github Type: rbac.RBACAuthorizer 的实现,不需要太多代码就能实现RBAC的能力

准入控制(admission)

kube-apiserver 中所有已启用的准入控制器由 Github Variable: chainAdmissionHandler 数据结构管理,当客户端发送请求给 kube-apiserver 时,Handler 会遍历 Github Variable: chainAdmissionHandler 中启用的准入控制器并执行变更和验证操作,我们想要自定义准入控制就需要把自己的准入控制注册进来
Github Dir: kubernetes/plugin/pkg/admission 目录下为k8s内置的准入控制器, kube-apiserver 在启动时候会调用 Github CodeLine: kubeoptions.NewAdmissionOptions() 把内置的准入控制器列表传到 ServerRunOptions上。
K8S的CEL功能

请求代理(proxy)

在处理pod的查询 或者是pod exec log的时候,需要做代理
参考 Github Func: newThrottledUpgradeAwareProxyHandler 这个函数里使用了 Github Func: NewUpgradeAwareHandler ,我们后面进行代理的时候最好使用这个函数,从名字可以看出这个是一个升级感知的Handler,因为K8S中有很多需要做协议升级(尤其是exec),这个Handler帮我们实现了很多协议升级的部分。
exec子资源实现了 Github Interface: Connecter 接口,对exec资源做操作的请求会被代理到这个接口的 Connect 方法,方法会返回这个Hanlder去代理请求

持久化(storage)

Github Type: Store 用于etcd的存储,一般k8s资源都嵌入了这个结构体,他实现了下面列出的这些接口,这样k8s资源把 Store 作为内嵌资源,就自然可以实现下面这些接口。
// Note: the rest.StandardStorage interface aggregates the common REST verbs var _ rest.StandardStorage = &Store{} var _ rest.Exporter = &Store{} var _ rest.TableConvertor = &Store{} var _ GenericStore = &Store{}
但是我们大概率不使用,因为他会使用之前说的Codec可以参考 Github Repo: mink 来实现一个对接MySQL的结构,建议参考上面的接口实现
持久化部分注意事务处理

审计(aduit)

框架已经自带了审计能力 Github Func: WithAudit ,如果需要额外的修改可以:
参考 Github Func: DefaultBuildHandlerChain 实现修改一下 审计日志 Handler 的实现,然后把新的 HanlderChain 赋值给 Github member: Config.BuildHandlerChainFunc
参考 Github Func: WithAudit 的实现修改

流量控制(flow control)

配合RBAC,完成流量控制,Github CodeLine: FlowControlGithub Func: DefaultBuildHandlerChain 的Handler 链实现,原理可以参考https://blog.csdn.net/sinat_37367944/article/details/116329588 ,因为kube-apiserver在启动后的PostHook里会有专门处理FlowSchema和PriorityLevelConfiguration动态控制流量权重,这部分需要做一些兼容改造

可观测(tracing)

metrics、log、trace(可以先不加)
注意一定要加trace-id,各个span耗时
Github Package: k8s.io/utils/trace 包有trace功能,不过功能少,可以考虑用下面的2个包(还没调研):

质量(code quality)

代码规范,Commit-Message 规范自动化
流量灰度
集成测试
E2E测试
vet、lint 、 空指针 Github Repo: nilaway
 
 
watcher Interface
Connector对象只能用于子资源
参考kube-gateway,cluster-gateway,实现请求的转发
func buildProxyHandlerChainFunc(clusterManager clusters.Manager, enableAccessLog bool) func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler { return func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler { // new gateway handler chain handler := gatewayfilters.WithDispatcher(apiHandler, proxydispatcher.NewDispatcher(clusterManager, enableAccessLog)) // without impersonation log handler = gatewayfilters.WithNoLoggingImpersonation(handler, c.Authorization.Authorizer, c.Serializer) // new gateway handler chain, add impersonator userInfo handler = gatewayfilters.WithImpersonator(handler) handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc) failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth) failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker) handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") // disabel timeout, let upstream cluster handle it // handler = gatewayfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup) // new gateway handler chain handler = gatewayfilters.WithPreProcessingMetrics(handler) handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{}) handler = gatewayfilters.WithTerminationMetrics(handler) handler = gatewayfilters.WithRequestInfo(handler, c.RequestInfoResolver) if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 { handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance) } handler = genericapifilters.WithCacheControl(handler) handler = gatewayfilters.WithNoLoggingPanicRecovery(handler) return handler } }

More

If you have any questions, please contact me.