corosync  2.3.5
exec/cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2006-2015 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <unistd.h>
55 #include <netinet/in.h>
56 #include <arpa/inet.h>
57 #include <sys/mman.h>
58 #include <qb/qbmap.h>
59 
60 #include <corosync/corotypes.h>
61 #include <qb/qbipc_common.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/list.h>
64 #include <corosync/logsys.h>
65 #include <corosync/coroapi.h>
66 
67 #include <corosync/cpg.h>
68 #include <corosync/ipc_cpg.h>
69 
70 #ifndef MAP_ANONYMOUS
71 #define MAP_ANONYMOUS MAP_ANON
72 #endif
73 
74 #include "service.h"
75 
76 LOGSYS_DECLARE_SUBSYS ("CPG");
77 
78 #define GROUP_HASH_SIZE 32
79 
88 };
89 
90 struct zcb_mapped {
91  struct list_head list;
92  void *addr;
93  size_t size;
94 };
95 /*
96  * state exec deliver
97  * match group name, pid -> if matched deliver for YES:
98  * XXX indicates impossible state
99  *
100  * join leave mcast
101  * UNJOINED XXX XXX NO
102  * LEAVE_STARTED XXX YES(unjoined_enter) YES
103  * JOIN_STARTED YES(join_started_enter) XXX NO
104  * JOIN_COMPLETED XXX NO YES
105  *
106  * join_started_enter
107  * set JOIN_COMPLETED
108  * add entry to process_info list
109  * unjoined_enter
110  * set UNJOINED
111  * delete entry from process_info list
112  *
113  *
114  * library accept join error codes
115  * UNJOINED YES(CS_OK) set JOIN_STARTED
116  * LEAVE_STARTED NO(CS_ERR_BUSY)
117  * JOIN_STARTED NO(CS_ERR_EXIST)
118  * JOIN_COMPLETED NO(CS_ERR_EXIST)
119  *
120  * library accept leave error codes
121  * UNJOINED NO(CS_ERR_NOT_EXIST)
122  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
123  * JOIN_STARTED NO(CS_ERR_BUSY)
124  * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
125  *
126  * library accept mcast
127  * UNJOINED NO(CS_ERR_NOT_EXIST)
128  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
129  * JOIN_STARTED YES(CS_OK)
130  * JOIN_COMPLETED YES(CS_OK)
131  */
132 enum cpd_state {
137 };
138 
142 };
143 
148 };
149 static enum cpg_downlist_state_e downlist_state;
150 static struct list_head downlist_messages_head;
151 static struct list_head joinlist_messages_head;
152 
153 struct cpg_pd {
154  void *conn;
156  uint32_t pid;
158  unsigned int flags;
160  uint64_t transition_counter; /* These two are used when sending fragmented messages */
162  struct list_head list;
163  struct list_head iteration_instance_list_head;
164  struct list_head zcb_mapped_list_head;
165 };
166 
169  struct list_head list;
170  struct list_head items_list_head; /* List of process_info */
172 };
173 
174 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
175 
176 DECLARE_LIST_INIT(cpg_pd_list_head);
177 
178 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
179 
180 static unsigned int my_member_list_entries;
181 
182 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
183 
184 static unsigned int my_old_member_list_entries = 0;
185 
186 static struct corosync_api_v1 *api = NULL;
187 
188 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
189 
190 static mar_cpg_ring_id_t last_sync_ring_id;
191 
192 struct process_info {
193  unsigned int nodeid;
194  uint32_t pid;
196  struct list_head list; /* on the group_info members list */
197 };
/* Known group members (struct process_info, via .list), local and remote. */
DECLARE_LIST_INIT(process_info_list_head);
199 
201  uint32_t pid;
203 };
204 
205 /*
206  * Service Interfaces required by service_message_handler struct
207  */
208 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
209 
210 static int cpg_lib_init_fn (void *conn);
211 
212 static int cpg_lib_exit_fn (void *conn);
213 
214 static void message_handler_req_exec_cpg_procjoin (
215  const void *message,
216  unsigned int nodeid);
217 
218 static void message_handler_req_exec_cpg_procleave (
219  const void *message,
220  unsigned int nodeid);
221 
222 static void message_handler_req_exec_cpg_joinlist (
223  const void *message,
224  unsigned int nodeid);
225 
226 static void message_handler_req_exec_cpg_mcast (
227  const void *message,
228  unsigned int nodeid);
229 
230 static void message_handler_req_exec_cpg_partial_mcast (
231  const void *message,
232  unsigned int nodeid);
233 
234 static void message_handler_req_exec_cpg_downlist_old (
235  const void *message,
236  unsigned int nodeid);
237 
238 static void message_handler_req_exec_cpg_downlist (
239  const void *message,
240  unsigned int nodeid);
241 
242 static void exec_cpg_procjoin_endian_convert (void *msg);
243 
244 static void exec_cpg_joinlist_endian_convert (void *msg);
245 
246 static void exec_cpg_mcast_endian_convert (void *msg);
247 
248 static void exec_cpg_partial_mcast_endian_convert (void *msg);
249 
250 static void exec_cpg_downlist_endian_convert_old (void *msg);
251 
252 static void exec_cpg_downlist_endian_convert (void *msg);
253 
254 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
255 
256 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
257 
258 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
259 
260 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
261 
262 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
263 
264 static void message_handler_req_lib_cpg_membership (void *conn,
265  const void *message);
266 
267 static void message_handler_req_lib_cpg_local_get (void *conn,
268  const void *message);
269 
270 static void message_handler_req_lib_cpg_iteration_initialize (
271  void *conn,
272  const void *message);
273 
274 static void message_handler_req_lib_cpg_iteration_next (
275  void *conn,
276  const void *message);
277 
278 static void message_handler_req_lib_cpg_iteration_finalize (
279  void *conn,
280  const void *message);
281 
282 static void message_handler_req_lib_cpg_zc_alloc (
283  void *conn,
284  const void *message);
285 
286 static void message_handler_req_lib_cpg_zc_free (
287  void *conn,
288  const void *message);
289 
290 static void message_handler_req_lib_cpg_zc_execute (
291  void *conn,
292  const void *message);
293 
294 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
295 
296 static int cpg_exec_send_downlist(void);
297 
298 static int cpg_exec_send_joinlist(void);
299 
300 static void downlist_messages_delete (void);
301 
302 static void downlist_master_choose_and_send (void);
303 
304 static void joinlist_inform_clients (void);
305 
306 static void joinlist_messages_delete (void);
307 
308 static void cpg_sync_init (
309  const unsigned int *trans_list,
310  size_t trans_list_entries,
311  const unsigned int *member_list,
312  size_t member_list_entries,
313  const struct memb_ring_id *ring_id);
314 
315 static int cpg_sync_process (void);
316 
317 static void cpg_sync_activate (void);
318 
319 static void cpg_sync_abort (void);
320 
321 static void do_proc_join(
322  const mar_cpg_name_t *name,
323  uint32_t pid,
324  unsigned int nodeid,
325  int reason);
326 
327 static void do_proc_leave(
328  const mar_cpg_name_t *name,
329  uint32_t pid,
330  unsigned int nodeid,
331  int reason);
332 
333 static int notify_lib_totem_membership (
334  void *conn,
335  int member_list_entries,
336  const unsigned int *member_list);
337 
338 static inline int zcb_all_free (
339  struct cpg_pd *cpd);
340 
341 static char *cpg_print_group_name (
342  const mar_cpg_name_t *group);
343 
344 /*
345  * Library Handler Definition
346  */
347 static struct corosync_lib_handler cpg_lib_engine[] =
348 {
349  { /* 0 - MESSAGE_REQ_CPG_JOIN */
350  .lib_handler_fn = message_handler_req_lib_cpg_join,
351  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
352  },
353  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
354  .lib_handler_fn = message_handler_req_lib_cpg_leave,
355  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
356  },
357  { /* 2 - MESSAGE_REQ_CPG_MCAST */
358  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
359  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
360  },
361  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
362  .lib_handler_fn = message_handler_req_lib_cpg_membership,
363  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
364  },
365  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
366  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
367  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
368  },
369  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
370  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
371  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
372  },
373  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
374  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
375  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
376  },
377  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
378  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
379  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
380  },
381  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
382  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
383  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
384  },
385  { /* 9 */
386  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
387  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
388  },
389  { /* 10 */
390  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
391  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
392  },
393  { /* 11 */
394  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
395  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
396  },
397  { /* 12 */
398  .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
399  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
400  },
401 
402 };
403 
404 static struct corosync_exec_handler cpg_exec_engine[] =
405 {
406  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
407  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
408  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
409  },
410  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
411  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
412  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
413  },
414  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
415  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
416  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
417  },
418  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
419  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
420  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
421  },
422  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
423  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
424  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
425  },
426  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
427  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
428  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
429  },
430  { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
431  .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
432  .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
433  },
434 };
435 
437  .name = "corosync cluster closed process group service v1.01",
438  .id = CPG_SERVICE,
439  .priority = 1,
440  .private_data_size = sizeof (struct cpg_pd),
441  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
442  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
443  .lib_init_fn = cpg_lib_init_fn,
444  .lib_exit_fn = cpg_lib_exit_fn,
445  .lib_engine = cpg_lib_engine,
446  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
447  .exec_init_fn = cpg_exec_init_fn,
448  .exec_dump_fn = NULL,
449  .exec_engine = cpg_exec_engine,
450  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
451  .sync_init = cpg_sync_init,
452  .sync_process = cpg_sync_process,
453  .sync_activate = cpg_sync_activate,
454  .sync_abort = cpg_sync_abort
455 };
456 
458 {
459  return (&cpg_service_engine);
460 }
461 
463  struct qb_ipc_request_header header __attribute__((aligned(8)));
464  mar_cpg_name_t group_name __attribute__((aligned(8)));
465  mar_uint32_t pid __attribute__((aligned(8)));
466  mar_uint32_t reason __attribute__((aligned(8)));
467 };
468 
470  struct qb_ipc_request_header header __attribute__((aligned(8)));
471  mar_cpg_name_t group_name __attribute__((aligned(8)));
472  mar_uint32_t msglen __attribute__((aligned(8)));
473  mar_uint32_t pid __attribute__((aligned(8)));
474  mar_message_source_t source __attribute__((aligned(8)));
475  mar_uint8_t message[] __attribute__((aligned(8)));
476 };
477 
479  struct qb_ipc_request_header header __attribute__((aligned(8)));
480  mar_cpg_name_t group_name __attribute__((aligned(8)));
481  mar_uint32_t msglen __attribute__((aligned(8)));
482  mar_uint32_t fraglen __attribute__((aligned(8)));
483  mar_uint32_t pid __attribute__((aligned(8)));
484  mar_uint32_t type __attribute__((aligned(8)));
485  mar_message_source_t source __attribute__((aligned(8)));
486  mar_uint8_t message[] __attribute__((aligned(8)));
487 };
488 
490  struct qb_ipc_request_header header __attribute__((aligned(8)));
491  mar_uint32_t left_nodes __attribute__((aligned(8)));
492  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
493 };
494 
496  struct qb_ipc_request_header header __attribute__((aligned(8)));
497  /* merge decisions */
498  mar_uint32_t old_members __attribute__((aligned(8)));
499  /* downlist below */
500  mar_uint32_t left_nodes __attribute__((aligned(8)));
501  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
502 };
503 
504 struct downlist_msg {
506  mar_uint32_t old_members __attribute__((aligned(8)));
507  mar_uint32_t left_nodes __attribute__((aligned(8)));
508  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
509  struct list_head list;
510 };
511 
512 struct joinlist_msg {
514  uint32_t pid;
516  struct list_head list;
517 };
518 
519 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
520 
521 /*
522  * Function print group name. It's not reentrant
523  */
524 static char *cpg_print_group_name(const mar_cpg_name_t *group)
525 {
526  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
527  int dest_pos = 0;
528  char c;
529  int i;
530 
531  for (i = 0; i < group->length; i++) {
532  c = group->value[i];
533 
534  if (c >= ' ' && c < 0x7f && c != '\\') {
535  res[dest_pos++] = c;
536  } else {
537  if (c == '\\') {
538  res[dest_pos++] = '\\';
539  res[dest_pos++] = '\\';
540  } else {
541  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
542  dest_pos += 4;
543  }
544  }
545  }
546  res[dest_pos] = 0;
547 
548  return (res);
549 }
550 
551 static void cpg_sync_init (
552  const unsigned int *trans_list,
553  size_t trans_list_entries,
554  const unsigned int *member_list,
555  size_t member_list_entries,
556  const struct memb_ring_id *ring_id)
557 {
558  int entries;
559  int i, j;
560  int found;
561 
562  my_sync_state = CPGSYNC_DOWNLIST;
563 
564  memcpy (my_member_list, member_list, member_list_entries *
565  sizeof (unsigned int));
566  my_member_list_entries = member_list_entries;
567 
568  last_sync_ring_id.nodeid = ring_id->rep.nodeid;
569  last_sync_ring_id.seq = ring_id->seq;
570 
571  downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
572 
573  entries = 0;
574  /*
575  * Determine list of nodeids for downlist message
576  */
577  for (i = 0; i < my_old_member_list_entries; i++) {
578  found = 0;
579  for (j = 0; j < trans_list_entries; j++) {
580  if (my_old_member_list[i] == trans_list[j]) {
581  found = 1;
582  break;
583  }
584  }
585  if (found == 0) {
586  g_req_exec_cpg_downlist.nodeids[entries++] =
587  my_old_member_list[i];
588  }
589  }
590  g_req_exec_cpg_downlist.left_nodes = entries;
591 }
592 
593 static int cpg_sync_process (void)
594 {
595  int res = -1;
596 
597  if (my_sync_state == CPGSYNC_DOWNLIST) {
598  res = cpg_exec_send_downlist();
599  if (res == -1) {
600  return (-1);
601  }
602  my_sync_state = CPGSYNC_JOINLIST;
603  }
604  if (my_sync_state == CPGSYNC_JOINLIST) {
605  res = cpg_exec_send_joinlist();
606  }
607  return (res);
608 }
609 
610 static void cpg_sync_activate (void)
611 {
612  memcpy (my_old_member_list, my_member_list,
613  my_member_list_entries * sizeof (unsigned int));
614  my_old_member_list_entries = my_member_list_entries;
615 
616  if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
617  downlist_master_choose_and_send ();
618  }
619 
620  joinlist_inform_clients ();
621 
622  downlist_messages_delete ();
623  downlist_state = CPG_DOWNLIST_NONE;
624  joinlist_messages_delete ();
625 
626  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
627 }
628 
629 static void cpg_sync_abort (void)
630 {
631  downlist_state = CPG_DOWNLIST_NONE;
632  downlist_messages_delete ();
633  joinlist_messages_delete ();
634 }
635 
636 static int notify_lib_totem_membership (
637  void *conn,
638  int member_list_entries,
639  const unsigned int *member_list)
640 {
641  struct list_head *iter;
642  char *buf;
643  int size;
645 
646  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
647  sizeof(mar_uint32_t) * (member_list_entries);
648  buf = alloca(size);
649  if (!buf)
650  return CS_ERR_LIBRARY;
651 
652  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
653  res->member_list_entries = member_list_entries;
654  res->header.size = size;
656  res->header.error = CS_OK;
657 
658  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
659  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
660 
661  if (conn == NULL) {
662  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
663  struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
664  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
665  }
666  } else {
667  api->ipc_dispatch_send (conn, buf, size);
668  }
669 
670  return CS_OK;
671 }
672 
673 static int notify_lib_joinlist(
674  const mar_cpg_name_t *group_name,
675  void *conn,
676  int joined_list_entries,
677  mar_cpg_address_t *joined_list,
678  int left_list_entries,
679  mar_cpg_address_t *left_list,
680  int id)
681 {
682  int size;
683  char *buf;
684  struct list_head *iter;
685  int count;
686  struct res_lib_cpg_confchg_callback *res;
687  mar_cpg_address_t *retgi;
688 
689  count = 0;
690 
691  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
692  struct process_info *pi = list_entry (iter, struct process_info, list);
693  if (mar_name_compare (&pi->group, group_name) == 0) {
694  int i;
695  int founded = 0;
696 
697  for (i = 0; i < left_list_entries; i++) {
698  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
699  founded++;
700  }
701  }
702 
703  if (!founded)
704  count++;
705  }
706  }
707 
708  size = sizeof(struct res_lib_cpg_confchg_callback) +
709  sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
710  buf = alloca(size);
711  if (!buf)
712  return CS_ERR_LIBRARY;
713 
714  res = (struct res_lib_cpg_confchg_callback *)buf;
715  res->joined_list_entries = joined_list_entries;
716  res->left_list_entries = left_list_entries;
717  res->member_list_entries = count;
718  retgi = res->member_list;
719  res->header.size = size;
720  res->header.id = id;
721  res->header.error = CS_OK;
722  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
723 
724  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
725  struct process_info *pi=list_entry (iter, struct process_info, list);
726 
727  if (mar_name_compare (&pi->group, group_name) == 0) {
728  int i;
729  int founded = 0;
730 
731  for (i = 0;i < left_list_entries; i++) {
732  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
733  founded++;
734  }
735  }
736 
737  if (!founded) {
738  retgi->nodeid = pi->nodeid;
739  retgi->pid = pi->pid;
740  retgi++;
741  }
742  }
743  }
744 
745  if (left_list_entries) {
746  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
747  retgi += left_list_entries;
748  }
749 
750  if (joined_list_entries) {
751  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
752  retgi += joined_list_entries;
753  }
754 
755  if (conn) {
756  api->ipc_dispatch_send (conn, buf, size);
757  } else {
758  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
759  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
760  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
761  assert (joined_list_entries <= 1);
762  if (joined_list_entries) {
763  if (joined_list[0].pid == cpd->pid &&
764  joined_list[0].nodeid == api->totem_nodeid_get()) {
766  }
767  }
768  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
770 
771  api->ipc_dispatch_send (cpd->conn, buf, size);
772  cpd->transition_counter++;
773  }
774  if (left_list_entries) {
775  if (left_list[0].pid == cpd->pid &&
776  left_list[0].nodeid == api->totem_nodeid_get() &&
777  left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {
778 
779  cpd->pid = 0;
780  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
782  }
783  }
784  }
785  }
786  }
787 
788 
789  /*
790  * Traverse thru cpds and send totem membership for cpd, where it is not send yet
791  */
792  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
793  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
794 
796  cpd->initial_totem_conf_sent = 1;
797 
798  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
799  }
800  }
801 
802  return CS_OK;
803 }
804 
805 static void downlist_log(const char *msg, struct downlist_msg* dl)
806 {
807  log_printf (LOG_DEBUG,
808  "%s: sender %s; members(old:%d left:%d)",
809  msg,
811  dl->old_members,
812  dl->left_nodes);
813 }
814 
815 static struct downlist_msg* downlist_master_choose (void)
816 {
817  struct downlist_msg *cmp;
818  struct downlist_msg *best = NULL;
819  struct list_head *iter;
820  uint32_t cmp_members;
821  uint32_t best_members;
822  uint32_t i;
823  int ignore_msg;
824 
825  for (iter = downlist_messages_head.next;
826  iter != &downlist_messages_head;
827  iter = iter->next) {
828 
829  cmp = list_entry(iter, struct downlist_msg, list);
830  downlist_log("comparing", cmp);
831 
832  ignore_msg = 0;
833  for (i = 0; i < cmp->left_nodes; i++) {
834  if (cmp->nodeids[i] == api->totem_nodeid_get()) {
835  log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n");
836 
837  ignore_msg = 1;
838  break;
839  }
840  }
841 
842  if (ignore_msg) {
843  continue ;
844  }
845 
846  if (best == NULL) {
847  best = cmp;
848  continue;
849  }
850 
851  best_members = best->old_members - best->left_nodes;
852  cmp_members = cmp->old_members - cmp->left_nodes;
853 
854  if (cmp_members > best_members) {
855  best = cmp;
856  } else if (cmp_members == best_members) {
857  if (cmp->old_members > best->old_members) {
858  best = cmp;
859  } else if (cmp->old_members == best->old_members) {
860  if (cmp->sender_nodeid < best->sender_nodeid) {
861  best = cmp;
862  }
863  }
864  }
865  }
866 
867  assert (best != NULL);
868 
869  return best;
870 }
871 
872 static void downlist_master_choose_and_send (void)
873 {
874  struct downlist_msg *stored_msg;
875  struct list_head *iter;
876  struct process_info *left_pi;
877  qb_map_t *group_map;
878  struct cpg_name cpg_group;
879  mar_cpg_name_t group;
880  struct confchg_data{
881  struct cpg_name cpg_group;
883  int left_list_entries;
884  struct list_head list;
885  } *pcd;
886  qb_map_iter_t *miter;
887  int i, size;
888 
889  downlist_state = CPG_DOWNLIST_APPLYING;
890 
891  stored_msg = downlist_master_choose ();
892  if (!stored_msg) {
893  log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
894  return;
895  }
896  downlist_log("chosen downlist", stored_msg);
897 
898  group_map = qb_skiplist_create();
899 
900  /*
901  * only the cpg groups included in left nodes should receive
902  * confchg event, so we will collect these cpg groups and
903  * relative left_lists here.
904  */
905  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
906  struct process_info *pi = list_entry(iter, struct process_info, list);
907  iter = iter->next;
908 
909  left_pi = NULL;
910  for (i = 0; i < stored_msg->left_nodes; i++) {
911 
912  if (pi->nodeid == stored_msg->nodeids[i]) {
913  left_pi = pi;
914  break;
915  }
916  }
917 
918  if (left_pi) {
919  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
920  cpg_group.value[cpg_group.length] = 0;
921 
922  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
923  if (pcd == NULL) {
924  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
925  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
926  qb_map_put(group_map, pcd->cpg_group.value, pcd);
927  }
928  size = pcd->left_list_entries;
929  pcd->left_list[size].nodeid = left_pi->nodeid;
930  pcd->left_list[size].pid = left_pi->pid;
931  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
932  pcd->left_list_entries++;
933  list_del (&left_pi->list);
934  free (left_pi);
935  }
936  }
937 
938  /* send only one confchg event per cpg group */
939  miter = qb_map_iter_create(group_map);
940  while (qb_map_iter_next(miter, (void **)&pcd)) {
941  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
942 
943  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
944  for (i=0; i<pcd->left_list_entries; i++) {
945  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
946  i, cpg_print_group_name(&group),
947  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
948  pcd->left_list[i].pid);
949  }
950 
951  /* send confchg event */
952  notify_lib_joinlist(&group, NULL,
953  0, NULL,
954  pcd->left_list_entries,
955  pcd->left_list,
957 
958  free(pcd);
959  }
960  qb_map_iter_free(miter);
961  qb_map_destroy(group_map);
962 }
963 
964 /*
965  * Remove processes that might have left the group while we were suspended.
966  */
967 static void joinlist_remove_zombie_pi_entries (void)
968 {
969  struct list_head *pi_iter;
970  struct list_head *jl_iter;
971  struct process_info *pi;
972  struct joinlist_msg *stored_msg;
973  int found;
974 
975  for (pi_iter = process_info_list_head.next; pi_iter != &process_info_list_head; ) {
976  pi = list_entry (pi_iter, struct process_info, list);
977  pi_iter = pi_iter->next;
978 
979  /*
980  * Ignore local node
981  */
982  if (pi->nodeid == api->totem_nodeid_get()) {
983  continue ;
984  }
985 
986  /*
987  * Try to find message in joinlist messages
988  */
989  found = 0;
990  for (jl_iter = joinlist_messages_head.next;
991  jl_iter != &joinlist_messages_head;
992  jl_iter = jl_iter->next) {
993 
994  stored_msg = list_entry(jl_iter, struct joinlist_msg, list);
995 
996  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
997  continue ;
998  }
999 
1000  if (pi->nodeid == stored_msg->sender_nodeid &&
1001  pi->pid == stored_msg->pid &&
1002  mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
1003  found = 1;
1004  break ;
1005  }
1006  }
1007 
1008  if (!found) {
1009  do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
1010  }
1011  }
1012 }
1013 
1014 static void joinlist_inform_clients (void)
1015 {
1016  struct joinlist_msg *stored_msg;
1017  struct list_head *iter;
1018  unsigned int i;
1019 
1020  i = 0;
1021  for (iter = joinlist_messages_head.next;
1022  iter != &joinlist_messages_head;
1023  iter = iter->next) {
1024 
1025  stored_msg = list_entry(iter, struct joinlist_msg, list);
1026 
1027  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
1028  i++, cpg_print_group_name(&stored_msg->group_name),
1029  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
1030  stored_msg->pid);
1031 
1032  /* Ignore our own messages */
1033  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
1034  continue ;
1035  }
1036 
1037  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
1039  }
1040 
1041  joinlist_remove_zombie_pi_entries ();
1042 }
1043 
1044 static void downlist_messages_delete (void)
1045 {
1046  struct downlist_msg *stored_msg;
1047  struct list_head *iter, *iter_next;
1048 
1049  for (iter = downlist_messages_head.next;
1050  iter != &downlist_messages_head;
1051  iter = iter_next) {
1052 
1053  iter_next = iter->next;
1054 
1055  stored_msg = list_entry(iter, struct downlist_msg, list);
1056  list_del (&stored_msg->list);
1057  free (stored_msg);
1058  }
1059 }
1060 
1061 static void joinlist_messages_delete (void)
1062 {
1063  struct joinlist_msg *stored_msg;
1064  struct list_head *iter, *iter_next;
1065 
1066  for (iter = joinlist_messages_head.next;
1067  iter != &joinlist_messages_head;
1068  iter = iter_next) {
1069 
1070  iter_next = iter->next;
1071 
1072  stored_msg = list_entry(iter, struct joinlist_msg, list);
1073  list_del (&stored_msg->list);
1074  free (stored_msg);
1075  }
1076  list_init (&joinlist_messages_head);
1077 }
1078 
1079 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
1080 {
1081  list_init (&downlist_messages_head);
1082  list_init (&joinlist_messages_head);
1083  api = corosync_api;
1084  return (NULL);
1085 }
1086 
1087 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
1088 {
1089  struct list_head *iter, *iter_next;
1090  struct process_info *pi;
1091 
1092  for (iter = cpg_iteration_instance->items_list_head.next;
1093  iter != &cpg_iteration_instance->items_list_head;
1094  iter = iter_next) {
1095 
1096  iter_next = iter->next;
1097 
1098  pi = list_entry (iter, struct process_info, list);
1099  list_del (&pi->list);
1100  free (pi);
1101  }
1102 
1103  list_del (&cpg_iteration_instance->list);
1104  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
1105 }
1106 
1107 static void cpg_pd_finalize (struct cpg_pd *cpd)
1108 {
1109  struct list_head *iter, *iter_next;
1110  struct cpg_iteration_instance *cpii;
1111 
1112  zcb_all_free(cpd);
1113  for (iter = cpd->iteration_instance_list_head.next;
1114  iter != &cpd->iteration_instance_list_head;
1115  iter = iter_next) {
1116 
1117  iter_next = iter->next;
1118 
1119  cpii = list_entry (iter, struct cpg_iteration_instance, list);
1120 
1121  cpg_iteration_instance_finalize (cpii);
1122  }
1123 
1124  list_del (&cpd->list);
1125 }
1126 
/*
 * Exit hook for a library IPC connection.
 *
 * If the connection's cpd still has a non-empty group name and is not in
 * CPD_STATE_LEAVE_STARTED, cpg_node_joinleave_send() is called for the cpd's
 * pid and group. Afterwards cpg_pd_finalize() does three things:
 *  - frees the zero-copy mappings;
 *  - frees the iteration instances;
 *  - unlinks the cpd.
 * Then api->ipc_refcnt_dec(conn) is called. Always returns 0.
 *
 * NOTE(review): this listing skips source line 1135, which holds the
 * remaining arguments of the cpg_node_joinleave_send() call. Presumably
 * these are the message id and reason of a leave; confirm against the
 * original file.
 */
