File cpg-1.8.0-git.patch of Package cpg
diff --git a/LICENSE b/LICENSE
index 6007025..c7f5aa5 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
 MIT License
 
-Copyright (c) 2011-2019 Michael Truog <mjtruog at protonmail dot com>
+Copyright (c) 2011-2020 Michael Truog <mjtruog at protonmail dot com>
 
 Permission is hereby granted, free of charge, to any person obtaining a
 copy of this software and associated documentation files (the "Software"),
diff --git a/README.md b/README.md
index 5842582..7497af6 100644
--- a/README.md
+++ b/README.md
@@ -4,45 +4,36 @@
 
 ## Purpose
 
-CPG provides a process group interface that is similar to the pg2 module
-within Erlang OTP. The pg2 module is used internally by
-Erlang/OTP, and is currently the most common approach to the combination of
-availability and partition tolerance in Erlang (as they relate to the
-CAP theorem). When comparing these goals with gproc (and its usage of
-`gen_leader`), gproc is focused on availability and consistency (as it relates
-to the CAP theorem), which makes its goals similar to mnesia.
-
-The cpg interface was created to avoid some problems with pg2 while pursuing
-better availability and partition tolerance. pg2 utilizes ets (global
-key/value storage in Erlang which requires internal memory locking,
-which limits scalability) but cpg uses internal process memory
-(see the [Design](#design) section for more information). By default,
-cpg utilizes Erlang strings for group names (list of integers) and provides
-the ability to set a pattern string as a group name. A pattern string
-is a string that includes the `"*"` or `"?"` wildcard characters
-(equivalent to a ".+" regex while `"**"`, `"??"`, `"*?"`, and `"?*"`
- are forbidden). When a group name is a pattern string, a process can be
-retrieved by matching the pattern. To change the behavior to be compatible
-with pg2 usage (or gproc), see the [Usage](#usage) section below.
-
-The cpg interface provides more error checking than the pg2 module, and it
-allows the user to obtain the groups state so that group name lookups do not
-require a message to the cpg scope process. The cpg scope is a locally
-registered process name used to provide all the group names with a scope.
-By avoiding a message to the cpg scope process, contention for the single
-process message queue can be avoided.
-
-The process group solutions for Erlang discussed here depend on
-the distributed Erlang functionality, provided natively by Erlang/OTP.
-The distributed Erlang functionality automatically creates a fully-connected
-network topology and is only meant for a Local Area Network (LAN).
-Since a fully-connected network topology is created that requires a
-net tick time average of 60 seconds (the net tick time is not increased
-to ensure distributed Erlang nodes fail-fast) the distributed
-Erlang node connections are limited to roughly 50-100 nodes. So, that
-means these process group solutions are only targeting a cluster of Erlang
-nodes, given the constraints of distributed Erlang and a fully-connected
-network topology.
+cpg provides a process group interface that is focused on
+availability and partition tolerance (in the CAP theorem).
+The pg process group implementation added in Erlang/OTP 23 by
+WhatsApp Inc. (Facebook Inc.) is based on cpg.
+The cpg interface is compatible with pg2
+(scheduled for removal in Erlang/OTP 24).
+
+## Features (Compare and Contrast)
+
+### cpg
+
+* By default, cpg utilizes Erlang strings for group names (list of integers) and provides the ability to set a pattern string as a group name. A pattern string is a string that includes the `"*"` or `"?"` wildcard characters (equivalent to a ".+" regex while `"**"`, `"??"`, `"*?"`, and `"?*"` are forbidden). When a group name is a pattern string, a process can be retrieved by matching the pattern (more information at the [CloudI FAQ](https://cloudi.org/faq.html#4_URLregex)). To not use this approach for group names, refer to the [Usage](#usage) section below.
+* cpg provides its internal state for usage in separate Erlang processes as cached data with the `cpg_data` module. That approach is more efficient than usage of ets.
+* Each cpg scope is an atom used as a locally registered process name for the cpg scope Erlang process. Separate cpg scopes may be used to keep group memberships entirely separate.
+* cpg data lookups are done based on the Erlang process being local or remote, or the relative age of the local membership to the group, or with random selection (using the terminology `closest`, `furthest`, `random`, `local`, `remote`, `oldest`, `newest`). `closest` prefers local processes if they are present while `furthest` prefers remote processes if they are present. The `oldest` process in a group is naturally the most stable process.
+* cpg provides an interface for `via` process registry use (examples are provided in the [tests](https://github.com/okeuday/cpg/blob/master/test/cpg_tests.erl)).
+
+### pg (>= Erlang/OTP 23) (https://github.com/max-au/spg)
+
+* pg uses one monitor per remote node (it takes longer to update a group after an Erlang process dies and may never remove remote group members).
+* pg uses ets while cpg does not (cpg instead provides cached data for more efficient access to the process group data).
+
+### pg2 (=< Erlang/OTP 24)
+
+* pg2 uses global:trans/2 which is unable to handle network or node failures.
+* pg2 uses ets while cpg does not (cpg instead provides cached data for more efficient access to the process group data).
+
+### gproc / syn
+
+* Both are focused on consistency with leader election and are unable to be available when suffering network or node failures. Failures can cause unpredictable conflict resolution, in an attempt to achieve consistency.
 
 ## Design
 
@@ -92,10 +83,10 @@ described above.
 
 ## Usage
 
-If you need non-string (not a list of integers) group names
-(e.g., when replacing gproc), you can change the cpg application
-`group_storage` env setting by providing a module name that provides a
-dict module interface (or just set to `dict`).
+If you need non-string (not a list of integers) group names,
+set the cpg application `group_storage` env value to a module name that
+provides a dict module interface
+(e.g., use `dict` or [`mapsd`](https://github.com/okeuday/mapsd)).
 
 ## Example
 
diff --git a/src/cpg.erl b/src/cpg.erl
index 664526a..0159a6f 100644
--- a/src/cpg.erl
+++ b/src/cpg.erl
@@ -4,8 +4,6 @@
 %%%------------------------------------------------------------------------
 %%% @doc
 %%% ==CloudI Process Groups (CPG)==
-%%% Based on the pg2 module in the Erlang OTP kernel application
-%%% (lib/kernel-x.x.x/src/pg2.erl).
 %%% cpg relies on distributed Erlang for node communication, which means
 %%% a fully connected network topology is created. With Distributed Erlang,
 %%% Erlang pids either exist on the local node or a remote node
@@ -13,28 +11,29 @@
 %%% so only 1 node hop is necessary in the worst case).
 %%% @end
 %%%
-%%% The pg2 module copyright is below:
-%%% %CopyrightBegin%
+%%% Copyright (c) 2011-2020 Michael Truog <mjtruog at protonmail dot com>
 %%%
-%%% Copyright Ericsson AB 1997-2013. All Rights Reserved.
+%%% Permission is hereby granted, free of charge, to any person obtaining a
+%%% copy of this software and associated documentation files (the "Software"),
+%%% to deal in the Software without restriction, including without limitation
+%%% the rights to use, copy, modify, merge, publish, distribute, sublicense,
+%%% and/or sell copies of the Software, and to permit persons to whom the
+%%% Software is furnished to do so, subject to the following conditions:
 %%%
-%%% Licensed under the Apache License, Version 2.0 (the "License");
-%%% you may not use this file except in compliance with the License.
-%%% You may obtain a copy of the License at
+%%% The above copyright notice and this permission notice shall be included in
+%%% all copies or substantial portions of the Software.
 %%%
-%%%     http://www.apache.org/licenses/LICENSE-2.0
-%%%
-%%% Unless required by applicable law or agreed to in writing, software
-%%% distributed under the License is distributed on an "AS IS" BASIS,
-%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%%% See the License for the specific language governing permissions and
-%%% limitations under the License.
-%%%
-%%% %CopyrightEnd%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+%%% DEALINGS IN THE SOFTWARE.
 %%%
 %%% @author Michael Truog <mjtruog at protonmail dot com>
-%%% @copyright 2011-2018 Michael Truog
-%%% @version 1.7.5 {@date} {@time}
+%%% @copyright 2011-2020 Michael Truog
+%%% @version 1.8.1 {@date} {@time}
 %%%------------------------------------------------------------------------
 
 -module(cpg).
@@ -176,8 +175,7 @@
          {name(), pos_integer()} |
          name(). % for OTP behaviors
 -type reason_join() :: join_local |
-                       join_remote |
-                       {exit, any()}.
+                       join_remote.
 -type reason_leave() :: leave_local |
                         leave_remote |
                         {exit, any()}.
@@ -2666,20 +2664,9 @@ remove_leave_callback(Scope, GroupName, F)
 
 init([Scope]) ->
     Listen = cpg_app:listen_type(),
-    monitor_nodes(true, Listen),
-    Nodes = if
-                Listen =:= visible ->
-                    nodes();
-                Listen =:= all ->
-                    nodes(connected)
-            end,
-    lists:foreach(fun(Node) ->
-                      {Scope, Node} ! {new, node()}
-                      % data is not persistent in ets, so trust the
-                      % Groups coming from other nodes if this server
-                      % has restarted and wants previous state
-                  end, Nodes),
-    quickrand:seed(),
+    ok = monitor_nodes(true, Listen),
+    ok = gather_groups(listen_nodes(Listen), Scope),
+    ok = quickrand:seed(),
     {ok, #state{scope = Scope,
                 groups = cpg_data:get_empty_groups(),
                 listen = Listen}}.
@@ -2711,13 +2698,13 @@ handle_call({leave, Pid} = Request, _,
             #state{monitors = Monitors} = State) ->
     case maps:take(Pid, Monitors) of
         {#state_monitor{monitor = MonitorRef,
-                        names = GroupNameList}, NewMonitors} ->
+                        names = GroupNameList}, MonitorsNew} ->
             true = is_reference(MonitorRef),
             true = erlang:demonitor(MonitorRef, [flush]),
             abcast_hidden_nodes(Request, State),
             {reply, ok,
              leave_all_local(GroupNameList, Pid, leave_local,
-                             State#state{monitors = NewMonitors})};
+                             State#state{monitors = MonitorsNew})};
         error ->
             {reply, error, State}
     end;
@@ -2957,12 +2944,12 @@ handle_cast({leave, Pid},
             #state{monitors = Monitors} = State) ->
     case maps:take(Pid, Monitors) of
         {#state_monitor{monitor = MonitorProcess,
-                        names = GroupNameList}, NewMonitors} ->
+                        names = GroupNameList}, MonitorsNew} ->
            true = is_pid(MonitorProcess),
            ok = cpg_node_monitor:remove(MonitorProcess, Pid),
            {noreply,
            leave_all_remote(GroupNameList, Pid, leave_remote,
-                            State#state{monitors = NewMonitors})};
+                            State#state{monitors = MonitorsNew})};
        error ->
            {noreply, State}
    end;
@@ -3006,36 +2993,31 @@ handle_cast({leave_counts, Counts, Pid},
     end;
 
 handle_cast(reset,
-            #state{listen = OldListen} = State) ->
-    Listen = cpg_app:listen_type(),
-    if
-        Listen /= OldListen ->
-            monitor_nodes(true, Listen),
-            monitor_nodes(false, OldListen);
-        true ->
-            ok
-    end,
-    {noreply, State#state{listen = Listen}};
+            #state{scope = Scope,
+                   listen = ListenOld} = State) ->
+    ListenNew = cpg_app:listen_type(),
+    ok = listen_reset(ListenNew, ListenOld, Scope),
+    {noreply, State#state{listen = ListenNew}};
 
 handle_cast({add_join_callback,
              GroupName, F},
             #state{callbacks = Callbacks} = State) ->
-    NewCallbacks = cpg_callbacks:add_join(Callbacks, GroupName, F),
-    {noreply, State#state{callbacks = NewCallbacks}};
+    CallbacksNew = cpg_callbacks:add_join(Callbacks, GroupName, F),
+    {noreply, State#state{callbacks = CallbacksNew}};
 
 handle_cast({add_leave_callback,
              GroupName, F},
             #state{callbacks = Callbacks} = State) ->
-    NewCallbacks = cpg_callbacks:add_leave(Callbacks, GroupName, F),
-    {noreply, State#state{callbacks = NewCallbacks}};
+    CallbacksNew = cpg_callbacks:add_leave(Callbacks, GroupName, F),
+    {noreply, State#state{callbacks = CallbacksNew}};
 
 handle_cast({remove_join_callback,
              GroupName, F},
             #state{callbacks = Callbacks} = State) ->
-    NewCallbacks = cpg_callbacks:remove_join(Callbacks, GroupName, F),
-    {noreply, State#state{callbacks = NewCallbacks}};
+    CallbacksNew = cpg_callbacks:remove_join(Callbacks, GroupName, F),
+    {noreply, State#state{callbacks = CallbacksNew}};
 
 handle_cast({remove_leave_callback,
              GroupName, F},
             #state{callbacks = Callbacks} = State) ->
-    NewCallbacks = cpg_callbacks:remove_leave(Callbacks, GroupName, F),
-    {noreply, State#state{callbacks = NewCallbacks}};
+    CallbacksNew = cpg_callbacks:remove_leave(Callbacks, GroupName, F),
+    {noreply, State#state{callbacks = CallbacksNew}};
 
 handle_cast(Request, State) ->
     {stop, lists:flatten(io_lib:format("Unknown cast \"~w\"",
                                        [Request])),
@@ -3061,11 +3043,11 @@ handle_info({nodedown, Node, InfoList},
         {ok, Process} ->
             NodeDownReason = {_, _} = lists:keyfind(nodedown_reason, 1,
                                                     InfoList),
-            Pids = cpg_node_monitor:died(Process),
-            NewNodeMonitors = maps:remove(Node, NodeMonitors),
+            {Pids, PidReasonsL} = cpg_node_monitor:died(Process),
+            NodeMonitorsNew = maps:remove(Node, NodeMonitors),
             {noreply,
-             node_died(Pids, {exit, NodeDownReason},
-                       State#state{node_monitors = NewNodeMonitors})};
+             node_died(Pids, {exit, NodeDownReason}, PidReasonsL,
+                       State#state{node_monitors = NodeMonitorsNew})};
         error ->
             {noreply, State}
     end;
@@ -3110,6 +3092,16 @@ code_change(_, State, _) ->
 
 monitor_nodes(Flag, Listen) ->
     net_kernel:monitor_nodes(Flag, [{node_type, Listen}, nodedown_reason]).
 
+gather_groups([], _, _) ->
+    ok;
+gather_groups([RemoteNode | RemoteNodes], Node, Scope) ->
+    % request data to merge for current groups state
+    {Scope, RemoteNode} ! {new, Node},
+    gather_groups(RemoteNodes, Node, Scope).
+
+gather_groups(RemoteNodes, Scope) ->
+    gather_groups(RemoteNodes, node(), Scope).
+
 abcast_hidden_nodes(_, #state{listen = visible}) ->
     ok;
 abcast_hidden_nodes(Request, #state{scope = Scope,
@@ -3121,144 +3113,178 @@ abcast_hidden_nodes(Request, #state{scope = Scope,
             gen_server:abcast(HiddenNodes, Scope, Request)
     end.
 
+listen_nodes(visible) ->
+    nodes(visible);
+listen_nodes(all) ->
+    nodes(connected).
+
+listen_reset(Listen, Listen, _) ->
+    ok;
+listen_reset(ListenNew, ListenOld, Scope) ->
+    ok = monitor_nodes(true, ListenNew),
+    HiddenNodes = nodes(hidden),
+    ok = monitor_nodes(false, ListenOld),
+    if
+        ListenNew =:= all ->
+            visible = ListenOld,
+            ok = listen_reset_all(HiddenNodes, Scope);
+        ListenNew =:= visible ->
+            all = ListenOld,
+            ok = listen_reset_visible(HiddenNodes, Scope)
+    end,
+    ok.
+
+listen_reset_all(HiddenNodes, Scope) ->
+    gather_groups(HiddenNodes, Scope).
+
+listen_reset_visible([], _, _) ->
+    ok;
+listen_reset_visible([HiddenNode | HiddenNodes], HiddenNodeInfo, Scope) ->
+    Scope ! {nodedown, HiddenNode, HiddenNodeInfo},
+    listen_reset_visible(HiddenNodes, HiddenNodeInfo, Scope).
+
+listen_reset_visible(HiddenNodes, Scope) ->
+    HiddenNodeInfo = [{nodedown_reason, cpg_reset}, {node_type, hidden}],
+    listen_reset_visible(HiddenNodes, HiddenNodeInfo, Scope).
+
 join_group_local(Count, GroupName, Pid,
-                 #state{groups = {DictI, OldGroupsData},
-                        monitors = OldMonitors,
+                 #state{groups = {DictI, GroupsDataOld},
+                        monitors = MonitorsOld,
                         callbacks = Callbacks} = State) ->
     Reason = join_local,
     cpg_callbacks:notify_join(Callbacks, GroupName, Pid, Reason),
-    PidList = lists:duplicate(Count, Pid),
-    GroupData = case DictI:find(GroupName, OldGroupsData) of
+    GroupData = case DictI:find(GroupName, GroupsDataOld) of
         {ok, #cpg_data{local_count = LocalI,
                        local = Local,
-                       history = History} = OldGroupData} ->
-            OldGroupData#cpg_data{local_count = LocalI + Count,
-                                  local = PidList ++ Local,
-                                  history = PidList ++ History};
+                       history = History} = GroupDataOld} ->
+            GroupDataOld#cpg_data{local_count = LocalI + Count,
+                                  local = prepend(Pid, Count, Local),
+                                  history = prepend(Pid, Count, History)};
         error ->
+            PidList = lists:duplicate(Count, Pid),
             #cpg_data{local_count = Count,
                       local = PidList,
                       history = PidList}
     end,
-    GroupsData = DictI:store(GroupName, GroupData, OldGroupsData),
+    GroupsData = DictI:store(GroupName, GroupData, GroupsDataOld),
     State#state{groups = {DictI, GroupsData},
-                monitors = monitor_local(Pid, GroupName, OldMonitors)}.
+                monitors = monitor_local(Pid, GroupName, MonitorsOld)}.
 
 join_group_remote(Count, GroupName, Pid,
-                  #state{groups = {DictI, OldGroupsData},
-                         monitors = OldMonitors,
-                         node_monitors = OldNodeMonitors,
+                  #state{groups = {DictI, GroupsDataOld},
+                         monitors = MonitorsOld,
+                         node_monitors = NodeMonitorsOld,
                          callbacks = Callbacks} = State) ->
     Reason = join_remote,
     cpg_callbacks:notify_join(Callbacks, GroupName, Pid, Reason),
-    PidList = lists:duplicate(Count, Pid),
-    GroupData = case DictI:find(GroupName, OldGroupsData) of
+    GroupData = case DictI:find(GroupName, GroupsDataOld) of
        {ok, #cpg_data{remote_count = RemoteI,
                       remote = Remote,
-                      history = History} = OldGroupData} ->
-           OldGroupData#cpg_data{remote_count = RemoteI + Count,
-                                 remote = PidList ++ Remote,
-                                 history = PidList ++ History};
+                      history = History} = GroupDataOld} ->
+           GroupDataOld#cpg_data{remote_count = RemoteI + Count,
+                                 remote = prepend(Pid, Count, Remote),
+                                 history = prepend(Pid, Count, History)};
        error ->
+           PidList = lists:duplicate(Count, Pid),
           #cpg_data{remote_count = Count,
                     remote = PidList,
                     history = PidList}
    end,
-    GroupsData = DictI:store(GroupName, GroupData, OldGroupsData),
+    GroupsData = DictI:store(GroupName, GroupData, GroupsDataOld),
     {Monitors,
-     NodeMonitors} = monitor_remote(Pid, GroupName,
-                                    OldMonitors, OldNodeMonitors),
+     NodeMonitors} = monitor_remote(Pid, node(Pid), GroupName,
+                                    MonitorsOld, NodeMonitorsOld),
     State#state{groups = {DictI, GroupsData},
                 monitors = Monitors,
                 node_monitors = NodeMonitors}.
 
 leave_group_local(Count, GroupName, Pid,
-                  #state{groups = {DictI, OldGroupsData},
-                         monitors = OldMonitors,
+                  #state{groups = {DictI, GroupsDataOld},
+                         monitors = MonitorsOld,
                          callbacks = Callbacks} = State) ->
     Reason = leave_local,
-    OldGroupData = DictI:fetch(GroupName, OldGroupsData),
+    GroupDataOld = DictI:fetch(GroupName, GroupsDataOld),
     #cpg_data{local_count = LocalI,
-              local = OldLocal,
-              history = OldHistory} = OldGroupData,
-    {I, Local} = leave_group_pid(Count, OldLocal, Pid),
-    History = leave_group_pid_count(I, OldHistory, Pid),
+              local = LocalOld,
+              history = HistoryOld} = GroupDataOld,
+    {I, Local} = leave_group_pid(Count, LocalOld, Pid),
+    History = leave_group_pid_count(I, HistoryOld, Pid),
     cpg_callbacks:notify_leave(Callbacks, GroupName, Pid, Reason, I),
     {Member,
      GroupsData} = if
        History == [] ->
            {false,
-            DictI:erase(GroupName, OldGroupsData)};
+            DictI:erase(GroupName, GroupsDataOld)};
        true ->
            {lists:member(Pid, Local),
             DictI:store(GroupName,
-                        OldGroupData#cpg_data{local_count = LocalI - I,
+                        GroupDataOld#cpg_data{local_count = LocalI - I,
                                               local = Local,
                                               history = History},
-                        OldGroupsData)}
    end,
    Monitors = if
        Member =:= true ->
-           OldMonitors;
+           MonitorsOld;
        Member =:= false ->
-           case maps:get(Pid, OldMonitors) of
+           case maps:get(Pid, MonitorsOld) of
               #state_monitor{monitor = MonitorRef,
                              names = [GroupName]} ->
                   true = is_reference(MonitorRef),
                   true = erlang:demonitor(MonitorRef, [flush]),
-                  maps:remove(Pid, OldMonitors);
-              #state_monitor{names = OldGroupNameList} = OldStateMonitor ->
-                  GroupNameList = lists:delete(GroupName, OldGroupNameList),
+                  maps:remove(Pid, MonitorsOld);
+              #state_monitor{names = GroupNameListOld} = StateMonitorOld ->
+                  GroupNameList = lists:delete(GroupName, GroupNameListOld),
                   maps:put(Pid,
-                           OldStateMonitor#state_monitor{
+                           StateMonitorOld#state_monitor{
                                names = GroupNameList},
-                           OldMonitors)
+                           MonitorsOld)
           end
    end,
    State#state{groups = {DictI, GroupsData},
                monitors = Monitors}.
 
 leave_group_remote(Count, GroupName, Pid,
-                   #state{groups = {DictI, OldGroupsData},
-                          monitors = OldMonitors,
+                   #state{groups = {DictI, GroupsDataOld},
+                          monitors = MonitorsOld,
                           callbacks = Callbacks} = State) ->
     Reason = leave_remote,
-    OldGroupData = DictI:fetch(GroupName, OldGroupsData),
+    GroupDataOld = DictI:fetch(GroupName, GroupsDataOld),
     #cpg_data{remote_count = RemoteI,
-              remote = OldRemote,
-              history = OldHistory} = OldGroupData,
-    {I, Remote} = leave_group_pid(Count, OldRemote, Pid),
-    History = leave_group_pid_count(I, OldHistory, Pid),
+              remote = RemoteOld,
+              history = HistoryOld} = GroupDataOld,
+    {I, Remote} = leave_group_pid(Count, RemoteOld, Pid),
+    History = leave_group_pid_count(I, HistoryOld, Pid),
     cpg_callbacks:notify_leave(Callbacks, GroupName, Pid, Reason, I),
     {Member,
      GroupsData} = if
        History == [] ->
            {false,
-            DictI:erase(GroupName, OldGroupsData)};
+            DictI:erase(GroupName, GroupsDataOld)};
        true ->
            {lists:member(Pid, Remote),
             DictI:store(GroupName,
-                        OldGroupData#cpg_data{remote_count = RemoteI - I,
+                        GroupDataOld#cpg_data{remote_count = RemoteI - I,
                                               remote = Remote,
                                               history = History},
-                        OldGroupsData)}
+                        GroupsDataOld)}
    end,
    Monitors = if
        Member =:= true ->
