KV 和 RPC 接口
本页描述 Fluxon 的 Python KV API 和节点间 RPC 调用。两者由同一个 KvClient 实例提供,共享生命周期。
cluster_name、instance_key、etcd 等前置概念见 架构和概念。
Python 业务代码优先直接在代码里写 Python dict,并传给 FluxonKvClientConfig(...);YAML 更适合独立进程启动、supervisor、部署和示例环境。
服务平面
在写 put_blocking/get_blocking/rpc_call 这些 Python 业务代码之前,需要先把 KV 依赖的服务平面拉起来。共性的角色关系、启动顺序和 runtime 边界,统一见 用户 - 2 - 服务平面。

直接接触的服务平面对象主要有:
greptime:用于标准监控链路。安装与启动见 用户 - 2 - 服务平面etcd:KV 控制面元数据存储。安装与启动见 用户 - 2 - 服务平面start_kv_master_process(...):启动fluxonkv masterstart_owner_kvclient_process(...):启动owner
最小可运行示例脚本如下。这个脚本只启动 Fluxon 自己的角色;etcd / greptime 仍按服务平面文档单独启动,并且默认假设:
etcd在127.0.0.1:2379greptimeHTTP 在127.0.0.1:34030- 当前
python3所在环境已经安装fluxon-*.whl和fluxon_pyo3-*.whl;安装方式见 用户 - 0 - 安装
对应示例脚本:examples/start_master_owner.py
这个脚本支持两种启动方式:
- 默认方式:启动
master + owner --without-master:只启动owner,接入已经存在的 KV 集群master
#!/usr/bin/env python3
import argparse
from pathlib import Path
from fluxon_py.runtime import (
start_kv_master_process,
start_owner_kvclient_process,
wait_subproc_or_ctrlc,
)
from fluxon_py.runtime.process_runner import ManagedSubprocess
ETCD_ENDPOINT = "127.0.0.1:2379"
GREPTIME_HTTP_PORT = 34030
GREPTIME_BASE_URL = f"http://127.0.0.1:{GREPTIME_HTTP_PORT}"
CLUSTER_NAME = "demo-kv-cluster"
SHARED_MEMORY_PATH = Path("/dev/shm/fluxon_kv_demo").resolve()
SHARED_FILE_PATH = Path("/tmp/fluxon_kv_demo/shared").resolve()
WORKDIR = Path("/tmp/fluxon_kv_demo/runtime").resolve()
MASTER_PORT = 31000
MASTER_UI_PORT = 18080
MASTER_INSTANCE_KEY = "demo_kv_master"
OWNER_INSTANCE_KEY = "demo_kv_owner"
OWNER_DRAM_BYTES = 1073741824
def main() -> None:
args = parse_args()
SHARED_FILE_PATH.mkdir(parents=True, exist_ok=True)
log_dir = (WORKDIR / "log").resolve()
if args.with_master:
master_log_dir = (WORKDIR / "master_logs").resolve()
master_log_dir.mkdir(parents=True, exist_ok=True)
master_stdout_log = log_dir / "master.log"
master_proc = start_kv_master_process(
config=build_master_config(log_dir=master_log_dir),
log_path=master_stdout_log,
)
else:
master_stdout_log = None
master_proc = None
owner_stdout_log = log_dir / "owner.log"
owner_proc = start_owner_kvclient_process(
config=build_owner_config(),
log_path=owner_stdout_log,
)
children = []
if master_proc is not None:
children.append(
ManagedSubprocess(
label="master",
proc=master_proc,
)
)
children.append(
ManagedSubprocess(
label="owner",
proc=owner_proc,
)
)
print(f"[fluxon_kv] shared memory path: {SHARED_MEMORY_PATH}")
print(f"[fluxon_kv] shared file path: {SHARED_FILE_PATH}")
print(f"[fluxon_kv] etcd endpoint: {ETCD_ENDPOINT}")
print(f"[fluxon_kv] greptime base url: {GREPTIME_BASE_URL}")
print(f"[fluxon_kv] start master in this script: {args.with_master}")
if master_stdout_log is not None:
print(f"[fluxon_kv] master stdout log: {master_stdout_log}")
print(
"[fluxon_kv] kv web ui: "
f"http://<host-ip-or-domain>:{MASTER_UI_PORT}/view?cluster_name={CLUSTER_NAME}&member_kind=kv"
)
else:
print("[fluxon_kv] master stdout log: disabled by --without-master")
print(f"[fluxon_kv] owner stdout log: {owner_stdout_log}")
stack_label = "master and owner" if args.with_master else "owner"
print(f"[fluxon_kv] waiting for Ctrl-C to stop {stack_label}")
wait_subproc_or_ctrlc(
children,
on_ctrlc=lambda: print(f"[fluxon_kv] caught Ctrl-C, stopping {stack_label}"),
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Start KV demo owner, optionally with a local master")
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--with-master",
dest="with_master",
action="store_true",
help="Start a local kv master in this script (default)",
)
group.add_argument(
"--without-master",
dest="with_master",
action="store_false",
help="Do not start a local kv master; only start owner and attach to an existing cluster master",
)
parser.set_defaults(with_master=True)
return parser.parse_args()
def build_master_config(*, log_dir: Path) -> dict:
return {
"instance_key": MASTER_INSTANCE_KEY,
"cluster_name": CLUSTER_NAME,
"port": MASTER_PORT,
"etcd_endpoints": [ETCD_ENDPOINT],
"log_dir": str(log_dir),
"monitoring": {
"prometheus_base_url": f"{GREPTIME_BASE_URL}/v1/prometheus",
"prom_remote_write_url": [f"{GREPTIME_BASE_URL}/v1/prometheus/write"],
"otlp_log_api": {
"otlp_endpoint": f"{GREPTIME_BASE_URL}/v1/otlp/v1/logs",
},
},
"master_ui": {
"http_listen_addr": f"0.0.0.0:{MASTER_UI_PORT}",
},
}
def build_owner_config() -> dict:
return {
"instance_key": OWNER_INSTANCE_KEY,
"contribute_to_cluster_pool_size": {
"dram": OWNER_DRAM_BYTES,
"vram": {},
},
"fluxonkv_spec": {
"etcd_addresses": [ETCD_ENDPOINT],
"cluster_name": CLUSTER_NAME,
"shared_memory_path": str(SHARED_MEMORY_PATH),
"shared_file_path": str(SHARED_FILE_PATH),
"sub_cluster": "default",
},
}
if __name__ == "__main__":
main()启动命令:
python3 examples/start_master_owner.py
python3 examples/start_master_owner.py --without-master默认命令会启动本机 master + owner。--without-master 只启动本机 owner,要求同一个 cluster_name 对应的 master 已经在别处运行。
上面的 build_master_config(...) 里,master_ui 是可缺省配置块。配置后,start_kv_master_process(...) 会让 KV Web UI 直接作为 master 内的 HTTP 服务一起启动:
master_ui:
http_listen_addr: 0.0.0.0:18080这个 UI 的实际宿主是 fluxon_cli 的 KV monitor web。URL 形状固定为:
http://<host-ip-or-domain>:18080/view?cluster_name=demo-kv-cluster&member_kind=kvowner 把共享内存池和 shared.json 准备好之后,再运行下面的业务最小示例。
生命周期与调用流程(Call Flow)
User-visible processes:
- etcd (control-plane metadata store)
- Fluxon cluster node process(es) (the remote peers you address via instance_key/node_id)
- Your Python process (business code using fluxon_py)
FluxonKvClientConfig (prefer Python dict; YAML also supported)
|
v
new_store(cfg) -> KvClient (one instance in your Python process)
| |
| +-- KV: put_blocking/get_blocking/remove/... -> Result[...]
| | get_blocking() -> Result[MemHolder, ApiError]
| | -> access() -> Result[FlatDict, ApiError]
| |
| +-- Node call(server): rpc_register(path, handler) -> Result[OkNone, ApiError]
| | (handler lives in your Python process; keep it running)
| |
| +-- Node call(client): rpc_call(node_id, path, payload, timeout_ms) -> Result[响应句柄, ApiError]
| -> wait() -> Result[FlatDict, ApiError]
|
v
api.close() -> Result[OkNone, ApiError]核心对象(Python)
FluxonKvClientConfig:配置对象,优先直接从 Python dict 创建,也支持从 YAML 文件加载。new_store(config: FluxonKvClientConfig) -> Result[KvClient, ApiError]:创建 KV client 实例。KvClient:统一入口,同时提供 KV 读写与节点间调用。MemHolder:get_blocking(...)成功后的读取结果持有者,access()取得FlatDict。PutOptionalArgs:put_blocking(...)的可选参数对象,当前常用字段是lease_id。test_spec_config.disable_observability:最小 external client 示例里显式设为True,避免把 OTLP / observe 后台任务引入“只验证 KV/RPC 基本链路”的示例生命周期。
注意:
MemHolder没有bytes();需要access()后从 dict 里取 bytes 字段(常用字段名payload)。store.close()会等待当前 client 暴露出去的MemHolder全部释放;示例里在close()前显式删掉mem/flat,就是为了满足这个关闭约束。Result必须显式消费:调用unwrap()或unwrap_error()。
FlatDict(数据模型)
KV value 和节点间调用的 payload 统一为 flat dict:
FlatDict = Dict[str, Union[int, float, bool, str, bytes, dlpack]]
KV 接口最小示例
对应示例脚本:examples/external_put_get_del.py
#!/usr/bin/env python3
from fluxon_py import FluxonKvClientConfig, new_store
INSTANCE_KEY = "demo_kv_external"
CLUSTER_NAME = "demo-kv-cluster"
SHARED_MEMORY_PATH = "/dev/shm/fluxon_kv_demo"
SHARED_FILE_PATH = "/tmp/fluxon_kv_demo/shared"
def main() -> None:
cfg = FluxonKvClientConfig(
{
"instance_key": INSTANCE_KEY,
"fluxonkv_spec": {
"cluster_name": CLUSTER_NAME,
"shared_memory_path": SHARED_MEMORY_PATH,
"shared_file_path": SHARED_FILE_PATH,
},
"test_spec_config": {
"disable_observability": True,
},
}
)
store = new_store(cfg).unwrap("new_store failed")
key = "hello"
value = b"world"
try:
store.put_blocking(key, {"payload": value}).unwrap("put_blocking failed")
print(f"OK put key={key}")
mem = store.get_blocking(key).unwrap("get_blocking failed")
flat = mem.access().unwrap("mem.access failed")
payload = flat["payload"]
if not isinstance(payload, (bytes, bytearray)):
raise RuntimeError(f"payload is not bytes: {type(payload)}")
print(bytes(payload).decode("utf-8"))
store.remove(key).unwrap("remove failed")
print(f"OK del key={key}")
exists = store.is_exist(key).unwrap("is_exist failed")
if exists:
raise RuntimeError(f"expected is_exist({key!r}) to be False after remove")
print("OK is_exist after remove -> False")
finally:
# Release MemHolder-related references before close(); client shutdown waits
# until all user-visible holders are dropped.
if "flat" in locals():
del flat
if "mem" in locals():
del mem
store.close().unwrap("close failed")
if __name__ == "__main__":
main()常用接口(KV)
上面的 KV 最小示例如果要打开更详细的用户进程日志,直接在启动 Python 进程前设置:
FLUXON_LOG=DEBUG python3 examples/external_put_get_del.py日志相关对象如下:
FLUXON_LOG:控制当前 Python 业务进程 console logger 的输出门限- Fluxon Python 侧 logger 会读取
FLUXON_LOG;合法值是DEBUG、INFO、WARNING、ERROR、CRITICAL,默认INFO log_dir:master本地日志 authorityshared_file_path:本机共享文件 authority,shared.json、日志、profile 等文件位于这里
如果服务平面的 master.monitoring.otlp_log_api 已经配置,后台服务日志还会继续采集到 Greptime 的 fluxon_logs 表。
put_blocking(key: str, value: FlatDict, opts: Optional[PutOptionalArgs] = None) -> Result[OkNone, ApiError]
- 作用:写入或覆盖一个 KV。
key:要写入的 KV key。value:要写入的 flat dict payload。opts:可选写参数;普通写入通常传None,需要额外写控制时再传PutOptionalArgs(...)。- 返回链路:调用返回成功后,这次写入已经完成,不需要再额外
wait()。
PutOptionalArgs(lease_id: Optional[int] = None)
- 作用:
put_blocking(...)的可选参数对象。 lease_id:提交写入时,把这个 key 绑定到指定 lease。- 常用方式:普通业务写入一般不用传;只有需要 lease 生命周期控制时才显式构造它。
PutOptionalArgs.support_mooncake() -> Tuple[bool, List[str]]
- 作用:检查当前这组写参数是否兼容 mooncake 写入路径。
- 返回值:第一个返回值表示是否兼容,第二个返回值列出不兼容字段名。
get_blocking(key: str) -> Result[MemHolder, ApiError]
- 作用:读取一个 KV。
key:要读取的 KV key。- 返回链路:接口成功后直接拿到
MemHolder,再调用access()取得FlatDict。
MemHolder
- 作用:
get_blocking(...)成功后的读取结果持有者。 - 理解方式:它不是最终的业务 dict,也不是原始 bytes;还要继续
access()。
MemHolder.access() -> Result[FlatDict, ApiError]
- 作用:把
MemHolder中的数据展开成FlatDict。 - 常用用法:
flat = mem.access().unwrap(...),然后再从flat["payload"]之类的字段里取业务值。 - 注意:
MemHolder本身没有bytes();如果 value 里有 bytes 字段,要先access()再取。
get_size(key: str) -> Result[int, ApiError]
- 作用:只查询 value 大小,不把 payload 整体取回。
key:要查询的 KV key。- 适合场景:先判断对象大小,再决定是否继续
get(...)。
is_exist(key: str) -> Result[bool, ApiError]
- 作用:判断某个 KV key 当前是否存在。
key:要检查的 KV key。
remove(key: str) -> Result[OkNone, ApiError]
- 作用:删除一个 KV。
key:要删除的 KV key。
is_exist(key: str) -> Result[bool, ApiError]
- 作用:查询当前 key 是否还存在。
- 最小示例里,
remove(...)之后优先用它验证“删除请求已经生效”。 - 注意:
remove(...)之后立刻get_blocking(...)不保证马上返回KeyNotFoundError;删除后的读路径还会受 owner / master 元数据 cache 清理时序影响。如果你要验证“删除传播后不可读”,需要给删除传播留出观察时间。
节点间 RPC 调用最小示例
这里的 RPC 指节点间 RPC 调用,目标节点通常用目标实例的 instance_key 来标识。
对应示例脚本:examples/rpc_call.py
#!/usr/bin/env python3
import argparse
import signal
from fluxon_py import FluxonKvClientConfig, new_store
RPC_SERVER_INSTANCE_KEY = "demo_rpc_server"
RPC_CLIENT_INSTANCE_KEY = "demo_rpc_client"
CLUSTER_NAME = "demo-kv-cluster"
SHARED_MEMORY_PATH = "/dev/shm/fluxon_kv_demo"
SHARED_FILE_PATH = "/tmp/fluxon_kv_demo/shared"
def main() -> None:
parser = argparse.ArgumentParser(description="Minimal node-to-node RPC example")
subparsers = parser.add_subparsers(dest="command", required=True)
serve_parser = subparsers.add_parser("serve", help="Start one RPC handler process")
serve_parser.add_argument("--instance-key", default=RPC_SERVER_INSTANCE_KEY, help="RPC handler instance key")
call_parser = subparsers.add_parser("call", help="Call one RPC handler and print the counter")
call_parser.add_argument("--instance-key", default=RPC_CLIENT_INSTANCE_KEY, help="RPC caller instance key")
call_parser.add_argument(
"--target-instance-key",
default=RPC_SERVER_INSTANCE_KEY,
help="Target RPC handler instance key",
)
args = parser.parse_args()
if args.command == "serve":
run_server(instance_key=args.instance_key)
return
if args.command == "call":
run_client(instance_key=args.instance_key, target_instance_key=args.target_instance_key)
return
raise AssertionError("unreachable")
def _build_config(*, instance_key: str) -> FluxonKvClientConfig:
return FluxonKvClientConfig(
{
"instance_key": instance_key,
"fluxonkv_spec": {
"cluster_name": CLUSTER_NAME,
"shared_memory_path": SHARED_MEMORY_PATH,
"shared_file_path": SHARED_FILE_PATH,
},
"test_spec_config": {
"disable_observability": True,
},
}
)
def run_server(*, instance_key: str) -> None:
store = new_store(_build_config(instance_key=instance_key)).unwrap("new_store failed")
count = 0
def count_handler(from_node_id: str, payload: dict) -> dict:
nonlocal count
count += 1
print(f"rpc from={from_node_id} payload={payload} count={count}")
return {
"count": count,
"payload": payload["payload"],
}
try:
store.rpc_register("/count", count_handler).unwrap("rpc_register failed")
print(f"[rpc] handler ready instance_key={instance_key}")
print("[rpc] waiting for Ctrl-C")
signal.pause()
except KeyboardInterrupt:
print("[rpc] caught Ctrl-C, stopping handler")
raise SystemExit(130)
finally:
store.close().unwrap("close failed")
def run_client(*, instance_key: str, target_instance_key: str) -> None:
store = new_store(_build_config(instance_key=instance_key)).unwrap("new_store failed")
try:
resp = (
store.rpc_call(target_instance_key, "/count", {"payload": b"hi"})
.unwrap("rpc_call failed")
.wait()
.unwrap("rpc wait failed")
)
print(resp["count"])
finally:
store.close().unwrap("close failed")
if __name__ == "__main__":
main()常用接口(节点间 RPC)
rpc_register(path: str, handler: Callable[[from_node_id: str, payload: FlatDict], FlatDict]) -> Result[OkNone, ApiError]rpc_call(node_id: str, path: str, payload: FlatDict, timeout_ms: int = 10000) -> Result[响应句柄, ApiError]
使用约束如下:
node_id通常对应目标节点的instance_key(见:架构和概念)。timeout_ms默认是10000;如果调用方显式指定,必须满足timeout_ms >= 10000。
配置对象与配置文件
KV 环境至少会直接接触两类配置对象:
- master 配置:启动控制面进程,负责 etcd、成员路由、监控和 master 日志目录
- client / external 配置:创建
FluxonKvClientConfig,供new_store(...)附着到同机 owner 并发起 KV / RPC
业务代码直接编辑的通常是第二类配置对象,并且通常直接编辑 Python dict;需要先把 KV 集群拉起来时,两类配置对象都要准备,此时再把配置落成 YAML 给 CLI / runtime 进程使用。
1) master 配置
最小 master 配置示例:
instance_key: my-master-1
cluster_name: demo-kv-cluster
port: 31000
etcd_endpoints:
- 127.0.0.1:2379
log_dir: /var/lib/fluxon/master_logs理解方式:
etcd_endpoints:master 控制面连接的 etcd 地址log_dir:master 自己的日志 / profile authority;运行时会在这个目录下继续派生 cluster 级日志子目录
2) client / external 配置
业务代码里直接把 Python dict 传给 FluxonKvClientConfig(...) 的构造方式如下:
from fluxon_py import FluxonKvClientConfig
cfg = FluxonKvClientConfig(
{
"instance_key": "my-kv-client-1",
"fluxonkv_spec": {
"cluster_name": "demo-kv-cluster",
"shared_memory_path": "/dev/shm/fluxon",
"shared_file_path": "/var/lib/fluxon/shared",
},
}
)如果配置已经落成 YAML 文件,也可以直接从文件构造:
from fluxon_py import FluxonKvClientConfig
cfg = FluxonKvClientConfig.from_file("./kv_external.yaml")这两种方式最终得到的是同一个配置对象,后续都传给 new_store(cfg)。
最小 external-client 配置示例:
# 当前 Python 进程 / external client 的唯一实例标识
instance_key: my-kv-client-1
fluxonkv_spec:
# 目标集群名;必须和 master / owner 保持一致
cluster_name: demo-kv-cluster
# 本机共享内存 authority;external 靠它附着到同机 owner 的内存池
shared_memory_path: /dev/shm/fluxon
# 本机共享文件 authority;shared.json、日志、profile 等文件位于这里
shared_file_path: /var/lib/fluxon/shared
# 可选:覆盖当前 client 的 P2P 监听端口
p2p_listen_port: 31001Owner 节点需要额外配置内存贡献和 etcd 地址:
# owner 实例标识;同样要求全局唯一
instance_key: my-owner-1
# owner 向集群贡献的内存池大小
contribute_to_cluster_pool_size:
# DRAM 贡献,单位字节
dram: 1677721600
# VRAM 贡献;这里为空表示不贡献显存
vram: {}
fluxonkv_spec:
# owner 连接 etcd 的地址列表
etcd_addresses:
- 127.0.0.1:2379
# 目标集群名;必须和 master / external 保持一致
cluster_name: demo-kv-cluster
# 本机共享内存 authority;external 进程会附着到这里
shared_memory_path: /dev/shm/fluxon
# 本机共享文件 authority;shared.json、日志、profile 等文件位于这里
shared_file_path: /var/lib/fluxon/shared
# owner 自己的 P2P 监听端口
p2p_listen_port: 31000
# owner 所属子集群标签
sub_cluster: default这里需要把两个本机 authority 分清楚:
shared_memory_path:共享内存 / mmap authority,同机进程靠它附着到同一块内存池shared_file_path:共享文件 authority,shared.json、日志、profile 等文件位于这里FLUXON_LOG:用户 Python 进程 console log 的门限,不写时默认INFO
zero-contribution external 模式下有一个硬约束:fluxonkv_spec.etcd_addresses、fluxonkv_spec.sub_cluster、fluxonkv_spec.redis_compat 这类 owner 侧字段不应出现。