1127 static int cpg_lib_exit_fn (void *conn)
1128 {
1129  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1130 
1131  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1132 
/* Joined (or joining) and no leave already in flight: tell the cluster. */
1133  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1134  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1136  }
1137 
1138  cpg_pd_finalize (cpd);
1139 
1140  api->ipc_refcnt_dec (conn);
1141  return (0);
1142 }
1143 
/*
 * Build a join/leave request for (pid, group_name) and multicast it to the
 * cluster with TOTEM_AGREED ordering.
 *
 * @param pid         process id stored in the message
 * @param group_name  group name copied into the message
 * @param fn          message function id; not referenced on any visible line,
 *                    presumably used by omitted line 1155 (header.id)
 * @param reason      reason code stored in the message
 * @return            the result of api->totem_mcast()
 *
 * NOTE(review): the listing omits source line 1146 (presumably the
 * declaration of the local req_exec_cpg_procjoin) and line 1155. Confirm
 * both against the original file.
 */
1144 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1145 {
1147  struct iovec req_exec_cpg_iovec;
1148  int result;
1149 
1150  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1151  req_exec_cpg_procjoin.pid = pid;
1152  req_exec_cpg_procjoin.reason = reason;
1153 
1154  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1156 
/* The whole fixed-size request is sent as a single iovec. */
1157  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1158  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1159 
1160  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1161 
1162  return (result);
1163 }
1164 
1165 /* Can byteswap join & leave messages */
/*
 * Byte-swap, in place, the pid, group_name and reason fields of a
 * procjoin/procleave request. Presumably this is invoked for messages from
 * a node with a different byte order. The message header is not touched
 * on any visible line.
 *
 * NOTE(review): source line 1168 is missing from this listing; it
 * presumably declares req_exec_cpg_procjoin from `msg`.
 */
