持久化数据到硬盘上并采用RocksDB作为存储引擎

在上一篇文章中,我们使用内存做数据的存储。在这一篇文章中,我们持久化数据到硬盘上,采用rocksdb作为存储引擎。同时也增加了一个service层,将命令解析和存储逻辑提取到这一层中。
rocksdb 先在cargo.toml文件中加入rocksdb依赖:
然后,在src/storage目录下创建rocksdb_storage.rs文件。代码如下:
1#[derive(debug)] 2pub struct rocksdbstorage(db); 3 4impl rocksdbstorage { 5    pub fn new(path: impl asref) -> self { 6        self(db::open_default(path).unwrap()) 7    } 8} 910impl storage for rocksdbstorage {11    fn get(&self, key: &str) -> result {12        let v = self.0.get(key)?.unwrap();13        ok(some(v.into()))14    }1516    fn set(&self, key: &str, value: bytes::bytes) -> result {17        self.0.put(key, value.clone()).unwrap();18        ok(some(value))19    }20}  service层 创建src/service目录,然后创建mod.rs文件及cmd_service.rs文件。在mod.rs文件中加入如下代码:1pub mod cmd_service;23pub trait cmdservice {4    // 解析命令,返回response5    fn execute(self, store: &impl storage) -> cmdresponse;6}在cmd_service.rs文件中为命令实现cmdservice trait,代码如下: 1use crate::{cmdresponse, cmdservice, get, set}; 2 3// 为 get 实现 execute 4impl cmdservice for get { 5    fn execute(self, store: &impl crate::storage) -> cmdresponse { 6        // 从存储中获取数据,返回cmdresponse 7        match store.get(&self.key) { 8            ok(some(value)) => value.into(), 9            ok(none) => not found.into(),10            err(e) => e.into(),11        }12    }13}1415// 为 set 实现 execute16impl cmdservice for set {17    // 存储数据18    fn execute(self, store: &impl crate::storage) -> cmdresponse {19        match store.set(&self.key, self.value) {20            ok(some(value)) => value.into(),21            ok(none) => set fail.into(),22            err(e) => e.into(),23        }24    }25}在src/pb/mod.rs中实现从bytes、&str、box转换为cmdresponse: 1impl from for cmdresponse { 2    fn from(v: bytes) -> self { 3        self { 4            status: 200u32, 5            message: success.to_string(), 6            value: v, 7        } 8    } 9}1011impl from for cmdresponse {12    fn from(s: &str) -> self {13        self {14            status: 400u32,15            message: s.to_string(),16            ..default::default()17        }18    }19}2021impl from for cmdresponse {22    fn from(e: box) -> self {23        self {24            status: 500u32,25            message: e.to_string(),26            ..default::default()27        }28    }29}    然后在src/service/mod.rs中加入service代码: 1// 设置默认存储为rocksdb 2pub struct service { 3    store_svc: arc, 4} 5 6// 在多线程中进行clone 7pub struct storeservice { 8    store: store, 9}1011impl service {12    pub fn new(store: store) -> self {13        self {14            store_svc: arc::new(storeservice { store }),15        }16    }1718    // 执行命令19    pub async fn execute(&self, cmd_req: cmdrequest) -> cmdresponse {20        println!(=== execute command before ===);21        let cmd_res = process_cmd(cmd_req, &self.store_svc.store).await;22        println!(=== execute command after ===);23        cmd_res24    }25}2627// 实现clone trait28impl clone for service {29    fn clone(&self) -> self {30        self {31            store_svc: self.store_svc.clone(),32        }33    }34}3536// 处理请求命令,返回response37async fn process_cmd(cmd_req: cmdrequest, store: &impl storage) -> cmdresponse {38    match cmd_req.req_data {39        // 处理 get 命令40        some(reqdata::get(cmd_get)) => cmd_get.execute(store),41        // 处理 set 命令42        some(reqdata::set(cmd_set)) => cmd_set.execute(store),43        _ => invalid command.into(),44    }45}
配置 我们修改配置,在conf/server.conf中加入rocksdb路径......[rocksdb_path]path = '/tmp/kvserver'    在src/config.rs中加入如下代码: 1// server端配置 2#[derive(debug, serialize, deserialize)] 3pub struct serverconfig { 4    ...... 5    pub rocksdb_path: rocksdbpath, 6} 7 8...... 910// rocksdb存储目录11#[derive(debug, serialize, deserialize)]12pub struct rocksdbpath {13    pub path: string,14}修改kv_server 在kv_server.rs中使用service执行命令,删除process_cmd函数: 1#[tokio::main] 2async fn main() -> result { 3    ...... 4 5    // 初始化service及存储 6    let service = service::new(rocksdbstorage::new(rocksdb_path)); 7 8    loop { 9         ......10        let svc = service.clone();1112        tokio::spawn(async move {13            // 使用frame的lengthdelimitedcodec进行编解码操作14            let mut stream = framed::new(stream, lengthdelimitedcodec::new());15            while let some(ok(mut buf)) = stream.next().await {16                ......1718                // 执行请求命令19                let cmd_res = svc.execute(cmd_req).await;2021                ......22            }23            info!(client {:?} disconnected, addr);24        });25    }26}   


POE供电常见7大问题,弄清楚poe不再难
基于FPGA 的高效率多时钟的虚拟直通路由器
计算机通信与网络v2 实验课程(21)
Q2季度爱立信实现净利润同比增长40%,预计中国市场5G合同实现盈利
工信部闻库表示:5G终端距离普及还需要一段时间
持久化数据到硬盘上并采用RocksDB作为存储引擎
亨通发布了5G光纤光缆等一系列高端新品及系统解决方案
频谱分析仪分辨率带宽和视频带宽的联系和区别
!销售/收购/维修HP8647A信号源HP 8647A现货!
第三季度DRAM价格再下修 跌幅恐扩大至15%
dfrobotArduino 树莓派扩展板简介
国家电网在硅谷推出了其新成立的风险投资公司
智能音箱并不是未来的发展趋势,未来真正的家庭助手应该是智能机器人
从PT展直播中,认识一家新的5G领军企业
是德科技N6700系列模块电源的类型及特性
一个简单的LED调光电路分享
荣耀9什么时候上市?荣耀9外观撞车小米6,麒麟960+6G运存,仅2000元
都被骗了?经由日本深挖后,声称发现麒麟9000S芯片的秘密
IP67心率计防水检测操作的具体流程是怎样的
工厂能源管理系统方案助企业实现高效、环保与可持续生产