-           OldMonitors;
+           MonitorsOld;
        Member =:= false ->
-           case maps:get(Pid, OldMonitors) of
+           case maps:get(Pid, MonitorsOld) of
               #state_monitor{monitor = MonitorProcess,
                              names = [GroupName]} ->
                   true = is_pid(MonitorProcess),
                   ok = cpg_node_monitor:remove(MonitorProcess, Pid),
-                  maps:remove(Pid, OldMonitors);
-              #state_monitor{names = OldGroupNameList} = OldStateMonitor ->
-                  GroupNameList = lists:delete(GroupName, OldGroupNameList),
+                  maps:remove(Pid, MonitorsOld);
+              #state_monitor{names = GroupNameListOld} = StateMonitorOld ->
+                  GroupNameList = lists:delete(GroupName, GroupNameListOld),
                   maps:put(Pid,
-                           OldStateMonitor#state_monitor{
+                           StateMonitorOld#state_monitor{
                                names = GroupNameList},
-                           OldMonitors)
+                           MonitorsOld)
           end
    end,
    State#state{groups = {DictI, GroupsData},
                monitors = Monitors}.
@@ -3289,24 +3315,24 @@ leave_group_pid_count(Count, Pids, Pid) ->
 
 leave_all_local([], _, _, State) ->
     State;
 leave_all_local([GroupName | GroupNameList], Pid, Reason,
-                #state{groups = {DictI, OldGroupsData},
+                #state{groups = {DictI, GroupsDataOld},
                        callbacks = Callbacks} = State) ->