1166 static void exec_cpg_procjoin_endian_convert (void *msg)
1167 {
1169 
1170  req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
1171  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1172  req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
1173 }
1174 
1175 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1176 {
1177  char *msg = msg_v;
1178  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1179  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1180 
1181  swab_mar_int32_t (&res->size);
1182 
1183  while ((const char*)jle < msg + res->size) {
1184  jle->pid = swab32(jle->pid);
1185  swab_mar_cpg_name_t (&jle->group_name);
1186  jle++;
1187  }
1188 }
1189 
/*
 * Byte-order hook for the "_old" downlist message variant. It performs no
 * conversion; the buffer is left untouched.
 */
static void exec_cpg_downlist_endian_convert_old (void *msg)
{
	(void)msg;
}
1193 
/*
 * Byte-swap a downlist request in place: left_nodes, old_members and the
 * first left_nodes entries of nodeids[].
 *
 * NOTE(review): source line 1196 is missing from this listing; it
 * presumably declares req_exec_cpg_downlist from `msg`.
 * NOTE(review): left_nodes is not bounded against the capacity of
 * nodeids[] before the loop. Confirm that the caller validates it.
 */
1194 static void exec_cpg_downlist_endian_convert (void *msg)
1195 {
1197  unsigned int i;
1198 
/* left_nodes must be swapped first: the loop below uses its native value. */
1199  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1200  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1201 
1202  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1203  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1204  }
1205 }
1206 
1207 
1208 static void exec_cpg_mcast_endian_convert (void *msg)
1209 {
1210  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1211 
1212  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1213  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1214  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1215  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1216  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1217 }
1218 
/*
 * Byte-swap a fragmented (partial) multicast request in place. The fields
 * swapped are header, group_name, pid, msglen, fraglen, type and source.
 *
 * NOTE(review): source line 1221 is missing from this listing; it
 * presumably declares req_exec_cpg_mcast from `msg`.
 */
