1.源码框架

Patroni官方文档对Kubernetes DCS只有非常少量的描述,以下内容通过分析Patroni源码获取。

Patroni运行时主要包括以下模块:

  • RestApiServer:基于 http.server 实现的服务端程序,提供Patroni的Restful API能力
  • DCS:元数据服务的抽象接口,用于读写共享数据
  • Postgresql:postgres服务,用于和本地的postgres进行交互
  • Ha:HA服务,负责监听postgres是否存活,实现自动主备切换

image-20211221163944208

通过分析Patroni对象的run()和构造函数可以知道,Ha对象是Patroni的"大脑",每个时间片(loop_wait配置项规定的值),Ha对象的run_cycle()被调用。该接口中,Ha通过Postgresql对象管理pg实例实例进行管理,并调用dcs的接口维持状态。

本文后续以下问题为切入点阅读patroni源码:

  • 如何管理Postgres实例
    • 初始化数据目录
    • 启动postgres服务
    • 健康检查
  • Leader选举
    • 成为Leader
    • 重新选举
  • 故障转移
    • 手工触发故障转移
    • 自动故障转移
  • DCS上保存了那些数据
  • Patroni如何与K8S通信

Gist片段:Patroni实例启动流程

2.DCS 数据结构

由于Patroni能够对接多种DCS,为兼容环境不同DCS的存储格式,Patroni源码中抽象了Cluster对象。

Cluster对象是基于namedtuple(命名元组)的字典,包括如下属性:

属性 说明 Kubernetes 中的存储位置(Obj)
initialize 用于表示集群是否已经被初始化 Endpoints/<cluster-name>-config
config ClusterConfig对象,保存Patroni的动态配置 Endpoints/<cluster-name>-config
history TimelineHistory,对象记录由于主备切换产生的wal时间线 Endpoints/<cluster-name>-config
leader Leader对象,保存当前Leader节点的信息 Endpoints/<cluster-name>
last_lsn last_lsn Endpoints/<cluster-name> 对应的Key为optime
slots logical replication slots 主要用于logical replication场景,默认情况下值为None
members Member对象列表,表示集群中每一个PostgreSQL实例的状态 Pod
failover Failover对象,记录即将发生的failover操作信息 Endpoints/<cluster-name>-failover
sync SyncState对象,记录synchronous replication的状态 Endpoints/<cluster-name>-sync

在Kubernetes DCS实现中,Cluster的内容全部保存在对应Pod或者Endpoint的annotations和labels中,下图展示了名称为rccp-minimal-cluster对应endpoints:

image-20211221155241685

上面图中,rccp-minimal-cluster-config只是用来存储数据,后端没有连接对应IP地址,rccp-minimal-cluster-failover仅当集群存在要进行的Failover操作时存在!

关于Kubernetes DCS 初始化 Cluster的代码参考Gist,以下详细列出Cluster中成员的详细信息。

0. Kubernetes DCS相关配置

Patroni支持Kubernetes作为后端DCS,并且可以使用两种不同类型的Kubernetes对象来存储配置信息

  • Endpoints:Patroni将配置信息存储在对应Endpoints的annotations中。
  • ConfigMaps:Patroni创建ConfigMaps来存储配置信息,当用户主备切换时会同时更新ConfigMaps和Endpoint信息

下面所有配置都有相关ENV设定。

Yaml配置 默认值 作用
bypass_api_service false 是否通过kube-apiserver的ep直连
namespace default 当前Patroni运行的Namespaces
labels 用于List当前集群相关Kubernetes Obj的labels,包括Pod、EP等
scope_label cluster-name 记录集群名称的Label-Key
use_endpoints false 是否使用Endpoints记录集群信息
pod_ip 当前运行Patroni的PodIP,当Pod被提升为Master时用于填写LeaderIP
ports Master SVC的Port信息
cacert kube-apiserver相关CA

Yaml

Environmemt配置

1. initialize 参数

上述参数中,initialize是PG数据库初始化生成"Database system identifier",patron通过pg_controldata |grep "Database system identifier"获取这个参数,并写入到Cluster配置中。

2. Member

Member对象表示PostgreSQL Cluster中的一个成员,其实现是一个namedtuple(命名元组),包括以下属性:

属性 说明 Kubernetes DCS
index Member对象上一次修改的索引ID pod.metadata.resourceVersion
name Member的名称,即实例名 pod.metadata.name
session 会话id或者时间戳 None
data 数据字典,可能包括:conn_url, role等内容 annotations,Key为status,以及pod.metadata.labels