-    OldGroupData = DictI:fetch(GroupName, OldGroupsData),
+    GroupDataOld = DictI:fetch(GroupName, GroupsDataOld),
     #cpg_data{local_count = LocalI,
-              local = OldLocal,
-              history = OldHistory} = OldGroupData,
-    {I, Local} = leave_all_pid(OldLocal, Pid),
+              local = LocalOld,
+              history = HistoryOld} = GroupDataOld,
+    {I, Local} = leave_all_pid(LocalOld, Pid),
     cpg_callbacks:notify_leave(Callbacks, GroupName, Pid, Reason, I),
-    History = [P || P <- OldHistory, P /= Pid],
+    History = [P || P <- HistoryOld, P /= Pid],
     GroupsData = if
        History == [] ->
-           DictI:erase(GroupName, OldGroupsData);
+           DictI:erase(GroupName, GroupsDataOld);
        true ->
           DictI:store(GroupName,
-                      OldGroupData#cpg_data{local_count = LocalI - I,
+                      GroupDataOld#cpg_data{local_count = LocalI - I,
                                             local = Local,
                                             history = History},
-                      OldGroupsData)
+                      GroupsDataOld)
    end,
    leave_all_local(GroupNameList, Pid, Reason,
                    State#state{groups = {DictI, GroupsData}}).
@@ -3314,24 +3340,24 @@ leave_all_local([GroupName | GroupNameList], Pid, Reason,
 
 leave_all_remote([], _, _, State) ->
     State;
 leave_all_remote([GroupName | GroupNameList], Pid, Reason,
-                 #state{groups = {DictI, OldGroupsData},
+                 #state{groups = {DictI, GroupsDataOld},
                         callbacks = Callbacks} = State) ->
-    OldGroupData = DictI:fetch(GroupName, OldGroupsData),
+    GroupDataOld = DictI:fetch(GroupName, GroupsDataOld),
     #cpg_data{remote_count = RemoteI,
-              remote = OldRemote,
-              history = OldHistory} = OldGroupData,
-    {I, Remote} = leave_all_pid(OldRemote, Pid),
+              remote = RemoteOld,
+              history = HistoryOld} = GroupDataOld,
+    {I, Remote} = leave_all_pid(RemoteOld, Pid),
     cpg_callbacks:notify_leave(Callbacks, GroupName, Pid, Reason, I),