1219 static void exec_cpg_partial_mcast_endian_convert (void *msg)
1220 {
1222 
1223  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1224  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1225  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1226  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1227  req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
1228  req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
1229  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1230 }
1231 
1232 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1233  struct list_head *iter;
1234 
1235  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1236  struct process_info *pi = list_entry (iter, struct process_info, list);
1237  iter = iter->next;
1238 
1239  if (pi->pid == pid && pi->nodeid == nodeid &&
1240  mar_name_compare (&pi->group, group_name) == 0) {
1241  return pi;
1242  }
1243  }
1244 
1245  return NULL;
1246 }
1247 
/*
 * Record that process `pid` on node `nodeid` has joined group `name`, and
 * report it via notify_lib_joinlist() as a single joined entry carrying
 * `reason`.
 *
 * - The function does nothing if an identical (name, pid, nodeid) entry
 *   already exists.
 * - If the allocation fails, it logs a warning and returns.
 * - The new entry is kept in the global process list in ascending
 *   (nodeid, pid) order.
 *
 * NOTE(review): source line 1296, holding the last argument(s) of the
 * notify_lib_joinlist() call, is missing from this listing.
 */
1248 static void do_proc_join(
1249  const mar_cpg_name_t *name,
1250  uint32_t pid,
1251  unsigned int nodeid,
1252  int reason)
1253 {
1254  struct process_info *pi;
1255  struct process_info *pi_entry;
1256  mar_cpg_address_t notify_info;
1257  struct list_head *list;
1258  struct list_head *list_to_add = NULL;
1259 
/* Duplicate joins are ignored, so no second notification is sent. */
1260  if (process_info_find (name, pid, nodeid) != NULL) {
1261  return ;
1262  }
1263  pi = malloc (sizeof (struct process_info));
1264  if (!pi) {
1265  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1266  return;
1267  }
1268  pi->nodeid = nodeid;
1269  pi->pid = pid;
1270  memcpy(&pi->group, name, sizeof(*name));
1271  list_init(&pi->list);
1272 
1273  /*
1274  * Insert new process in sorted order so synchronization works properly
1275  */
/*
 * list_to_add ends up as the last entry that does not sort after the new
 * one (or the list head). list_add() is expected to link pi right after it.
 */
1276  list_to_add = &process_info_list_head;
1277  for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
1278 
1279  pi_entry = list_entry(list, struct process_info, list);
1280  if (pi_entry->nodeid > pi->nodeid ||
1281  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1282 
1283  break;
1284  }
1285  list_to_add = list;
1286  }
1287  list_add (&pi->list, list_to_add);
1288 
1289  notify_info.pid = pi->pid;
1290  notify_info.nodeid = nodeid;
1291  notify_info.reason = reason;
1292 
/* One joined member, no left members. */
1293  notify_lib_joinlist(&pi->group, NULL,
1294  1, &notify_info,
1295  0, NULL,
1297 }
1298 
/*
 * Report that process `pid` on node `nodeid` left group `name`, then drop
 * its entries from the global process list.
 *
 * The notification goes out via notify_lib_joinlist() as a single left
 * entry with `reason`. It is sent before the list is updated. Every
 * matching (name, pid, nodeid) entry is removed and freed, not just the
 * first one.
 *
 * NOTE(review): source line 1316, holding the last argument(s) of the
 * notify_lib_joinlist() call, is missing from this listing.
 */
1299 static void do_proc_leave(
1300  const mar_cpg_name_t *name,
1301  uint32_t pid,
1302  unsigned int nodeid,
1303  int reason)
1304 {
1305  struct process_info *pi;
1306  struct list_head *iter;
1307  mar_cpg_address_t notify_info;
1308 
1309  notify_info.pid = pid;
1310  notify_info.nodeid = nodeid;
1311  notify_info.reason = reason;
1312 
/* No joined members, one left member. */
1313  notify_lib_joinlist(name, NULL,
1314  0, NULL,
1315  1, &notify_info,
1317 
/* iter is advanced before pi may be freed. */
1318  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1319  pi = list_entry(iter, struct process_info, list);
1320  iter = iter->next;
1321 
1322  if (pi->pid == pid && pi->nodeid == nodeid &&
1323  mar_name_compare (&pi->group, name)==0) {
1324  list_del (&pi->list);
1325  free (pi);
1326  }
1327  }
1328 }
1329 
1330 static void message_handler_req_exec_cpg_downlist_old (
1331  const void *message,
1332  unsigned int nodeid)
1333 {
1334  log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x",
1335  nodeid);
1336 }
1337 
1338 static void message_handler_req_exec_cpg_downlist(
1339  const void *message,
1340  unsigned int nodeid)
1341 {
1342  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1343  int i;
1344  struct list_head *iter;
1345  struct downlist_msg *stored_msg;
1346  int found;
1347 
1348  if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
1349  log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
1350  req_exec_cpg_downlist->left_nodes, downlist_state);
1351  return;
1352  }
1353 
1354  stored_msg = malloc (sizeof (struct downlist_msg));
1355  stored_msg->sender_nodeid = nodeid;
1356  stored_msg->old_members = req_exec_cpg_downlist->old_members;
1357  stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1358  memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1359  req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
1360  list_init (&stored_msg->list);
1361  list_add (&stored_msg->list, &downlist_messages_head);
1362 
1363  for (i = 0; i < my_member_list_entries; i++) {
1364  found = 0;
1365  for (iter = downlist_messages_head.next;
1366  iter != &downlist_messages_head;
1367  iter = iter->next) {
1368 
1369  stored_msg = list_entry(iter, struct downlist_msg, list);
1370  if (my_member_list[i] == stored_msg->sender_nodeid) {
1371  found = 1;
1372  }
1373  }
1374  if (!found) {
1375  return;
1376  }
1377  }
1378 
1379  downlist_master_choose_and_send ();
1380 }
1381 
1382 
/*
 * Handle a procjoin message from cluster node `nodeid`. The message is
 * logged, then do_proc_join() is called with the message's group name and
 * pid.
 *
 * NOTE(review): source line 1396, holding the final (reason) argument of
 * the do_proc_join() call, is missing from this listing. Confirm whether it
 * uses the message's reason or a fixed constant.
 */
1383 static void message_handler_req_exec_cpg_procjoin (
1384  const void *message,
1385  unsigned int nodeid)
1386 {
1387  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1388 
1389  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
1390  nodeid,
1391  api->totem_ifaces_print(nodeid),
1392  (unsigned int)req_exec_cpg_procjoin->pid);
1393 
1394  do_proc_join (&req_exec_cpg_procjoin->group_name,
1395  req_exec_cpg_procjoin->pid, nodeid,
1397 }
1398 
1399 static void message_handler_req_exec_cpg_procleave (
1400  const void *message,
1401  unsigned int nodeid)
1402 {
1403  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1404 
1405  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
1406  nodeid,
1407  api->totem_ifaces_print(nodeid),
1408  (unsigned int)req_exec_cpg_procjoin->pid);
1409 
1410  do_proc_leave (&req_exec_cpg_procjoin->group_name,
1411  req_exec_cpg_procjoin->pid, nodeid,
1412  req_exec_cpg_procjoin->reason);
1413 }
1414 
1415 
1416 /* Got a proclist from another node */
1417 static void message_handler_req_exec_cpg_joinlist (
1418  const void *message_v,
1419  unsigned int nodeid)
1420 {
1421  const char *message = message_v;
1422  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1423  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1424  struct joinlist_msg *stored_msg;
1425 
1426  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
1427  nodeid);
1428 
1429  while ((const char*)jle < message + res->size) {
1430  stored_msg = malloc (sizeof (struct joinlist_msg));
1431  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1432  stored_msg->sender_nodeid = nodeid;
1433  stored_msg->pid = jle->pid;
1434  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1435  list_init (&stored_msg->list);
1436  list_add (&stored_msg->list, &joinlist_messages_head);
1437  jle++;
1438  }
1439 }
1440 
/*
 * Deliver a cluster multicast from node `nodeid` to local library
 * connections.
 *
 * A response header is filled with the payload length, sender pid, sender
 * node and group. It is sent as iovec[0], with the payload that follows the
 * exec request as iovec[1]. Each cpd in cpg_pd_list_head whose group
 * matches (plus the state condition on omitted line 1471) receives the
 * message via api->ipc_dispatch_iov_send().
 *
 * The first matching cpd triggers one lookup of whether `nodeid` has any
 * process in this group. If it has none, a warning is logged and the
 * function returns: no local connection receives the message.
 *
 * NOTE(review): this listing omits source lines 1446 (presumably the
 * declaration of res_lib_cpg_mcast), 1453 (presumably its header.id) and
 * 1471 (the first half of the cpd filter condition).
 * NOTE(review): msglen is taken from the message and used as iov_len
 * without being checked against the received length on any visible line.
 */