在上述data字典的值如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
	"conn_url": "postgres://99.233.166.176:5432/postgres",
	"api_url": "http://99.233.166.176:8008/patroni",
	"state": "running",
	"role": "master",
	"version": "2.1.2",
	"xlog_location": 83886080,
	"timeline": 1,
	"pod_labels": {
		"application": "spilo",
		"cluster-name": "rccp-minimal-cluster",
		"controller-revision-hash": "rccp-minimal-cluster-5747c75c6",
		"spilo-role": "master",
		"statefulset.kubernetes.io/pod-name": "rccp-minimal-cluster-0",
		"team": "rccp"
	}
}

3. ClusterConfig

属性 说明 Kubernetes DCS
index 对象上一次修改的索引ID endpoint.metadata.resourceVersion
data 状态信息 endpoint.metadata.config
modify_index 会话id或者时间戳 endpoint.metadata.resourceVersion

在上述data字典的值如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
    "loop_wait": 10,
    "maximum_lag_on_failover": 33554432,
    "postgresql": {
        "parameters": {
            "archive_mode": "on",
            "archive_timeout": "1800s",
            "autovacuum_analyze_scale_factor": 0.02,
            "autovacuum_max_workers": 5,
            "autovacuum_vacuum_scale_factor": 0.05,
            "checkpoint_completion_target": 0.9,
            "hot_standby": "on",
            "log_autovacuum_min_duration": 0,
            "log_checkpoints": "on",
            "log_connections": "on",
            "log_disconnections": "on",
            "log_line_prefix":"%t [%p]: [%l-1] %c %x %d %u %a %h ",
            "log_lock_waits": "on",
            "log_min_duration_statement": 500,
            "log_statement": "ddl",
            "log_temp_files": 0,
            "max_connections": 100,
            "max_replication_slots": 10,
            "max_wal_senders": 10,
            "tcp_keepalives_idle": 900,
            "tcp_keepalives_interval": 100,
            "track_functions": "all",
            "wal_compression": "on",
            "wal_level": "hot_standby",
            "wal_log_hints": "on"
        },
        "use_pg_rewind": true,
        "use_slots": true
    },
    "retry_timeout": 10,
    "ttl": 30
}

4. TimelineHistory

属性 说明 Kubernetes DCS
index 对象上一次修改的索引ID endpoint.metadata.resourceVersion
value 状态信息 endpoint.metadata.history
lines 内容同上,通过json.load 转换为list 同上

在上述lines字典的值如下:

1
2
3
4
5
6
7
[
	1,
	83886232,
	"no recovery target specified",
	"2021-12-21T13:48:54.315582+08:00",
	"rccp-minimal-cluster-1"
]

5. Leader

属性 说明 Kubernetes DCS
index 对象上一次修改的索引ID endpoint.metadata.resourceVersion
session 会话id或者时间戳 None
member Leader对应的Member对象引用 根据Leader名称从Pod上获取

6. Failover

属性 说明 Kubernetes DCS
index 对象上一次修改的索引ID endpoint.metadata.resourceVersion
leader 当前Leader实例名称 endpoint.metadata.leader
candidate 候选实例名称 endpoint.metadata.member
scheduled_at 计划调度时间 endpoint.metadata.scheduled_at

7. SyncState

这个ep主要是用于保存synchronous_mode模式时,leader和需要sync写入的standby:

属性 说明 Kubernetes DCS
index 对象上一次修改的索引ID endpoint.metadata.resourceVersion
leader 当前Leader实例名称 endpoint.metadata.leader
sync_standby 候选实例名称 endpoint.metadata.sync_standby

3. Kubernetes Client

Patroni在源码中并没有引用Kubernetes的Client,而是基于Kubernetes的RestAPI接口实现了一个非常精简的Client,整个实现包括以下几个Class:

  • patroni.dcs.kubernetes.K8sClient.ApiClient:参考
  • patroni.dcs.kubernetes.K8sClient.CoreV1Api:参考
  • patroni.dcs.kubernetes.ObjectCache
  • patroni.dcs.kubernetes.CoreV1ApiProxy:参考

上述Class中,CoreV1ApiProxy封装了ApiClient和CoreV1Api的实例,他们各自提供了以下能力:

  • ApiClient:提供发送RestAPI,以及HTTP连接池
  • CoreV1Api:重写__getattr__方法,通过闭包的方式提供Kubernetes资源的接口

