本文介绍: 在设计分布式程序时,可供选择通信方式主要有两种:异步通信同步通信采用异步通信时,发送方无须等待任何确认或应答。而在采用同步通信时,发送方会处于挂起状态,直至收到回复为止((即便只是“收到,多谢”之类的确认回复)。Erlang消息传递基本形式就是异步的,因为这种形式最为简单灵活:一般来说异步通信更适合分布式编程,而且同步通信可以利用成对的异步请求/响应消息模拟(就是这样做的)。

一:分布式缓存

1.选取通信策略

设计分布式程序时,可供选择的通信方式主要有两种:异步通信和同步通信。采用异步通信时,发送方无须等待任何确认或应答。而在采用同步通信时,发送方会处于挂起状态,直至收到回复为止((即便只是“收到,多谢”之类的确认性回复)。Erlang消息传递基本形式就是异步的,因为这种形式最为简单灵活:一般来说异步通信更适合分布式编程,而且同步通信总可以利用成对的异步请求/响应消息来模拟(gen_server:call/3就是这样做的)。

(1)异步通信:

异步通信有时也被称为“即发即忘”(fire and forget)或“即发即盼”(send and pray)式通信。消息一上路,发送方便撒手不管,继续干活。如果预期远端进程应该给出应答,发送方随后会伺机检查应答消息,如下图所示。一般来说,能否在指定时间内收到应答并不影响发送
后续的工作,至少部分工作不会受到影响

 

 即发即忘的异步通信:消息一发完,发送方便撒手不管,继续干活。应答消息将被单独发回。

异步通信的开销很低,是计算机系统间一种良好的基本通信形式。由于省去了各种检查扫描验证、计时等杂务,异步通信非常之快,尤其适用于创建简单而直观的系统我们建议是,除非万不得已,否则请尽量采用异步通信。 

(2)同步通信:

在同步通信中,每条消息都需要一个应答(哪怕只是一条用于确认消息送达的回执)。在收应答之前发送方会被挂起,什么也做不了。由于发送方在等待应答的过程中处于阻塞状态,这通信策略又被称做阻塞式通信。典型的同步信息交换过程下图所示:

 同步阻塞式通信:在收到应答之前,发送方会被挂起。即使发送方的后续工作并不严格依赖于该应答,也会被迫中断中断时长不短于消息往返一个来回所需的时间

同步通信最显著的缺陷就是收到响应之前发送方什么都做不了(在分布式环境下,这段时间至少是网络两台计算机之间消息传递时延的两倍)。另一方面,它的优势也很明显,那就是可以轻易地让系统在某一活动中保持同步。

2. 同步缓存和异步缓存

(1)同步缓存:同步模式提供的一致性保障与异步模式有所不同。在这种模式下,只有在所有缓存实例都拿到会话数据之后,系统才会告知用户登录成功。也就是说执行插入操作函数必须先收到所有缓存实例的确认消息才能进行一步动作如图

(2)异步缓存:不保证插入操作完成系统状态的一致性并不意味着插人操作会频繁出错或是执行速度很慢一只是无法得到百分之百的一致性保障罢了。总体来看,服务整体状态有可能出现临时的不一致,而且在某个较低的概率用户可能感知到这种不一致。如图所示:

但总的来说,其实就是同步缓存能保持一致,却得等全部完成才能进行一步,而异步缓存虽有可能出现缓存不是百分百一致,但却可以完成各的。

 3.分布式表

在分布式模式中,每一张表都需要进行映射,因为不同模块上的表内数据需要同步,这样才能进行不同服务器直接实现访问调用,主要映射关系下图所示:

二: 用Mnesia实现分布式数据存储

Mnesia是一套轻量级的软实时分布式数据存储系统支持冗余复制事务,特别适合于存储离散的Erlang数据块,尤其擅长RAM中的数据存储。Mnesia天生支持Erlang,Erlang?数据无须任何格式转换便可原封不动地存人其中。有鉴于此,它自然就成为缓存应用首选的数据库方案

1.建立项目数据库

在Mnesia中,表项可由普通Erlang记录定义我们可以用下面代码罗列:

%% 项目数据库记录定义
-record(user, {id, name}).
-record(project, {title, description}).
-record(contributor, {user_id, title}).

建立数据库主要分为以下几个步骤

  1. 初始化 Mnesia
  2. 启动节点
  3. 建立数据库模式
  4. 启动 Mnesia
  5. 建立数据库
  6. 新建表中录入数据
  7. 对数据做一些基本查询

 2初始化数据库

(1)启动节点:在使用Mnesial时,请按如下方式启动Erlang节点:

erl -mnesia dir '"/tmp/mnesia_store"'-name mynode

(2) 建立数据库模式:只需要本地节点上建立数据库模式即可

(ming@erlware.org)1>mnesia:create_schema ([node()]).

(3) 启动Mnesia:

调用mnesia:start()便可手动启动Mnesia。Mnesia运行起来之后,可以调用mnesia:info()来核实数据库的基本信息,如数据库中现存多少张表,当前多少个节点相连等:

(ming@derlware.org)2>mnesia:start ()
ok
(ming@erlware.org)3>mnesia:info().
---> Processes holding locks <---
---> Processes waiting for locks <---
---> Participant transactions <---
---> Coordinator transactions <---
---> Uncertain transactions <---
---> Active tables <---
schema    : with 1    records occupying 422    words of mem
===> System info in version "4.4.8", debug level none <===
opt_disc. Directory "/tmp/mnesia"is used.
use fallback at restart = false
running db nodes   = [mynode@erlware.org]
stopped db node s  = []
master node tables = []
remote             = []
ram_copies         = []
disc_copies        = [schema]
disc_only_copies   = []
[{ming@erlware.org,disc_copies}] = [schema]
2 transactions committed,0 aborted,0 restarted,0 logged to disc
0 held locks,0 in queue;0 local transactions,0 remote
0 transactions waits for other nodes: []
ok

用这种方法可以很方便地核实线上系统配置例如各节点间的全连通状况及一切是否配置
完好等。现在数据库系统已经初始化完毕,可以开始编写应用代码了,第一步建表

3.建表

建表操作完全可以直接在Erlang shell进行,但由于shell记录支持有限,这样做会有点儿别扭。可以用下面代码所示:

mnesia:create_table(Name, Options).
 
-record(user, {id, name}).
mnesia:create_table(user, [{attributes, record_info(fields, user)}, {type, bag}]).
 
mnesia:write(#user{id=Id, name=Name}).
mnasia:read(user, Id).
mnesia:transaction(Fun).
mnesia:dirty_write(#user{id=Id, name=Name}).
 
 
%% record_info/2 不是真正意义上的函数,它只在编译器有效(和记录语法中的#一样),在运行期或在 Erlang shell 中无法调用它。
 
其他的默认选项:
1、表既可读也可写
2、表仅驻留于 RAM 中
3、表中存储记录与表同名
4、表的类型set
5、加载优先级为0
6、local_content 标记被置为 false
 
Mnesia 表类型:
1、set
2、ordered_set
3、bag
 
Mnesia 存储类型:
1、ram_copies
2、disc_copies
3、disc_only_copies    % 不支持ordered_set
 
不同节点上的表可以有不同的存储类型,甚至支持运行时修改

Options 是一张 {Name, Value} 选项列表,在所有选项之中,最重要的一个attributes,该选项用于指定表中所存记录的字段名。如果没有它,Mnesia 会假定记录中仅有两个字段,分别为 keyval。表的主键永远都是记录的第一个字段

不过,为了更好理解我们还是打算编写一个模块完成这项工作代码如下

%% Mnesia建表模块
-record(user, {id, name}).
-record(project, {title, description}).
-record(contributor, {user_id, title}).

init_tables()->
    mnesia:create_table(user, [{attributes, record_info(fields, user)}]),
    mnesia:create_table(project, [(attributes, record_info(fields,project)}]),
    mnesia:create_table(contributor,                        
                        [{type, bag}, {attributes, record_info(fields, contributor)}]).

4.向表中录入数据

其他人在插入数据时是没有必要了解表的详情的,这些细节应该由API函数隐藏起来。添加API函数的同时也多出了一个校验机会,你可以在插人数据之前对数据进行一些一致性检查比如说,新添加的用户至少要参与一个项目,并且不允许将用户加为尚不存在项目的参与人。添加用户和项目的代码如下所示:

%% 数据插人函数
insert_user(Id, Name, ProjectTitles) when ProjectTitles =/= [] ->
    User = #user(idId,nameName},
    Fun = fun() ->
        %% 向表中写入用户记录
        mnesia:write(User),
        lists:foreach(
            fun(Title) ->
                [#project(title = Title)] = mnesia:read(project, Title),
                %% 插入参与人记录
                mnesia:write(#contributor{user_id = Id, project_title = Title})
            end,
            ProjectTitles)

    end,
    mnesia:transaction(Fun).

%% 设置事务
insert_project(Title, Description) ->
    mnesia:dirty_write(#project{title = Title, description = Description}).

5.查询

QLC是一套通用查询接口,适用于ETS表、Mnesia表等各种具有表的特征的东西。通过实现相应的QLC适配器,甚至可以将QLC用在自定义的表结构上。在使用QLC之前,首先要用
mnesia:table(TableName)函数建立一个Mnesia表句柄,该句柄将被用作QLC的输人参数然后,就可以用普通的列表速构语法来实现各种过滤聚合操作了。例如我们可以这样做:

mnesia:select(user, [{#user{id='$1', name=zh}, [], ['$1']}]).
 
{Head, Condition, Results}
Condition 罗列作用于该匹配条件上的额外约束条件
Result 描述要从匹配到的每条记录中生成什么样的结果项式
 
'_'    仅限于在 Head 部分使用,无所谓,任意值都可以
'$_'   仅限于在 Result 和 Condition使用,与查询条件匹配的整条记录
'$$'   仅限于在 Result 和 Condition使用,等价于依此罗列出在 Head 部分匹配所有变量

具体操作列如下面所示:

mnesia:transaction(
    fun() ->
        Table = mnesia:table(user),
        QueryHandle = qlc:q([U#user.id || U <- Table, U#user.name =:= martin]),
        qlc:eval(QueryHandle)
    end)

相较于select匹配规范,QLC是一套更为优雅的查询接口。就可读性而言上述代码比起之前的版本要清晰得多,代码的目的一目了然:首先从Mnesia的user表中找出所有U#user,name等于martin的用户记录;然后从其中的每个记录u中取U#user.id,形成最终的结果列表

三:基于Mnesia的分布式缓存

学习了Mnesia的基础知识,现在总算可以开始开发了。要想让设计中的缓存正常运转,还有以下工作要做:

(1)用Mnesial取代ETS;
(2)让缓存能够识别出其他节点,从而进行必要的通信;
(3)让缓存具备资源探测能力
(4)动态复制Mnesia表。

1.用Mnesia取代ETS

我们可以编写以下模块

 sc_store模块关键函数有四个:
(1)init/0
 (2)insert/2
(3)lookup/1
(4)delete/1 

首先是用于设置ETS的init/0,现在你得用它来设置Mnesia表。我们后续还要进一步改造该函数,以便封装冗余复制相关逻辑;这些暂且放在一边,先把表建好再说。

(1)编写init/0:

init()->
    mnesia:start(),
    mesia:create_table(key_to_pid,
                        [{index,[pid]},
                         {attributes, record_info(fields, key_to_pid)}]).

注:这是一张驻留在RAM中的普通set型表,不可重复

(2) 编写insert/2:

insert(Key,Pid) ->
    mnesia:dirty_write(#key_to_pid{key = Key, pid = Pid}).

注:在对表进行更新时我们用的是dirty._write.。在这里Mnesia只负责最简单的键值存储,根本用不到事务。之所以选用Mnesia,主要是考虑到它的冗余复制功能

(3)编写lookup/1:

lookup(Key)->
    case mnesia:dirty_read(key_to_pid, Key) of
        [{key_to_pid,Key,Pid)] -> {ok, Pid};
                            [] -> {error, not_found}
    end.

注:由于是set型表,结果中最多只有一条记录,用dirty_read就够了。

(4)编写delete/1:

delete(Pid) ->
    %% 按pid查询表项
    case mnesia:dirty_index_read(key_to_pid,Pid, #key_to_pid.pid) of
            [#key_to_pid{} = Record] -> 
                mnesia:dirty_delete_object(Record);
            %% 查询出错也应返回ok
            _ ->

                ok
    end.

注:在执行删除操作时可能会出现两种情况:要么键原本就不存在可能已经被删掉了),要么
存在并被成功删除。无论是哪种情况,都应该返回σk。

2.让缓存识别出其他节点

在该方案下新节点将通过两个长期运行的空白Erlang节点来加人预先约定的集群。这两个节点不执行任何用户代码(因而几乎永远不会宕机)。你只需要启动它们,给它们分配恰当的节点名,并设置好用集群认证cookie就可以了:

erl -name contactl -setcookie xxxxxxxx
erl -name contact2 -setcookie xxxxxxxx

新启动的缓存节点将按事先配置的节点名pig这两个节点,如图所示:

 3.用资源探测定位其他缓存实例

需要建立应用目录结构,并编写相应的.app文件、_app模块,和sup模块(用于启动资源探测服务器)。全部搞定之后,应该得到两个目录结构并列应用如下所示:

lib
  |- simple_cache
         |- src
         |- ebin
         |- resource_discovery
         |- src
         |- ebin
         |- ...

此外,还需要给simple_cache增加两个依赖项,即resource_discoverymnesia.代码如下:

%% 编写后的simple_cache.app文件
{application,simple_cache,
 [{description,"A simple caching system"},
  {vsn, "0.3.0"},

  {modules, [simple_cache,
             sc_app,
             sc_sup,
             sc_element_sup,
             sc_store,
             sc_element,
             sc_event,
             sc_event_logger]},
  {registered, [sc_sup]},
  %% 加入了新的依赖项
  {applications, [kernel,sasl,stdlib,mnesia,resource_discovery]),
  {mod, {sc_app,[]}}
]}.

最后一步就是向集群中的其余节点发起资源交换请求,接着耐心等待一段时间,直至资源信息交换完毕(这套资源探测系统具有较强的异步性,因而等待是必要的):

resource_discovery:trade_resources(),
timer:sleep (?WAIT_FOR_RESOURCES),

很明显,这段代码也应该加到sc_app:start/2函数中位置紧随ensure_contact()调用
之后。代码如下:

%% 资源探测相关修改(sc_app.erl)
-define(WAIT_FOR_RESOURCES,2500).

start(_StartType, _StartArgs) ->
    ok = ensure_contact(),
    resource_discovery:add_local_resource(simple_cache,node()),
    resource_discovery:add_target_resource_type(simple_cache),
    resource_discovery:trade_resources(),
    timer:sleep (?WAIT_FOR_RESOURCES),
    sc_store:init(),
    case sc_sup:start_link()of
        {ok, Pid} ->
            {ok, Pid};
        Error ->
            Error
    end.

4.动态复制Mnesia表

前面初始化节点什么的就不再过多描述了,直接上代码:

%% 连接其他Mnesia节点并
-define(WAIT_FOR_TABLES,5000).

add_extra_nodes([Node T]) ->
    case mnesia:change_config(extra_db_nodes,[Node])of
        {ok,[Node]} ->
            %% 用远程数据库的模式替换本地模式
            mnesia:add_table_copy(schema,node(), ram_copies),
            mnesia:add_table_copy(key_to_pid,node(), ram_copies),
            Tables = mnesia:system_info(tables),
            mnesia:wait_for_tables(Tables, ?WAIT_FOR_TABLES);
        _ ->
            %% 继续尝试其他节点
            add_extra_nodes(T)
    end.

这样就已经实现了用Mnesia实现分布式数据存储测试时记得启动几个它所依赖的应用

1> application:start(sasl).
ok

2> mnesia:start().
ok

3> application:start(resource_discovery).
ok

4> application:start(simple_cache).
ok

原文地址:https://blog.csdn.net/qq_53795917/article/details/134773050

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_50453.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注