-    History = [P || P <- OldHistory, P /= Pid],
+    History = [P || P <- HistoryOld, P /= Pid],
     GroupsData = if
        History == [] ->
-           DictI:erase(GroupName, OldGroupsData);
+           DictI:erase(GroupName, GroupsDataOld);
        true ->
           DictI:store(GroupName,
-                      OldGroupData#cpg_data{remote_count = RemoteI - I,
+                      GroupDataOld#cpg_data{remote_count = RemoteI - I,
                                             remote = Remote,
                                             history = History},
-                      OldGroupsData)
+                      GroupsDataOld)
    end,
    leave_all_remote(GroupNameList, Pid, Reason,
                     State#state{groups = {DictI, GroupsData}}).
@@ -3348,18 +3374,44 @@ leave_all_pid(Pids, Pid) ->
 
 merge_pid_conflict([], GroupData, _, GroupName,
                    DictI, GroupsData, Monitors0, NodeMonitors0,
-                   _, _) ->
+                   _, _, _) ->
     {DictI:store(GroupName, GroupData, GroupsData),
      Monitors0,
      NodeMonitors0};
-merge_pid_conflict([Pid | Pids_X],
-                   #cpg_data{local_count = LocalI,
-                             local = Local,
-                             history = History} = GroupData,
-                   History_X, GroupName,
-                   DictI, GroupsData, Monitors0, NodeMonitors0,
-                   Callbacks, Node)
-    when node(Pid) =:= Node ->
+merge_pid_conflict([Pid | Pids_X], GroupData, History_X, GroupName,
+                   DictI, GroupsData, Monitors, NodeMonitors,
+                   Callbacks, Node, NodesConnected) ->
+    PidNode = node(Pid),
+    if
+        PidNode =:= Node ->
+            merge_pid_conflict_local(Pid, Pids_X,
+                                     GroupData, History_X, GroupName,
+                                     DictI, GroupsData, Monitors, NodeMonitors,
+                                     Callbacks, Node, NodesConnected);
+        true ->
+            case sets:is_element(PidNode, NodesConnected) of
+                true ->
+                    merge_pid_conflict_remote(Pid, PidNode, Pids_X,
+                                              GroupData, History_X, GroupName,
+                                              DictI, GroupsData,
+                                              Monitors, NodeMonitors,
+                                              Callbacks, Node, NodesConnected);
+                false ->
+                    merge_pid_conflict(Pids_X,
+                                       GroupData, History_X, GroupName,
+                                       DictI, GroupsData,
+                                       Monitors, NodeMonitors,
+                                       Callbacks, Node, NodesConnected)
+            end
+    end.
+
+merge_pid_conflict_local(Pid, Pids_X,
+                         #cpg_data{local_count = LocalI,
+                                   local = Local,
+                                   history = History} = GroupData,
+                         History_X, GroupName,
+                         DictI, GroupsData, Monitors, NodeMonitors,
+                         Callbacks, Node, NodesConnected) ->
     % local pid counts must be equal
     I = count(Pid, History_X) - count(Pid, Local),
     if