1441 static void message_handler_req_exec_cpg_mcast (
1442  const void *message,
1443  unsigned int nodeid)
1444 {
1445  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1447  int msglen = req_exec_cpg_mcast->msglen;
1448  struct list_head *iter, *pi_iter;
1449  struct cpg_pd *cpd;
1450  struct iovec iovec[2];
1451  int known_node = 0;
1452 
1454  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1455  res_lib_cpg_mcast.msglen = msglen;
1456  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1457  res_lib_cpg_mcast.nodeid = nodeid;
1458 
1459  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1460  sizeof(mar_cpg_name_t));
1461  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1462  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1463 
/* The payload immediately follows the fixed-size exec request. */
1464  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1465  iovec[1].iov_len = msglen;
1466 
1467  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1468  cpd = list_entry(iter, struct cpg_pd, list);
1469  iter = iter->next;
1470 
1472  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1473 
/* The sender check is done at most once per message. */
1474  if (!known_node) {
1475  /* Try to find, if we know the node */
1476  for (pi_iter = process_info_list_head.next;
1477  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1478 
1479  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1480 
1481  if (pi->nodeid == nodeid &&
1482  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1483  known_node = 1;
1484  break;
1485  }
1486  }
1487  }
1488 
1489  if (!known_node) {
1490  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1491  return ;
1492  }
1493 
1494  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1495  }
1496  }
1497 }
1498 
/*
 * Deliver one fragment of a fragmented cluster multicast from node `nodeid`
 * to local library connections.
 *
 * The response carries the fragment length (fraglen), the total message
 * length (msglen), the fragment type, the sender pid and node, and the
 * group. The fragment payload follows the exec request and is sent as
 * iovec[1].
 *
 * Delivery and the "unknown node" early return follow the same rules as
 * message_handler_req_exec_cpg_mcast().
 *
 * NOTE(review): this listing omits source lines 1504 (presumably the
 * declaration of res_lib_cpg_mcast), 1513 (presumably its header.id) and
 * 1533 (the first half of the cpd filter condition).
 * NOTE(review): fraglen is used as iov_len without a visible check against
 * the received length.
 */
1499 static void message_handler_req_exec_cpg_partial_mcast (
1500  const void *message,
1501  unsigned int nodeid)
1502 {
1503  const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
1505  int msglen = req_exec_cpg_mcast->fraglen;
1506  struct list_head *iter, *pi_iter;
1507  struct cpg_pd *cpd;
1508  struct iovec iovec[2];
1509  int known_node = 0;
1510 
1511  log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
1512 
1514  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1515  res_lib_cpg_mcast.fraglen = msglen;
1516  res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
1517  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1518  res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
1519  res_lib_cpg_mcast.nodeid = nodeid;
1520 
1521  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1522  sizeof(mar_cpg_name_t));
1523  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1524  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1525 
/* The fragment payload immediately follows the fixed-size exec request. */
1526  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1527  iovec[1].iov_len = msglen;
1528 
1529  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1530  cpd = list_entry(iter, struct cpg_pd, list);
1531  iter = iter->next;
1532 
1534  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1535 
1536  if (!known_node) {
1537  /* Try to find, if we know the node */
1538  for (pi_iter = process_info_list_head.next;
1539  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1540 
1541  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1542 
1543  if (pi->nodeid == nodeid &&
1544  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1545  known_node = 1;
1546  break;
1547  }
1548  }
1549  }
1550 
1551  if (!known_node) {
1552  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1553  return ;
1554  }
1555 
1556  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1557  }
1558  }
1559 }
1560 
1561 
1562 static int cpg_exec_send_downlist(void)
1563 {
1564  struct iovec iov;
1565 
1566  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1567  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1568 
1569  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1570 
1571  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1572  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1573 
1574  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1575 }
1576 
/*
 * Multicast this node's joinlist with TOTEM_AGREED ordering. The joinlist
 * is one join_list_entry (group name and pid) for every process_info entry
 * whose nodeid is the local node.
 *
 * The message is a qb_ipc_response_header followed by `count` entries,
 * built on the stack with alloca().
 *
 * @return 0 when there is no local process (nothing is sent); otherwise
 *         api->totem_mcast()'s result.
 *
 * NOTE(review): source line 1617, presumably setting res->id, is missing
 * from this listing.
 * NOTE(review): alloca() is not specified to return NULL on failure (see
 * alloca(3)), so the !buf check below is probably dead code. The stack
 * usage grows with the number of local processes.
 */
1577 static int cpg_exec_send_joinlist(void)
1578 {
1579  int count = 0;
1580  struct list_head *iter;
1581  struct qb_ipc_response_header *res;
1582  char *buf;
1583  struct join_list_entry *jle;
1584  struct iovec req_exec_cpg_iovec;
1585 
/* First pass: count local processes to size the buffer. */
1586  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1587  struct process_info *pi = list_entry (iter, struct process_info, list);
1588 
1589  if (pi->nodeid == api->totem_nodeid_get ()) {
1590  count++;
1591  }
1592  }
1593 
1594  /* Nothing to send */
1595  if (!count)
1596  return 0;
1597 
1598  buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
1599  if (!buf) {
1600  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1601  return -1;
1602  }
1603 
1604  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1605  res = (struct qb_ipc_response_header *)buf;
1606 
/* Second pass: fill one entry per local process, same filter as above. */
1607  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1608  struct process_info *pi = list_entry (iter, struct process_info, list);
1609 
1610  if (pi->nodeid == api->totem_nodeid_get ()) {
1611  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1612  jle->pid = pi->pid;
1613  jle++;
1614  }
1615  }
1616 
1618  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1619 
1620  req_exec_cpg_iovec.iov_base = buf;
1621  req_exec_cpg_iovec.iov_len = res->size;
1622 
1623  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1624 }
1625 
1626 static int cpg_lib_init_fn (void *conn)
1627 {
1628  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1629  memset (cpd, 0, sizeof(struct cpg_pd));
1630  cpd->conn = conn;
1631  list_add (&cpd->list, &cpg_pd_list_head);
1632 
1633  list_init (&cpd->iteration_instance_list_head);
1634  list_init (&cpd->zcb_mapped_list_head);
1635 
1636  api->ipc_refcnt_inc (conn);
1637  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1638  return (0);
1639 }
1640 
1641 /* Join message from the library */
/*
 * Handle a library join request and always answer with res_lib_cpg_join.
 *
 * The request is rejected in these cases:
 * - CS_ERR_EXIST when a local connection already joined this group with the
 *   same pid;
 * - CS_ERR_TRY_AGAIN when the local process list still holds that
 *   (pid, group), i.e. a leave has not been delivered yet;
 * - CS_ERR_NAME_TOO_LONG when the group name exceeds CPG_MAX_NAME_LENGTH.
 *
 * Otherwise the result depends on cpd->cpd_state. From CPD_STATE_UNJOINED
 * the cpd records pid, flags and group name, and cpg_node_joinleave_send()
 * is called.
 *
 * NOTE(review): several source lines are missing from this listing
 * (1646, 1686, 1694, 1696, 1699, 1702, 1709). They include case labels,
 * part of the cpg_node_joinleave_send() arguments and a response-header
 * field. Confirm them against the original file.
 */
1642 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1643 {
1644  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1645  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1647  cs_error_t error = CS_OK;
1648  struct list_head *iter;
1649 
1650  /* Reject if a local connection already joined this group with the same pid */
1651  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
1652  struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);
1653 
1654  if (cpd_item->pid == req_lib_cpg_join->pid &&
1655  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1656 
1657  /* We have same pid and group name joined -> return error */
1658  error = CS_ERR_EXIST;
1659  goto response_send;
1660  }
1661  }
1662 
1663  /*
1664  * Same check must be done in process info list, because there may be not yet delivered
1665  * leave of client.
1666  */
1667  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1668  struct process_info *pi = list_entry (iter, struct process_info, list);
1669 
1670  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1671  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1672  /* Same pid and group still listed (leave not yet delivered) -> ask caller to retry */
1673  error = CS_ERR_TRY_AGAIN;
1674  goto response_send;
1675  }
1676  }
1677 
1678  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1679  error = CS_ERR_NAME_TOO_LONG;
1680  goto response_send;
1681  }
1682 
1683  switch (cpd->cpd_state) {
1684  case CPD_STATE_UNJOINED:
1685  error = CS_OK;
1687  cpd->pid = req_lib_cpg_join->pid;
1688  cpd->flags = req_lib_cpg_join->flags;
1689  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1690  sizeof (cpd->group_name));
1691 
1692  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1693  &req_lib_cpg_join->group_name,
1695  break;
1697  error = CS_ERR_BUSY;
1698  break;
1700  error = CS_ERR_EXIST;
1701  break;
1703  error = CS_ERR_EXIST;
1704  break;
1705  }
1706 
1707 response_send:
1708  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1710  res_lib_cpg_join.header.error = error;
1711  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1712 }
1713 
1714 /* Leave message from the library */
/*
 * Handle a library leave request. The result depends on cpd->cpd_state:
 * - CPD_STATE_UNJOINED gives CS_ERR_NOT_EXIST;
 * - two further states (labels not visible) give CS_ERR_NOT_EXIST and
 *   CS_ERR_BUSY;
 * - the last case gives CS_OK and calls cpg_node_joinleave_send() with the
 *   pid and group name taken from the request.
 * The error is stored in res_lib_cpg_leave.
 *
 * NOTE(review): the pid and group name come from the client request, not
 * from cpd->pid / cpd->group_name; confirm this is intended.
 * NOTE(review): source lines 1717, 1728, 1731, 1734, 1736, 1739, 1740,
 * 1746 and 1748 are missing from this listing. They include the response
 * declaration, case labels and, presumably, the response send.
 */