以下Kubernetes对应DCS的实现为例,它在构造函数中创建Pod和Endpoint的WatchCache:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, config):
    self._labels = config['labels']
    self._labels[config.get('scope_label', 'cluster-name')] = config['scope']
    self._label_selector = ','.join('{0}={1}'.format(k, v) for k, v in self._labels.items())
    self._namespace = config.get('namespace') or 'default'

    # 初始CoreV1ApiProxy作为Kubernets客户端
    self._api = CoreV1ApiProxy(config.get('use_endpoints'), False)
    # CoreV1Api是通过闭包方式实现的API客户端,Patroni重写了__getattr__方法
    # 当用户请求coreV1Api.(action)_namespaced_(kind)时返回一个RestAPI接口,该接口可以用来获取Kubernetes上指定类型的对象。
    # 示例:
    #   pods_func = functools.partial(self._api.list_namespaced_pod, "default",label_selector={"application":"postgresql"})
    #   上述调用中,pods_func是一个函数它返回default命名空间下,所有带application=postgresql的Pod信息

    # ObjectCache是代码中自定义实现Cache缓存:
    # ObjectCache派生自Thread对象,其run方法中运行了一个死循环。
    # 每个循环中调用了一次list方法,配合watch、resourceVersion参数,list调用被阻塞直到OBJ被修改时返回。
    # ObjectCache根据list返回的内容同步内存中的数据
    
    # pods_func list指定命令空间中包含指定labels的Pod
    pods_func = functools.partial(self._api.list_namespaced_pod, self._namespace,label_selector=self._label_selector)
    # Pod的Cache
    self._pods = ObjectCache(self, pods_func, self._retry, self._condition)

    # pods_func list指定命令空间中包含指定labels的Endpoint或者ConfigMap,PS:取决于use_endpoints配置
    kinds_func = functools.partial(self._api.list_namespaced_kind, self._namespace,label_selector=self._label_selector)
    # Endpoint的Cache
    self._kinds = ObjectCache(self, kinds_func, self._retry, self._condition, self._name)

4. HA 管理

1. Loop循环

Ha对象负责协调Patroni的核心功能,当Patroni运行时周期调用run_cycle(),执行初始化、故障恢复、leader竞争的所有操作。

以下是循环内的简化逻辑,完整流程图参考Drawio文件

2.初始化数据库

在Patroni在启动数据库前需要进行PG数据目录的初始化,用户可以使用配置文件自定义初始化流程。

Patroni根据DCS上的initialize key状态判断初始化的方式:

  • bootstrap:initialize key 为None,通过initdb初始化数据库目录
  • clone:initialize key存在,通过pg_basebackup复制Leader数据库目录

1. Bootstrap

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
bootstrap:
  method: <custom_bootstrap_method_name> # 使用自定义脚本进行初始化,如果省略该配置那么默认使用initdb初始化
  initdb:                                # initdb配置
  - auth-host: md5                       # PS:以下列表均为initdb参数
  - auth-local: trust
  - encoding: UTF8
  - locale: en_US.UTF-8
  - data-checksums
  <custom_bootstrap_method_name>:        # 自定义bootstrap脚本配置
    command: <path_to_custom_bootstrap_script> [param1 [, ...]]  # 初始化脚本
    keep_existing_recovery_conf: False
    no_params: False
    recovery_conf:
      recovery_target_action: promote
      recovery_target_timeline: latest
      restore_command: <method_specific_restore_command>

上述no_params设置为True时,调用脚本时会额外通过–name=value的方式传递以下参数:

–scope Name of the cluster to be bootstrapped
–datadir Path to the data directory of the cluster instance to be bootstrapped

PS:通常情况下,只有用户创建standby_cluster时需要自定义bootstrap流程

2. Clone

当PG集群需要加入新副本时,Patroni通过Clone操作从Leader复制一份数据库目录。默认情况下,Patroni使用pg_basebackup工具执行该过程。

pg_basebackup运行时需要上游postgresql处于运行状态,并且备份数据是没有经过压缩,备份时间比较长。Patroni支持自定义Clone流程,用户可以在postgresql块中指定:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
postgresql:
  create_replica_methods:
  - wal_e
  - pgbackrest
  - basebackup
  wal_e:
    command: patroni_wale_restore
    no_master: 1
    envdir: {{WALE_ENV_DIR}}
    use_iam: 1
  basebackup:
    max-rate: '100M'
  pgbackrest:
    command: /usr/bin/pgbackrest --stanza=<scope> --delta restore
    keep_data: True
    no_params: True  

上述配置,定义了wal_e、pgbackrest、basebackup三种方式的clone方式,patroni会顺序调用以上接口直到某一种方式成功。

用户在配置中定义的参数都会通过–name=value的方式在脚本时传递,但是以下几个配置有特殊含义:

  • no_master:允许 Patroni 调用副本创建方法,即使没有正在运行的主节点或副本。
  • keep_data:调用clone方法前不会清理PGDATA中的数据
  • no_params:限制patroni传递以下补充参数