@@ -3367,43 +3419,44 @@ merge_pid_conflict([Pid | Pids_X],
             % add
             cpg_callbacks:notify_join(Callbacks, GroupName, Pid,
                                       join_local, I),
-            NewLocal = merge_pid_conflict_add(I, Local, Pid),
-            NewHistory = merge_pid_conflict_add(I, History, Pid),
-            MonitorsN = monitor_local(Pid, GroupName, Monitors0),
+            LocalNew = merge_pid_conflict_add(I, Local, Pid),
+            HistoryNew = merge_pid_conflict_add(I, History, Pid),
+            MonitorsNew = monitor_local(Pid, GroupName, Monitors),
             merge_pid_conflict(Pids_X,
                                GroupData#cpg_data{
                                    local_count = LocalI + I,
-                                   local = NewLocal,
-                                   history = NewHistory},
+                                   local = LocalNew,
+                                   history = HistoryNew},
                                History_X, GroupName,
-                               DictI, GroupsData, MonitorsN, NodeMonitors0,
-                               Callbacks, Node);
+                               DictI, GroupsData, MonitorsNew, NodeMonitors,
+                               Callbacks, Node, NodesConnected);
         I < 0 ->
             % remove
             cpg_callbacks:notify_leave(Callbacks, GroupName, Pid,
                                        leave_local, I * -1),
-            NewLocal = merge_pid_conflict_remove(I, Local, Pid),
-            NewHistory = merge_pid_conflict_remove(I, History, Pid),
+            LocalNew = merge_pid_conflict_remove(I, Local, Pid),
+            HistoryNew = merge_pid_conflict_remove(I, History, Pid),
             merge_pid_conflict(Pids_X,
                                GroupData#cpg_data{
                                    local_count = LocalI + I,
-                                   local = NewLocal,
-                                   history = NewHistory},
+                                   local = LocalNew,
+                                   history = HistoryNew},
                                History_X, GroupName,
-                               DictI, GroupsData, Monitors0, NodeMonitors0,
-                               Callbacks, Node);
+                               DictI, GroupsData, Monitors, NodeMonitors,
+                               Callbacks, Node, NodesConnected);
         true ->
             merge_pid_conflict(Pids_X,
                                GroupData, History_X, GroupName,
-                               DictI, GroupsData, Monitors0, NodeMonitors0,
-                               Callbacks, Node)
-    end;
-merge_pid_conflict([Pid | Pids_X],
-                   #cpg_data{remote_count = RemoteI,
-                             remote = Remote,
-                             history = History} = GroupData,
-                   History_X, GroupName,
-                   DictI, GroupsData, Monitors0, NodeMonitors0,
-                   Callbacks, Node) ->
+                               DictI, GroupsData, Monitors, NodeMonitors,
+                               Callbacks, Node, NodesConnected)
+    end.
+
+merge_pid_conflict_remote(Pid, PidNode, Pids_X,
+                          #cpg_data{remote_count = RemoteI,
+                                    remote = Remote,
+                                    history = History} = GroupData,
+                          History_X, GroupName,
+                          DictI, GroupsData, Monitors, NodeMonitors,
+                          Callbacks, Node, NodesConnected) ->
     % remote pid counts must be equal
     I = count(Pid, History_X) - count(Pid, Remote),
     if
@@ -3411,37 +3464,37 @@ merge_pid_conflict([Pid | Pids_X],
             % add
             cpg_callbacks:notify_join(Callbacks, GroupName, Pid,
                                       join_remote, I),
-            NewRemote = merge_pid_conflict_add(I, Remote, Pid),
-            NewHistory = merge_pid_conflict_add(I, History, Pid),
-            {MonitorsN,
-             NodeMonitorsN} = monitor_remote(Pid, GroupName,
-                                             Monitors0, NodeMonitors0),
+            RemoteNew = merge_pid_conflict_add(I, Remote, Pid),
+            HistoryNew = merge_pid_conflict_add(I, History, Pid),
+            {MonitorsNew,
+             NodeMonitorsNew} = monitor_remote(Pid, PidNode, GroupName,
+                                               Monitors, NodeMonitors),
             merge_pid_conflict(Pids_X,
                                GroupData#cpg_data{
                                    remote_count = RemoteI + I,
-                                   remote = NewRemote,
-                                   history = NewHistory},
+                                   remote = RemoteNew,
+                                   history = HistoryNew},
                                History_X, GroupName,
-                               DictI, GroupsData, MonitorsN, NodeMonitorsN,
-                               Callbacks, Node);
+                               DictI, GroupsData, MonitorsNew, NodeMonitorsNew,
+                               Callbacks, Node, NodesConnected);
         I < 0 ->
             % remove
             cpg_callbacks:notify_leave(Callbacks, GroupName, Pid,
                                        leave_remote, I * -1),
-            NewRemote = merge_pid_conflict_remove(I, Remote, Pid),
-            NewHistory = merge_pid_conflict_remove(I, History, Pid),
+            RemoteNew = merge_pid_conflict_remove(I, Remote, Pid),
+            HistoryNew = merge_pid_conflict_remove(I, History, Pid),
             merge_pid_conflict(Pids_X,
                                GroupData#cpg_data{
                                    remote_count = RemoteI + I,
-                                   remote = NewRemote,
-                                   history = NewHistory},
+                                   remote = RemoteNew,
+                                   history = HistoryNew},
                                History_X, GroupName,
-                               DictI, GroupsData, Monitors0, NodeMonitors0,
-                               Callbacks, Node);
+                               DictI, GroupsData, Monitors, NodeMonitors,
+                               Callbacks, Node, NodesConnected);
         true ->
             merge_pid_conflict(Pids_X,
                                GroupData, History_X, GroupName,
-                               DictI, GroupsData, Monitors0, NodeMonitors0,
-                               Callbacks, Node)
+                               DictI, GroupsData, Monitors, NodeMonitors,
+                               Callbacks, Node, NodesConnected)
     end.
 
 merge_pid_conflict_add(0, Pids, _) ->
@@ -3456,92 +3509,135 @@ merge_pid_conflict_remove(I, [Pid | Pids], Pid) ->
 merge_pid_conflict_remove(I, [P | Pids], Pid) ->
     [P | merge_pid_conflict_remove(I, Pids, Pid)].
 