1715 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1716 {
1718  cs_error_t error = CS_OK;
1719  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1720  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1721 
1722  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1723 
1724  switch (cpd->cpd_state) {
1725  case CPD_STATE_UNJOINED:
1726  error = CS_ERR_NOT_EXIST;
1727  break;
1729  error = CS_ERR_NOT_EXIST;
1730  break;
1732  error = CS_ERR_BUSY;
1733  break;
1735  error = CS_OK;
1737  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1738  &req_lib_cpg_leave->group_name,
1741  break;
1742  }
1743 
1744  /* send return */
1745  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1747  res_lib_cpg_leave.header.error = error;
1749 }
1750 
1751 /* Finalize message from library */
/*
 * Handle a library finalize request. The cpd is removed from the list it
 * is on, and a response with error CS_OK is prepared and sent.
 *
 * The cpd is re-initialised with list_init() right after list_del().
 * Presumably this is so that the later list_del() in cpg_pd_finalize(),
 * reached via cpg_lib_exit_fn(), is harmless.
 *
 * NOTE(review): source lines 1757, 1770 and 1773 are missing from this
 * listing (the response declaration, header.id and the start of the send
 * call).
 */
1752 static void message_handler_req_lib_cpg_finalize (
1753  void *conn,
1754  const void *message)
1755 {
1756  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1758  cs_error_t error = CS_OK;
1759 
1760  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1761 
1762  /*
1763  * We will just remove cpd from list. After this call, connection will be
1764  * closed on lib side, and cpg_lib_exit_fn will be called
1765  */
1766  list_del (&cpd->list);
1767  list_init (&cpd->list);
1768 
1769  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1771  res_lib_cpg_finalize.header.error = error;
1772 
1774  sizeof (res_lib_cpg_finalize));
1775 }
1776 
/*
 * Map the existing file at `path` shared and read/write, after resizing it
 * to `bytes`.
 *
 * The path is unlinked right after open(), whether or not the open
 * succeeded, so the mapping becomes the only remaining reference to the
 * file.
 *
 * On success the mapping address is stored in *buf and 0 is returned. On
 * any failure -1 is returned, *buf is left untouched and no mapping or
 * descriptor is leaked.
 */
static int
memory_map (
	const char *path,
	size_t bytes,
	void **buf)
{
	int32_t fd;
	void *addr;
	int32_t res;

	fd = open (path, O_RDWR, 0600);

	unlink (path);

	if (fd == -1) {
		return (-1);
	}

	res = ftruncate (fd, bytes);
	if (res == -1) {
		goto error_close;
	}

	addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
		MAP_SHARED, fd, 0);

	if (addr == MAP_FAILED) {
		goto error_close;
	}
#ifdef MADV_NOSYNC
	madvise(addr, bytes, MADV_NOSYNC);
#endif

	res = close (fd);
	if (res) {
		/* Failure is reported to the caller, so do not leak the mapping. */
		munmap (addr, bytes);
		return (-1);
	}
	*buf = addr;
	return (0);

error_close:
	/*
	 * path was already unlinked above. Unlinking it again could only
	 * remove some other file created at that path in the meantime.
	 */
	close (fd);
	return -1;
}
1822 
1823 static inline int zcb_alloc (
1824  struct cpg_pd *cpd,
1825  const char *path_to_file,
1826  size_t size,
1827  void **addr)
1828 {
1829  struct zcb_mapped *zcb_mapped;
1830  unsigned int res;
1831 
1832  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1833  if (zcb_mapped == NULL) {
1834  return (-1);
1835  }
1836 
1837  res = memory_map (
1838  path_to_file,
1839  size,
1840  addr);
1841  if (res == -1) {
1842  free (zcb_mapped);
1843  return (-1);
1844  }
1845 
1846  list_init (&zcb_mapped->list);
1847  zcb_mapped->addr = *addr;
1848  zcb_mapped->size = size;
1849  list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1850  return (0);
1851 }
1852 
1853 
1854 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1855 {
1856  unsigned int res;
1857 
1858  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1859  list_del (&zcb_mapped->list);
1860  free (zcb_mapped);
1861  return (res);
1862 }
1863 
1864 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1865 {
1866  struct list_head *list;
1867  struct zcb_mapped *zcb_mapped;
1868  unsigned int res = 0;
1869 
1870  for (list = cpd->zcb_mapped_list_head.next;
1871  list != &cpd->zcb_mapped_list_head; list = list->next) {
1872 
1873  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1874 
1875  if (zcb_mapped->addr == addr) {
1876  res = zcb_free (zcb_mapped);
1877  break;
1878  }
1879 
1880  }
1881  return (res);
1882 }
1883 
1884 static inline int zcb_all_free (
1885  struct cpg_pd *cpd)
1886 {
1887  struct list_head *list;
1888  struct zcb_mapped *zcb_mapped;
1889 
1890  for (list = cpd->zcb_mapped_list_head.next;
1891  list != &cpd->zcb_mapped_list_head;) {
1892 
1893  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1894 
1895  list = list->next;
1896 
1897  zcb_free (zcb_mapped);
1898  }
1899  return (0);
1900 }
1901 
/*
 * Overlay used to carry a server-side pointer in a fixed 64-bit field and
 * to recover it again. void2serveraddr() stores the pointer in
 * coroipcs_zc_header.server_address.
 */
union u {
	uint64_t server_addr;
	void *server_ptr;
};

/*
 * Encode a server pointer as a 64-bit value through union u.
 *
 * The whole 64-bit member is cleared first. Where void * is narrower than
 * 64 bits, writing server_ptr alone would leave the remaining bytes of
 * server_addr indeterminate, and the returned value would contain garbage.
 * Decoding works by writing server_addr and reading server_ptr through the
 * same union layout, which still recovers the original pointer.
 */
static uint64_t void2serveraddr (void *server_ptr)
{
	union u u;

	u.server_addr = 0;
	u.server_ptr = server_ptr;
	return (u.server_addr);
}
1914 
/*
 * Decode a 64-bit server address back into a pointer through union u.
 * This is the inverse of void2serveraddr().
 *
 * NOTE(review): source line 1919 is missing from this listing. It
 * presumably assigns u.server_addr = server_addr; without it, u is read
 * uninitialised.
 * NOTE(review): the `;` after the closing brace is an empty file-scope
 * declaration (a pedantic-mode warning); it can be dropped.
 */
1915 static void *serveraddr2void (uint64_t server_addr)
1916 {
1917  union u u;
1918 
1920  return (u.server_ptr);
1921 };
1922 
/*
 * Handle a zero-copy buffer allocation request from the library. The steps
 * are:
 *  1. zcb_alloc() maps the client-named file of hdr->map_size bytes and
 *     records it on this connection's cpd.
 *  2. The mapping address is written into the buffer's coroipcs_zc_header
 *     (server_address), so later requests can refer to it.
 *  3. A bare qb_ipc_response_header with id 0 is sent back.
 *
 * NOTE(review): source line 1927 (presumably the declaration of `hdr`
 * from `message`) is missing from this listing.
 * NOTE(review): a failed mapping trips assert(res == 0) and aborts the
 * daemon. With NDEBUG, addr stays NULL and zc_header is dereferenced.
 * Consider returning an error response instead.
 * NOTE(review): res_header.error is not set on any visible line.
 */
1923 static void message_handler_req_lib_cpg_zc_alloc (
1924  void *conn,
1925  const void *message)
1926 {
1928  struct qb_ipc_response_header res_header;
1929  void *addr = NULL;
1930  struct coroipcs_zc_header *zc_header;
1931  unsigned int res;
1932  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1933 
1934  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1935 
1936  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1937  &addr);
1938  assert(res == 0);
1939 
/* The buffer starts with a coroipcs_zc_header; store our address in it. */
1940  zc_header = (struct coroipcs_zc_header *)addr;
1941  zc_header->server_address = void2serveraddr(addr);
1942 
1943  res_header.size = sizeof (struct qb_ipc_response_header);
1944  res_header.id = 0;
1945  api->ipc_response_send (conn,
1946  &res_header,
1947  res_header.size);
1948 }
1949 
/*
 * Handle a zero-copy buffer free request from the library. The steps are:
 *  1. The client-supplied server address is decoded back into a pointer.
 *  2. zcb_by_addr_free() releases the matching mapping of this
 *     connection's cpd.
 *  3. A bare qb_ipc_response_header with id 0 is sent back.
 *
 * The decoded pointer is only compared against the cpd's recorded
 * mappings; an unknown address frees nothing. zcb_by_addr_free()'s result
 * is ignored.
 *
 * NOTE(review): source line 1954 (presumably the declaration of `hdr`
 * from `message`) is missing from this listing.
 */