–scope Which cluster this replica belongs to
–datadir Path to the data directory of the replica
–role Always ‘replica’
–connstring Connection string to connect to the cluster member to clone from (master or other replica).
The user in the connection string can execute SQL and replication protocol commands.

参考Patroni实现源码:Gist

1. basebackup

默认情况不指定create_replica_method时,patroni基于pg_basebackup创建replicas,可以通过以下方式定义pg_basebackup的参数:

1
2
3
postgresql:
  basebackup:
    max-rate: '100M'

patroni使用**–wal-method=stream**,方式调用pg_basebackup,并且禁止用户开启压缩配置。

参考Patroni实现源码:Gist

2. basebackup_fast_xlog

basebackup_fast_xlog 是spilo镜像官方定义clone脚本,相关配置如下:

1
2
3
4
5
6
postgresql:
  create_replica_method:
  - basebackup_fast_xlog
  basebackup_fast_xlog:
    command: /scripts/basebackup.sh
    retries: 2

3.Synchronous Standby

当Cluster工作在Sync模式时,Patroni在每个工作循环中都会检查/更新sync-standby的节点状态。

Patroni根据以下优先级 Sync Standby

  • 已经处于sync状态的
  • 非sync状态,选择flush_lsn值最大的

限制:

  • 如果配置maximum_lag_on_syncnode值不是-1,那么当前standby中lsn和max_lsn差值大于maximum_lag_on_syncnode的节点,不能被选为候选。
  • 如果配置了synchronous_mode_strict=true,而无法获取任何候选那么pg的synchronous_standby_names配置被设置为'*'

Patroni 获取候选的SQL:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
SELECT
	pg_catalog.lower(application_name),
	sync_state,
	pg_wal_lsn_diff(flush_lsn, '0/0')::bigint
FROM pg_catalog.pg_stat_replication
WHERE
	state = 'streaming' AND
	flush_lsn IS NOT NULL
ORDER BY
	sync_state DESC,
	flush_lsn DESC;

返回信息

1
2
3
4
      lower      | sync_state | pg_wal_lsn_diff
 ----------------+------------+-----------------
  patroni-rccp-1 | sync       |        83886400
  patroni-rccp-2 | async      |        83886400

Patroni相关源码,参考:Gist

4. Ha 关键接口

  • 处理PG starting 状态超时以及主备切换handle_starting_instance()gist
  • PG初始化入口函数bootstrap()
  • PG启动后初始化用户信息post_bootstrap()gist
  • PG故障恢复接口recover()

5. 数据库管理

Patroni 源码中通过Postgresql对象操作数据,在运行过程中可以涉及到以下操作:

  • 启动/停止/重启数据库
  • 获取数据库当前状态

1. 数据库的状态

Postgresql对象的cluster_info_query()函数封装了一条SQL语句,从系统表中实时插叙数据库状态,查询语句主要包括以下系统函数:

  • pg_catalog.pg_is_in_recovery():判断当前pg是否运行在recovery模式

  • pg_catalog.pg_current_wal_lsn():当前wal位置,只有在非recovery模式时返回

  • pg_catalog.pg_last_wal_replay_lsn():当前replay的wal位置,只有在recovery模式时返回

  • pg_catalog.pg_last_wal_receive_lsn():当前接受的wal位置,只有在streaming replicas模式时返回

  • pg_catalog.pg_is_wal_replay_paused():wal replay是否暂停

  • pg_catalog.pg_stat_get_wal_receiver():返回streaming replicas的状态信息

查询语句如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
select
	case
		when pg_catalog.pg_is_in_recovery() then 
			0
		else 
			('x' || pg_catalog.substr(pg_catalog.pg_walfile_name(pg_catalog.pg_current_wal_lsn()),1,8))::bit(32)::int
	end as timeline,
	case
		when pg_catalog.pg_is_in_recovery() then 
			0
		else 
			pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_current_wal_lsn(),'0/0')::bigint
	end as wal_position,
	pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_replay_lsn(),'0/0')::bigint as replayed_location,
	pg_catalog.pg_wal_lsn_diff(coalesce(pg_catalog.pg_last_wal_receive_lsn(), '0/0'),'0/0')::bigint as replayed_location,
	pg_catalog.pg_is_in_recovery() and pg_catalog.pg_is_wal_replay_paused() as replay_paused,
	0 as pg_control_timeline,
	case
		when latest_end_lsn is null then null
		else received_tli
	end as received_tli,
	slot_name,
	conninfo,
	null as slots