-merge_pid_new([],
-              #cpg_data{local = Local,
-                        remote = Remote} = GroupData, GroupName,
-              DictI, GroupsData, Monitors0, NodeMonitors0, _, _) ->
-    Monitors2 = lists:foldl(fun(PidLocal, Monitors1) ->
-        monitor_local(PidLocal, GroupName, Monitors1)
-    end, Monitors0, lists:usort(Local)),
-    {MonitorsN,
-     NodeMonitorsN} = lists:foldl(fun(PidRemote, {Monitors3, NodeMonitors1}) ->
-        monitor_remote(PidRemote, GroupName, Monitors3, NodeMonitors1)
-    end, {Monitors2, NodeMonitors0}, lists:usort(Remote)),
-    {DictI:store(GroupName, GroupData, GroupsData),
-     MonitorsN,
-     NodeMonitorsN};
-merge_pid_new([Pid | Pids_X],
-              #cpg_data{% history already set from remote data
-                        local_count = LocalI,
-                        local = Local,
-                        remote_count = RemoteI,
-                        remote = Remote} = GroupData, GroupName,
-              DictI, GroupsData, Monitors, NodeMonitors, Callbacks, Node) ->
-    Reason = if
-        node(Pid) =:= Node ->
-            % could get here if cpg was restarted on the local node
-            join_local;
+merge_pid_new([], _,
+              #cpg_data{history = History} = GroupData, GroupName,
+              DictI, GroupsData, Monitors, NodeMonitors, _, _, _) ->
+    GroupsDataNew = if
+        History == [] ->
+            GroupsData;
         true ->
-            join_remote
+            DictI:store(GroupName, GroupData, GroupsData)
     end,
-    cpg_callbacks:notify_join(Callbacks, GroupName, Pid, Reason),
+    {GroupsDataNew, Monitors, NodeMonitors};
+merge_pid_new([Pid | Pids_X], PidsMonitored,
+              #cpg_data{history = History} = GroupData, GroupName,
+              DictI, GroupsData, Monitors, NodeMonitors,
+              Callbacks, Node, NodesConnected) ->
+    PidNode = node(Pid),
     if
-        Reason =:= join_local ->
-            merge_pid_new(Pids_X,
-                          GroupData#cpg_data{
-                              local_count = LocalI + 1,
-                              local = [Pid | Local]}, GroupName,
-                          DictI, GroupsData, Monitors, NodeMonitors,
-                          Callbacks, Node);
-        Reason =:= join_remote ->
-            merge_pid_new(Pids_X,
-                          GroupData#cpg_data{
-                              remote_count = RemoteI + 1,
-                              remote = [Pid | Remote]}, GroupName,
-                          DictI, GroupsData, Monitors, NodeMonitors,
-                          Callbacks, Node)
+        PidNode =:= Node ->
+            merge_pid_new_local(Pid, Pids_X, PidsMonitored,
+                                GroupData, GroupName,
+                                DictI, GroupsData, Monitors, NodeMonitors,
+                                Callbacks, Node, NodesConnected);
+        true ->
+            case sets:is_element(PidNode, NodesConnected) of
+                true ->
+                    merge_pid_new_remote(Pid, PidNode, Pids_X, PidsMonitored,
+                                         GroupData, GroupName,
+                                         DictI, GroupsData,
+                                         Monitors, NodeMonitors,
+                                         Callbacks, Node, NodesConnected);
+                false ->
+                    HistoryNew = delete_all(Pid, History),
+                    merge_pid_new(delete_all(Pid, Pids_X), PidsMonitored,
+                                  GroupData#cpg_data{history = HistoryNew},
+                                  GroupName,
+                                  DictI, GroupsData,
+                                  Monitors, NodeMonitors,
+                                  Callbacks, Node, NodesConnected)
+            end
     end.