1950 static void message_handler_req_lib_cpg_zc_free (
1951  void *conn,
1952  const void *message)
1953 {
1955  struct qb_ipc_response_header res_header;
1956  void *addr = NULL;
1957  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1958 
1959  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1960 
1961  addr = serveraddr2void (hdr->server_address);
1962 
1963  zcb_by_addr_free (cpd, addr);
1964 
1965  res_header.size = sizeof (struct qb_ipc_response_header);
1966  res_header.id = 0;
1967  api->ipc_response_send (
1968  conn, &res_header,
1969  res_header.size);
1970 }
1971 
1972 /* Fragmented mcast message from the library */
/*
 * Handle one fragment of a fragmented multicast from the library.
 *
 * The connection's state decides whether sending is allowed:
 * CPD_STATE_UNJOINED and one other state give CS_ERR_NOT_EXIST, and two
 * states give CS_OK. The type-dependent block at listing lines 2007-2012
 * may turn the result into CS_ERR_INTERRUPT.
 *
 * If sending is allowed, an exec request is multicast with TOTEM_AGREED
 * ordering. It carries the cpd's pid and group, msglen, type and fraglen,
 * followed by `fraglen` payload bytes. Otherwise the refusal is logged.
 * In both cases a res_lib_cpg_partial_send carrying the error is returned.
 *
 * NOTE(review): source lines 1993, 1996, 1999, 2008, 2010 and 2017 are
 * missing from this listing (case labels, part of the LIBCPG_PARTIAL_FIRST
 * handling, and the message id).
 * NOTE(review): a failed totem_mcast() trips assert(result == 0), whereas
 * the zero-copy path reports CS_ERR_TRY_AGAIN instead.
 */
1973 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
1974 {
1975  const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
1976  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1977  mar_cpg_name_t group_name = cpd->group_name;
1978 
1979  struct iovec req_exec_cpg_iovec[2];
1980  struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
1981  struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1982  int msglen = req_lib_cpg_mcast->fraglen;
1983  int result;
1984  cs_error_t error = CS_ERR_NOT_EXIST;
1985 
1986  log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
1987  log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
1988 
1989  switch (cpd->cpd_state) {
1990  case CPD_STATE_UNJOINED:
1991  error = CS_ERR_NOT_EXIST;
1992  break;
1994  error = CS_ERR_NOT_EXIST;
1995  break;
1997  error = CS_OK;
1998  break;
2000  error = CS_OK;
2001  break;
2002  }
2003 
2004  res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
2005  res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
2006 
2007  if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
2009  }
2011  error = CS_ERR_INTERRUPT;
2012  }
2013 
2014  if (error == CS_OK) {
2015  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
2016  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2018  req_exec_cpg_mcast.pid = cpd->pid;
2019  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2020  req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
2021  req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
2022  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2023  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2024  sizeof(mar_cpg_name_t));
2025 
2026  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2027  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2028  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
2029  req_exec_cpg_iovec[1].iov_len = msglen;
2030 
2031  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2032  assert(result == 0);
2033  } else {
2034  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
2035  conn, group_name.value, cpd->cpd_state, error);
2036  }
2037 
2038  res_lib_cpg_partial_send.header.error = error;
2039  api->ipc_response_send (conn, &res_lib_cpg_partial_send,
2040  sizeof (res_lib_cpg_partial_send));
2041 }
2042 
2043 /* Mcast message from the library */
/*
 * Handle a (non-fragmented) multicast request from the library.
 *
 * The connection's state decides whether sending is allowed:
 * CPD_STATE_UNJOINED and one other state give CS_ERR_NOT_EXIST, and two
 * states give CS_OK.
 *
 * If sending is allowed, an exec request carrying the cpd's pid and group
 * and `msglen` payload bytes is multicast with TOTEM_AGREED ordering.
 * Otherwise the refusal is logged.
 *
 * No response is sent on any visible line.
 *
 * NOTE(review): source lines 2062, 2065, 2068 and 2076 are missing from
 * this listing (case labels and the message id).
 * NOTE(review): a failed totem_mcast() trips assert(result == 0).
 */
2044 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
2045 {
2046  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
2047  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2048  mar_cpg_name_t group_name = cpd->group_name;
2049 
2050  struct iovec req_exec_cpg_iovec[2];
2051  struct req_exec_cpg_mcast req_exec_cpg_mcast;
2052  int msglen = req_lib_cpg_mcast->msglen;
2053  int result;
2054  cs_error_t error = CS_ERR_NOT_EXIST;
2055 
2056  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
2057 
2058  switch (cpd->cpd_state) {
2059  case CPD_STATE_UNJOINED:
2060  error = CS_ERR_NOT_EXIST;
2061  break;
2063  error = CS_ERR_NOT_EXIST;
2064  break;
2066  error = CS_OK;
2067  break;
2069  error = CS_OK;
2070  break;
2071  }
2072 
2073  if (error == CS_OK) {
2074  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
2075  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2077  req_exec_cpg_mcast.pid = cpd->pid;
2078  req_exec_cpg_mcast.msglen = msglen;
2079  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2080  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2081  sizeof(mar_cpg_name_t));
2082 
/* Header from our stack, payload straight from the library request. */
2083  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2084  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2085  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
2086  req_exec_cpg_iovec[1].iov_len = msglen;
2087 
2088  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2089  assert(result == 0);
2090  } else {
2091  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
2092  conn, group_name.value, cpd->cpd_state, error);
2093  }
2094 }
2095 
/*
 * Multicast a message that the library placed in a zero-copy buffer.
 *
 * The request that follows the buffer's coroipcs_zc_header is located by
 * decoding hdr->server_address. If the connection's state allows sending,
 * that request's payload is multicast with TOTEM_AGREED ordering.
 *
 * A res_lib_cpg_mcast is always sent back. Its error is CS_OK on success,
 * CS_ERR_TRY_AGAIN if totem_mcast() fails, or the state error.
 *
 * NOTE(review): source lines 2100, 2106, 2119, 2122, 2125 and 2135 are
 * missing from this listing (the `hdr` and `req_lib_cpg_mcast`
 * declarations, case labels and the message id).
 * NOTE(review): the client-supplied server_address is turned into a
 * pointer and dereferenced without checking it against
 * cpd->zcb_mapped_list_head, and msglen is not checked against the mapped
 * size. Validate both before trusting them.
 */