from
	pg_catalog.pg_stat_get_wal_receiver();

查询结果:

属性 备注
timeline Master上运行的时间线,Standby节点为0,这个属性被用来判断是否为leader
wal_position Master节点上wal的位置,Standby节点为0
replayed_location Standby节点replay的wal位置,Master为Null
received_location Standby节点receive的wal位置,Master为0
replay_paused Standby节点是否停止replay
pg_control_timeline 默认均为0
received_tli Standby上Streaming Replicas 接收的timeline
slot_name Standby使用的slot_name
conninfo Master的连接信息
slots 默认为Null

2. 启动数据库

Postgresql定义了start接口用于启动postgres,并且在启动之后会通过5432端口,并使用pg_isready工具判断是否启动完成。启动PG前Patroni会根据当前实例role,重新生成所有的配置。

启动数据库的逻辑比较简单,参考:启动PG

当新实例启动或者集群发生主备切换时,Patroni不会直接启动postgres,而是调用Postgresql的follow()(Ha对象的接口)将作为Standby加入集群。follow()中Patroni重新生成了recovery模式的配置信息,并且根据实际情况调用start()或者restart()

在Ha的**demote()**接口中就调用了follow(),该接口主要用来将运行中的Master降级为replica。**demote()**支持不同模式的降级方式,但是大体步骤都是

  • 停止运行中的pg
  • 更新DCS状态
  • 获取当前Leader信息
  • 判断是否需要pg_rewind
  • 同步或者异步调用follow()函数

demote()接口的代码片段,参考:降级操作

3. 关闭数据库

Postgresql定义了stop接口用来关闭数据库,并且改接口支持定义回调函数(on_safepoint和on_shutdown)参数,对于不是运行在Recovery模式的节点,通过参数可以选择是否退出前执行CHECKPOINT命令。

在posix类型的系统上,patroni通过向pg的进程发送信号量来关闭pg。根据mode的参数不同,发送的信号量有所区别。

1
2
3
4
5
STOP_SIGNALS = {
    'smart': 'TERM',
    'fast': 'INT',
    'immediate': 'QUIT',
}

stop函数的代码片段参考:停止PG

4. 判断PG是否健康

Postgresql提供**is_healthy()**接口判断数据库是否健康,改接口被Ha对象在每个Loop中调用,如果返回false则触发recover()机制。

目前Patroni仅通过Postgres进程是否正常运行,判断数据库是否健康!

5.PG_REWIND

Master没有运行在Recover状态并且本地Postgres不健康,触发 PG_REWIND/ReInit 的条件:

  • local_timeline > master_timeline
  • local_timeline < master_timeline && master_timeline != 1

Recover接口相关代码参考:gist

6. DCS 关键接口

  • 更新锁update_leader()gist
  • 获取锁take_leader()
  • 释放锁delete_leader()
  • 成员注册,更新当前PG信息touch_member()
  • 更新Cluster信息,判断leader是否存在**_load_cluster()**:该接口会根据endpoint对象上一次的更新时间是否超出ttl的设置来判断是否将leader设置为None,Ha对象每个Loop都会调用该接口,以此判断是否会触发当前节点尝试获取Leader。参考:gist

7.其他内容

1.LSN

LSN(Log sequence number),表示wal日志序列号,是WAL日志的唯一、全局标识。

WAL日志文件的命名规则如下,例如:000000010000000100000092

  • 前8位:00000001表示timeline;
  • 中8位:00000001表示logid;
  • 后8位:00000092表示logseg

假设LSN为1/920001F8(高32位/低32位),LSN和WAL日志文件以及对应偏移量的对应关系如下:

  • 确定timeline:WAL文件名前8位
  • 确定WAL文件:WAL文件名后8位 == LSN低32位/2*24(即低32位的最高两位)
  • 确定便宜量:LSN低32位中后24位对应的十进制值
1
2
3
4
5
6
-- 当前lsn
select pg_current_wal_lsn();
-- 当前wal文件
select pg_walfile_name(pg_current_wal_lsn());
-- 当前wal文件,以及便宜量
select pg_walfile_name_offset(pg_current_wal_lsn());

每次Master发生主备切换都会产生一个时间线文件,保存在 pg_wal 目录下,名称格式为**<时间线>.histroy**

1
2
3
1	0/6F4BF720	no recovery target specified
2	1/B0000098	no recovery target specified
3	1/B1000098	no recovery target specified

上述内容保存了该时间线包含WAL范围。

每个Histroy File的内容实际上是:上一个时间线文件的内容+生成该时间线是上一个时间中LSN