-merge_pids([], _, GroupsData, Monitors, NodeMonitors, _, _) -> +merge_pid_new_local(Pid, Pids_X, PidsMonitored, + #cpg_data{% history already set from remote data + local_count = LocalI, + local = Local} = GroupData, GroupName, + DictI, GroupsData, Monitors, NodeMonitors, + Callbacks, Node, NodesConnected) -> + % (could get here if cpg was restarted on the local node) + cpg_callbacks:notify_join(Callbacks, GroupName, Pid, join_local), + {PidsMonitoredNew, + MonitorsNew} = case sets:is_element(Pid, PidsMonitored) of + true -> + {PidsMonitored, Monitors}; + false -> + {sets:add_element(Pid, PidsMonitored), + monitor_local(Pid, GroupName, Monitors)} + end, + merge_pid_new(Pids_X, PidsMonitoredNew, + GroupData#cpg_data{ + local_count = LocalI + 1, + local = [Pid | Local]}, GroupName, + DictI, GroupsData, MonitorsNew, NodeMonitors, + Callbacks, Node, NodesConnected). + +merge_pid_new_remote(Pid, PidNode, Pids_X, PidsMonitored, + #cpg_data{% history already set from remote data + remote_count = RemoteI, + remote = Remote} = GroupData, GroupName, + DictI, GroupsData, Monitors, NodeMonitors, + Callbacks, Node, NodesConnected) -> + cpg_callbacks:notify_join(Callbacks, GroupName, Pid, join_remote), + {PidsMonitoredNew, + {MonitorsNew, + NodeMonitorsNew}} = case sets:is_element(Pid, PidsMonitored) of + true -> + {PidsMonitored, {Monitors, NodeMonitors}}; + false -> + {sets:add_element(Pid, PidsMonitored), + monitor_remote(Pid, PidNode, GroupName, + Monitors, NodeMonitors)} + end, + merge_pid_new(Pids_X, PidsMonitoredNew, + GroupData#cpg_data{ + remote_count = RemoteI + 1, + remote = [Pid | Remote]}, GroupName, + DictI, GroupsData, MonitorsNew, NodeMonitorsNew, + Callbacks, Node, NodesConnected). 
+ +merge_pids([], _, GroupsData, Monitors, NodeMonitors, _, _, _) -> {GroupsData, Monitors, NodeMonitors}; merge_pids([{GroupName, History_X} | HistoryL_X], DictI, GroupsData, Monitors, NodeMonitors, - Callbacks, Node) -> - {NewGroupsData, - NewMonitors, - NewNodeMonitors} = case DictI:find(GroupName, GroupsData) of + Callbacks, Node, NodesConnected) -> + {GroupsDataNew, + MonitorsNew, + NodeMonitorsNew} = case DictI:find(GroupName, GroupsData) of {ok, GroupData} -> % merge the external group in merge_pid_conflict(lists:usort(History_X), GroupData, History_X, GroupName, DictI, GroupsData, Monitors, NodeMonitors, - Callbacks, Node); + Callbacks, Node, NodesConnected); error -> % create the new external group as an internal group - merge_pid_new(lists:reverse(History_X), + merge_pid_new(lists:reverse(History_X), sets:new(), #cpg_data{history = History_X}, GroupName, DictI, GroupsData, Monitors, NodeMonitors, - Callbacks, Node) + Callbacks, Node, NodesConnected) end, merge_pids(HistoryL_X, - DictI, NewGroupsData, NewMonitors, NewNodeMonitors, - Callbacks, Node). + DictI, GroupsDataNew, MonitorsNew, NodeMonitorsNew, + Callbacks, Node, NodesConnected). merge(HistoryL_X, #state{groups = {DictI, GroupsData}, monitors = Monitors, node_monitors = NodeMonitors, - callbacks = Callbacks} = State) -> - {NewGroupsData, - NewMonitors, - NewNodeMonitors} = merge_pids(HistoryL_X, + callbacks = Callbacks, + listen = Listen} = State) -> + {GroupsDataNew, + MonitorsNew, + NodeMonitorsNew} = merge_pids(HistoryL_X, DictI, GroupsData, Monitors, NodeMonitors, - Callbacks, node()), - State#state{groups = {DictI, NewGroupsData}, - monitors = NewMonitors, - node_monitors = NewNodeMonitors}. + Callbacks, node(), + sets:from_list(listen_nodes(Listen))), + State#state{groups = {DictI, GroupsDataNew}, + monitors = MonitorsNew, + node_monitors = NodeMonitorsNew}. 
merge_start(Node, #state{scope = Scope, @@ -3559,9 +3655,9 @@ merge_start(Node, member_died_local(Pid, Reason, #state{monitors = Monitors} = State) -> case maps:take(Pid, Monitors) of - {#state_monitor{names = GroupNameList}, NewMonitors} -> + {#state_monitor{names = GroupNameList}, MonitorsNew} -> leave_all_local(GroupNameList, Pid, Reason, - State#state{monitors = NewMonitors}); + State#state{monitors = MonitorsNew}); error -> State end. @@ -3571,69 +3667,73 @@ members_died_remote([], State) -> members_died_remote([{Pid, Reason} | PidReasons], #state{monitors = Monitors} = State) -> case maps:take(Pid, Monitors) of - {#state_monitor{names = GroupNameList}, NewMonitors} -> - NewState = State#state{monitors = NewMonitors}, + {#state_monitor{names = GroupNameList}, MonitorsNew} -> + StateNew = State#state{monitors = MonitorsNew}, members_died_remote(PidReasons, leave_all_remote(GroupNameList, - Pid, Reason, NewState)); + Pid, Reason, StateNew)); error -> members_died_remote(PidReasons, State) end. -node_died([], _, State) -> - State; -node_died([Pid | Pids], Reason, +node_died([], _, PidReasonsL, State) -> + node_died_related(PidReasonsL, State); +node_died([Pid | Pids], Reason, PidReasonsL, #state{monitors = Monitors} = State) -> case maps:take(Pid, Monitors) of - {#state_monitor{names = GroupNameList}, NewMonitors} -> - NewState = State#state{monitors = NewMonitors}, - node_died(Pids, Reason, + {#state_monitor{names = GroupNameList}, MonitorsNew} -> + StateNew = State#state{monitors = MonitorsNew}, + node_died(Pids, Reason, PidReasonsL, leave_all_remote(GroupNameList, - Pid, Reason, NewState)); + Pid, Reason, StateNew)); error -> - node_died(Pids, Reason, State) + node_died(Pids, Reason, PidReasonsL, State) end. 
-monitor_local(Pid, GroupName, OldMonitors) -> - case maps:find(Pid, OldMonitors) of - {ok, #state_monitor{names = OldGroupNameList} = OldStateMonitor} -> - GroupNameList = lists:umerge(OldGroupNameList, [GroupName]), +node_died_related([], State) -> + State; +node_died_related([PidReasons | PidReasonsL], State) -> + node_died_related(PidReasonsL, members_died_remote(PidReasons, State)). + +monitor_local(Pid, GroupName, MonitorsOld) -> + case maps:find(Pid, MonitorsOld) of + {ok, #state_monitor{names = GroupNameListOld} = StateMonitorOld} -> + GroupNameList = lists:umerge(GroupNameListOld, [GroupName]), maps:put(Pid, - OldStateMonitor#state_monitor{names = GroupNameList}, - OldMonitors); + StateMonitorOld#state_monitor{names = GroupNameList}, + MonitorsOld); error -> MonitorRef = erlang:monitor(process, Pid), maps:put(Pid, #state_monitor{monitor = MonitorRef, names = [GroupName]}, - OldMonitors) + MonitorsOld) end. -monitor_remote(Pid, GroupName, OldMonitors, OldNodeMonitors) -> - case maps:find(Pid, OldMonitors) of - {ok, #state_monitor{names = OldGroupNameList} = OldStateMonitor} -> - GroupNameList = lists:umerge(OldGroupNameList, [GroupName]), +monitor_remote(Pid, PidNode, GroupName, MonitorsOld, NodeMonitorsOld) -> + case maps:find(Pid, MonitorsOld) of + {ok, #state_monitor{names = GroupNameListOld} = StateMonitorOld} -> + GroupNameList = lists:umerge(GroupNameListOld, [GroupName]), {maps:put(Pid, - OldStateMonitor#state_monitor{names = GroupNameList}, - OldMonitors), - OldNodeMonitors}; + StateMonitorOld#state_monitor{names = GroupNameList}, + MonitorsOld), + NodeMonitorsOld}; error -> - Node = node(Pid), {MonitorProcess, - NextNodeMonitors} = case maps:find(Node, OldNodeMonitors) of - {ok, OldMonitorProcess} -> - ok = cpg_node_monitor:add(OldMonitorProcess, Pid), - {OldMonitorProcess, OldNodeMonitors}; + NodeMonitorsNext} = case maps:find(PidNode, NodeMonitorsOld) of + {ok, MonitorProcessOld} -> + ok = cpg_node_monitor:add(MonitorProcessOld, Pid), + 
{MonitorProcessOld, NodeMonitorsOld}; error -> - {ok, NewMonitorProcess} = cpg_node_monitor:start_link(Pid), - {NewMonitorProcess, - maps:put(Node, NewMonitorProcess, OldNodeMonitors)} + {ok, MonitorProcessNew} = cpg_node_monitor:start_link(Pid), + {MonitorProcessNew, + maps:put(PidNode, MonitorProcessNew, NodeMonitorsOld)} end, {maps:put(Pid, #state_monitor{monitor = MonitorProcess, names = [GroupName]}, - OldMonitors), - NextNodeMonitors} + MonitorsOld), + NodeMonitorsNext} end. whereis_name_random(1, [Pid]) -> @@ -3650,6 +3750,18 @@ count([Elem | T], I, Elem) -> count([_ | T], I, Elem) -> count(T, I, Elem). +prepend(_, 0, List) -> + List; +prepend(Elem, Count, List) -> + prepend(Elem, Count - 1, [Elem | List]). + +delete_all(_, []) -> + []; +delete_all(Elem, [Elem | T]) -> + delete_all(Elem, T); +delete_all(Elem, [H | T]) -> + [H | delete_all(Elem, T)]. + -compile({inline, [{random,1}]}). random(N) -> diff --git a/src/cpg_constants.hrl b/src/cpg_constants.hrl index 8f64e64..90b278c 100644 --- a/src/cpg_constants.hrl +++ b/src/cpg_constants.hrl @@ -8,5 +8,8 @@ % how long to wait for remote pid monitor deaths before sending a list of them % (within cpg_node_monitor) --define(MONITORS_ACCUMULATE_DELAY, 0). % milliseconds +-define(MONITORS_SEND_DELAY, 0). % milliseconds + +% how long to wait for flushing accumulated monitor data from cpg_node_monitor +-define(MONITORS_FLUSH_DELAY, 0). % milliseconds diff --git a/src/cpg_data.erl b/src/cpg_data.erl index c3f8461..aa44d57 100644 --- a/src/cpg_data.erl +++ b/src/cpg_data.erl @@ -12,7 +12,7 @@ %%% %%% MIT License %%% -%%% Copyright (c) 2011-2018 Michael Truog <mjtruog at protonmail dot com> +%%% Copyright (c) 2011-2020 Michael Truog <mjtruog at protonmail dot com> %%% %%% Permission is hereby granted, free of charge, to any person obtaining a %%% copy of this software and associated documentation files (the "Software"), @@ -33,8 +33,8 @@ %%% DEALINGS IN THE SOFTWARE. 
%%% %%% @author Michael Truog <mjtruog at protonmail dot com> -%%% @copyright 2011-2018 Michael Truog -%%% @version 1.7.5 {@date} {@time} +%%% @copyright 2011-2020 Michael Truog +%%% @version 1.8.1 {@date} {@time} %%%------------------------------------------------------------------------ -module(cpg_data). @@ -493,10 +493,9 @@ get_random_pid(GroupName, Groups) -> {ok, _, #cpg_data{history = []}} -> {error, {'no_process', GroupName}}; {ok, Pattern, #cpg_data{local_count = LocalCount, - local = Local, remote_count = RemoteCount, - remote = Remote}} -> - pick(LocalCount + RemoteCount, Local ++ Remote, Pattern) + history = History}} -> + pick(LocalCount + RemoteCount, History, Pattern) end. %%------------------------------------------------------------------------- @@ -520,10 +519,9 @@ get_random_pid(GroupName, Exclude, Groups) {ok, _, #cpg_data{history = []}} -> {error, {'no_process', GroupName}}; {ok, Pattern, #cpg_data{local_count = LocalCount, - local = Local, remote_count = RemoteCount, - remote = Remote}} -> - pick(LocalCount + RemoteCount, Local ++ Remote, + history = History}} -> + pick(LocalCount + RemoteCount, History, Exclude, GroupName, Pattern) end. diff --git a/src/cpg_node_monitor.erl b/src/cpg_node_monitor.erl index 67e8429..6570273 100644 --- a/src/cpg_node_monitor.erl +++ b/src/cpg_node_monitor.erl @@ -10,7 +10,7 @@ %%% %%% MIT License %%% -%%% Copyright (c) 2017-2018 Michael Truog <mjtruog at protonmail dot com> +%%% Copyright (c) 2017-2020 Michael Truog <mjtruog at protonmail dot com> %%% %%% Permission is hereby granted, free of charge, to any person obtaining a %%% copy of this software and associated documentation files (the "Software"), @@ -31,8 +31,8 @@ %%% DEALINGS IN THE SOFTWARE. 
%%% %%% @author Michael Truog <mjtruog at protonmail dot com> -%%% @copyright 2017-2018 Michael Truog -%%% @version 1.7.4 {@date} {@time} +%%% @copyright 2017-2020 Michael Truog +%%% @version 1.8.1 {@date} {@time} %%%------------------------------------------------------------------------ -module(cpg_node_monitor). @@ -80,10 +80,11 @@ stop_link(Process) -> ok. -spec died(Process :: process()) -> - list(pid()). + {list(pid()), list(nonempty_list({pid(), {exit, any()}}))}. died(Process) -> - monitors_flush(gen_server:call(Process, died)). + Pids = gen_server:call(Process, died), + {Pids, monitors_flush([])}. -spec add(Process :: process(), Pid :: pid()) -> @@ -160,28 +161,26 @@ code_change(_, State, _) -> %%% Private functions %%%------------------------------------------------------------------------ -monitors_send({'DOWN', _MonitorRef, process, Pid, Info}, DOWNS, +monitors_send({'DOWN', _MonitorRef, process, Pid, Info}, PidReasons, #state{parent = Parent, monitors = Monitors} = State) -> - NewDOWNS = [{Pid, {exit, Info}} | DOWNS], - NewState = State#state{monitors = maps:remove(Pid, Monitors)}, + PidReasonsNew = [{Pid, {exit, Info}} | PidReasons], + StateNew = State#state{monitors = maps:remove(Pid, Monitors)}, receive {'DOWN', _, process, _, _} = DOWN -> - monitors_send(DOWN, NewDOWNS, NewState) + monitors_send(DOWN, PidReasonsNew, StateNew) after - ?MONITORS_ACCUMULATE_DELAY -> - Parent ! {'DOWNS', NewDOWNS}, - NewState + ?MONITORS_SEND_DELAY -> + Parent ! {'DOWNS', PidReasonsNew}, + StateNew end. -monitors_flush(Pids) -> +monitors_flush(PidReasonsL) -> receive {'DOWNS', PidReasons} -> - monitors_flush(lists:foldl(fun({Pid, _}, NewPids) -> - [Pid | NewPids] - end, Pids, PidReasons)) + monitors_flush([PidReasons | PidReasonsL]) after - 0 -> - Pids + ?MONITORS_FLUSH_DELAY -> + lists:reverse(PidReasonsL) end. 
diff --git a/src/supervisor_cpg_spawn.erl b/src/supervisor_cpg_spawn.erl index 66d621c..28fc9be 100644 --- a/src/supervisor_cpg_spawn.erl +++ b/src/supervisor_cpg_spawn.erl @@ -9,7 +9,7 @@ %%% %%% MIT License %%% -%%% Copyright (c) 2013-2018 Michael Truog <mjtruog at protonmail dot com> +%%% Copyright (c) 2013-2020 Michael Truog <mjtruog at protonmail dot com> %%% %%% Permission is hereby granted, free of charge, to any person obtaining a %%% copy of this software and associated documentation files (the "Software"), @@ -30,8 +30,8 @@ %%% DEALINGS IN THE SOFTWARE. %%% %%% @author Michael Truog <mjtruog at protonmail dot com> -%%% @copyright 2013-2018 Michael Truog -%%% @version 1.7.4 {@date} {@time} +%%% @copyright 2013-2020 Michael Truog +%%% @version 1.8.1 {@date} {@time} %%%------------------------------------------------------------------------ -module(supervisor_cpg_spawn). @@ -205,11 +205,11 @@ handle_cast({restart_nomad_child, NomadChildSpec = {Id, StartFunc, temporary, Shutdown, Type, Modules}, case supervisor:start_child(ChildSup, NomadChildSpec) of {ok, NomadChild} -> - NewNomadState = NomadState#state_nomad{pid = NomadChild}, + NomadStateNew = NomadState#state_nomad{pid = NomadChild}, MonitorRef = erlang:monitor(process, NomadChild), {noreply, State#state{nomads = dict:store(MonitorRef, - NewNomadState, + NomadStateNew, Nomads)}}; {error, Reason} -> report_error(start_error, Reason, @@ -333,19 +333,19 @@ nomad_restart(_Reason, max_r = MaxR, max_t = MaxT} = NomadState) -> Now = timestamp(), - NewRestarts = lists:dropwhile(fun(T) -> + RestartsNew = lists:dropwhile(fun(T) -> timer:now_diff(Now, T) / 1000000 > MaxT end, Restarts) ++ [Now], - NextSupPid = cpg_random_pid(Name, self()), + SupPidNext = cpg_random_pid(Name, self()), if - erlang:length(NewRestarts) > MaxR -> + erlang:length(RestartsNew) > MaxR -> report_error(shutdown, reached_max_restart_intensity, NomadState); - NextSupPid =:= undefined -> + SupPidNext =:= undefined -> report_error(restart_error, 
noproc, NomadState); true -> - gen_server:cast(NextSupPid, + gen_server:cast(SupPidNext, {restart_nomad_child, - NomadState#state_nomad{restarts = NewRestarts}}) + NomadState#state_nomad{restarts = RestartsNew}}) end. % based on OTP supervisor code