2096 static void message_handler_req_lib_cpg_zc_execute (
2097  void *conn,
2098  const void *message)
2099 {
2101  struct qb_ipc_request_header *header;
2102  struct res_lib_cpg_mcast res_lib_cpg_mcast;
2103  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2104  struct iovec req_exec_cpg_iovec[2];
2105  struct req_exec_cpg_mcast req_exec_cpg_mcast;
2107  int result;
2108  cs_error_t error = CS_ERR_NOT_EXIST;
2109 
2110  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
2111 
/* The library request sits right after the zero-copy header in the buffer. */
2112  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
2113  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
2114 
2115  switch (cpd->cpd_state) {
2116  case CPD_STATE_UNJOINED:
2117  error = CS_ERR_NOT_EXIST;
2118  break;
2120  error = CS_ERR_NOT_EXIST;
2121  break;
2123  error = CS_OK;
2124  break;
2126  error = CS_OK;
2127  break;
2128  }
2129 
2130  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
2131  res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
2132  if (error == CS_OK) {
2133  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2134  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2136  req_exec_cpg_mcast.pid = cpd->pid;
2137  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2138  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2139  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
2140  sizeof(mar_cpg_name_t));
2141 
2142  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2143  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2144  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
2145  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2146 
2147  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2148  if (result == 0) {
2149  res_lib_cpg_mcast.header.error = CS_OK;
2150  } else {
2151  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
2152  }
2153  } else {
2154  res_lib_cpg_mcast.header.error = error;
2155  }
2156 
2157  api->ipc_response_send (conn, &res_lib_cpg_mcast,
2158  sizeof (res_lib_cpg_mcast));
2159 
2160 }
2161 
2162 static void message_handler_req_lib_cpg_membership (void *conn,
2163  const void *message)
2164 {
2166  (struct req_lib_cpg_membership_get *)message;
2167  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
2168  struct list_head *iter;
2169  int member_count = 0;
2170 
2171  res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
2172  res_lib_cpg_membership_get.header.error = CS_OK;
2173  res_lib_cpg_membership_get.header.size =
2174  sizeof (struct res_lib_cpg_membership_get);
2175 
2176  for (iter = process_info_list_head.next;
2177  iter != &process_info_list_head; iter = iter->next) {
2178 
2179  struct process_info *pi = list_entry (iter, struct process_info, list);
2180  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
2181  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
2182  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
2183  member_count += 1;
2184  }
2185  }
2186  res_lib_cpg_membership_get.member_count = member_count;
2187 
2188  api->ipc_response_send (conn, &res_lib_cpg_membership_get,
2189  sizeof (res_lib_cpg_membership_get));
2190 }
2191 
2192 static void message_handler_req_lib_cpg_local_get (void *conn,
2193  const void *message)
2194 {
2195  struct res_lib_cpg_local_get res_lib_cpg_local_get;
2196 
2197  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
2198  res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
2199  res_lib_cpg_local_get.header.error = CS_OK;
2200  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
2201 
2202  api->ipc_response_send (conn, &res_lib_cpg_local_get,
2203  sizeof (res_lib_cpg_local_get));
2204 }
2205 
2206 static void message_handler_req_lib_cpg_iteration_initialize (
2207  void *conn,
2208  const void *message)
2209 {
2211  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2212  hdb_handle_t cpg_iteration_handle = 0;
2213  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
2214  struct list_head *iter, *iter2;
2215  struct cpg_iteration_instance *cpg_iteration_instance;
2216  cs_error_t error = CS_OK;
2217  int res;
2218 
2219  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
2220 
2221  /* Because between calling this function and *next can be some operations which will
2222  * change list, we must do full copy.
2223  */
2224 
2225  /*
2226  * Create new iteration instance
2227  */
2228  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
2229  &cpg_iteration_handle);
2230 
2231  if (res != 0) {
2232  error = CS_ERR_NO_MEMORY;
2233  goto response_send;
2234  }
2235 
2236  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
2237 
2238  if (res != 0) {
2239  error = CS_ERR_BAD_HANDLE;
2240  goto error_destroy;
2241  }
2242 
2243  list_init (&cpg_iteration_instance->items_list_head);
2244  cpg_iteration_instance->handle = cpg_iteration_handle;
2245 
2246  /*
2247  * Create copy of process_info list "grouped by" group name
2248  */
2249  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
2250  struct process_info *pi = list_entry (iter, struct process_info, list);
2251  struct process_info *new_pi;
2252 
2253  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2254  /*
2255  * Try to find processed group name in our list new list
2256  */
2257  int found = 0;
2258 
2259  for (iter2 = cpg_iteration_instance->items_list_head.next;
2260  iter2 != &cpg_iteration_instance->items_list_head;
2261  iter2 = iter2->next) {
2262  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2263 
2264  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2265  found = 1;
2266  break;
2267  }
2268  }
2269 
2270  if (found) {
2271  /*
2272  * We have this name in list -> don't add
2273  */
2274  continue ;
2275  }
2276  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2277  /*
2278  * Test pi group name with request
2279  */
2280  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2281  /*
2282  * Not same -> don't add
2283  */
2284  continue ;
2285  }
2286 
2287  new_pi = malloc (sizeof (struct process_info));
2288  if (!new_pi) {
2289  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2290 
2291  error = CS_ERR_NO_MEMORY;
2292 
2293  goto error_put_destroy;
2294  }
2295 
2296  memcpy (new_pi, pi, sizeof (struct process_info));
2297  list_init (&new_pi->list);
2298 
2299  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2300  /*
2301  * pid and nodeid -> undefined
2302  */
2303  new_pi->pid = new_pi->nodeid = 0;
2304  }
2305 
2306  /*
2307  * We will return list "grouped" by "group name", so try to find right place to add
2308  */
2309  for (iter2 = cpg_iteration_instance->items_list_head.next;
2310  iter2 != &cpg_iteration_instance->items_list_head;
2311  iter2 = iter2->next) {
2312  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2313 
2314  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2315  break;
2316  }
2317  }
2318 
2319  list_add (&new_pi->list, iter2);
2320  }
2321 
2322  /*
2323  * Now we have a full "grouped by" copy of process_info list
2324  */
2325 
2326  /*
2327  * Add instance to current cpd list
2328  */
2329  list_init (&cpg_iteration_instance->list);
2330  list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
2331 
2332  cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2333 
2334 error_put_destroy:
2335  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2336 error_destroy:
2337  if (error != CS_OK) {
2338  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2339  }
2340 
2341 response_send:
2342  res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
2343  res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
2344  res_lib_cpg_iterationinitialize.header.error = error;
2345  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2346 
2347  api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
2348  sizeof (res_lib_cpg_iterationinitialize));
2349 }
2350 
2351 static void message_handler_req_lib_cpg_iteration_next (
2352  void *conn,
2353  const void *message)
2354 {
2355  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2356  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
2357  struct cpg_iteration_instance *cpg_iteration_instance;
2358  cs_error_t error = CS_OK;
2359  int res;
2360  struct process_info *pi;
2361 
2362  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2363 
2364  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2365  req_lib_cpg_iterationnext->iteration_handle,
2366  (void *)&cpg_iteration_instance);
2367 
2368  if (res != 0) {
2369  error = CS_ERR_LIBRARY;
2370  goto error_exit;
2371  }
2372 
2373  assert (cpg_iteration_instance);
2374 
2375  cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2376 
2377  if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2378  error = CS_ERR_NO_SECTIONS;
2379  goto error_put;
2380  }
2381 
2382  pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2383 
2384  /*
2385  * Copy iteration data
2386  */
2387  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2388  res_lib_cpg_iterationnext.description.pid = pi->pid;
2389  memcpy (&res_lib_cpg_iterationnext.description.group,
2390  &pi->group,
2391  sizeof (mar_cpg_name_t));
2392 
2393 error_put:
2394  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2395 error_exit:
2396  res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
2397  res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
2398  res_lib_cpg_iterationnext.header.error = error;
2399 
2400  api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
2401  sizeof (res_lib_cpg_iterationnext));
2402 }
2403 
2404 static void message_handler_req_lib_cpg_iteration_finalize (
2405  void *conn,
2406  const void *message)
2407 {
2409  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
2410  struct cpg_iteration_instance *cpg_iteration_instance;
2411  cs_error_t error = CS_OK;
2412  int res;
2413 
2414  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2415 
2416  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2417  req_lib_cpg_iterationfinalize->iteration_handle,
2418  (void *)&cpg_iteration_instance);
2419 
2420  if (res != 0) {
2421  error = CS_ERR_LIBRARY;
2422  goto error_exit;
2423  }
2424 
2425  assert (cpg_iteration_instance);
2426 
2427  cpg_iteration_instance_finalize (cpg_iteration_instance);
2428  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2429 
2430 error_exit:
2431  res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
2432  res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
2433  res_lib_cpg_iterationfinalize.header.error = error;
2434 
2435  api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
2436  sizeof (res_lib_cpg_iterationfinalize));
2437 }
void *(* ipc_private_data_get)(void *conn)
Definition: coroapi.h:208
#define TOTEM_AGREED
Definition: coroapi.h:89
int initial_totem_conf_sent
Definition: exec/cpg.c:159
const char * name
Definition: coroapi.h:432
Definition: exec/cpg.c:1902
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:281
struct list_head list
Definition: exec/cpg.c:509
mar_uint32_t sender_nodeid
Definition: exec/cpg.c:505
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:91
uint64_t initial_transition_counter
Definition: exec/cpg.c:161
#define LOGSYS_LEVEL_TRACE
Definition: logsys.h:75
mar_uint32_t sender_nodeid
Definition: exec/cpg.c:513
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:144
Definition: cpg.h:92
struct list_head * next
Definition: list.h:47
struct list_head list
Definition: exec/cpg.c:169
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
Definition: coroapi.h:217
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:210
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
Definition: exec/cpg.c:457
cpg_sync_state
Definition: exec/cpg.c:139
mar_cpg_name_t group
Definition: exec/cpg.c:195
struct message_header header
Definition: totemsrp.c:60
struct list_head * current_pointer
Definition: exec/cpg.c:171
hdb_handle_t handle
Definition: exec/cpg.c:168
uint32_t pid
Definition: exec/cpg.c:194
#define CPG_MEMBERS_MAX
Definition: cpg.h:97
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
Definition: coroapi.h:233
uint64_t transition_counter
Definition: exec/cpg.c:160
Definition: list.h:46
#define log_printf(level, format, args...)
Definition: logsys.h:217
struct list_head list
Definition: exec/cpg.c:196
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
Definition: coroapi.h:423
uint64_t server_address
Definition: ipc_cpg.h:352
void * conn
Definition: exec/cpg.c:154
struct list_head iteration_instance_list_head
Definition: exec/cpg.c:163
#define SERVICE_ID_MAKE(a, b)
Definition: coroapi.h:411
#define LOGSYS_LEVEL_WARNING
Definition: logsys.h:71
struct list_head list
Definition: exec/cpg.c:516
unsigned int flags
Definition: exec/cpg.c:158
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:227
uint32_t pid
Definition: exec/cpg.c:514
unsigned int nodeid
Definition: coroapi.h:96
void(* ipc_refcnt_dec)(void *conn)
Definition: coroapi.h:222
struct list_head list
Definition: exec/cpg.c:91
#define LOGSYS_LEVEL_ERROR
Definition: logsys.h:70
size_t size
Definition: exec/cpg.c:93
Linked list API.
void * server_ptr
Definition: exec/cpg.c:1904
struct totem_ip_address rep
Definition: coroapi.h:104
uint32_t pid
Definition: exec/cpg.c:156
cs_error_t
Definition: corotypes.h:78
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:74
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:272
mar_cpg_name_t group_name
Definition: exec/cpg.c:202
uint8_t mar_uint8_t
Definition: mar_gen.h:51
mar_cpg_name_t group_name
Definition: exec/cpg.c:155
typedef __attribute__
cpg_message_req_types
Definition: exec/cpg.c:80
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
#define swab32(x)
Definition: swab.h:43
enum cpd_state cpd_state
Definition: exec/cpg.c:157
uint32_t mar_uint32_t
Definition: mar_gen.h:53
unsigned int nodeid
Definition: exec/cpg.c:193
struct list_head list
Definition: exec/cpg.c:162
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:83
Definition: exec/cpg.c:200
qb_handle_t hdb_handle_t
Definition: hdb.h:52
struct corosync_service_engine cpg_service_engine
Definition: exec/cpg.c:436
struct list_head zcb_mapped_list_head
Definition: exec/cpg.c:164
const char *(* totem_ifaces_print)(unsigned int nodeid)
Definition: coroapi.h:243
uint32_t pid
Definition: exec/cpg.c:201
cpg_downlist_state_e
Definition: exec/cpg.c:144
#define list_entry(ptr, type, member)
Definition: list.h:84
void * addr
Definition: exec/cpg.c:92
mar_cpg_name_t group_name
Definition: exec/cpg.c:515
unsigned long long seq
Definition: coroapi.h:105
void(* lib_handler_fn)(void *conn, const void *msg)
Definition: coroapi.h:418
char type
Definition: totemrrp.c:518
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:215
struct list_head items_list_head
Definition: exec/cpg.c:170
uint64_t server_addr
Definition: exec/cpg.c:1903
unsigned int nodeid
Definition: coroapi.h:65
struct memb_ring_id ring_id
Definition: totemsrp.c:64
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
Definition: coroapi.h:204
cpd_state
Definition: exec/cpg.c:132
DECLARE_LIST_INIT(cpg_pd_list_head)
Message from another node.
Definition: ipc_cpg.h:239
void(* ipc_refcnt_inc)(void *conn)
Definition: coroapi.h:220