corosync
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * https://corosync.github.io/corosync/doc/Yair_phd.ps.gz (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * https://corosync.github.io/corosync/doc/tocssrp95.ps.gz
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
104/*
105 * SRP address: a processor is identified solely by its node id.
106 */
107struct srp_addr {
108 unsigned int nodeid;
109};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
176
177
179 int mcast;
180 int token;
181};
182
192
193
194struct rtr_item {
196 unsigned int seq;
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
228
229
235
236
241
242
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
402
404
406
408
410
412
414
416
418
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
459
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
540};
541
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667int main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Emit a log message through the instance's logging callback.
 *
 * Expands to a call of instance->totemsrp_log_printf with the given level,
 * the instance's subsystem id, the current function/file/line and the
 * printf-style format plus arguments.  A variable named `instance` must be
 * in scope at the call site.
 *
 * The expansion intentionally does NOT end in a semicolon: the caller's own
 * ';' terminates the statement, which keeps the macro usable as the body of
 * an unbraced if/else.
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemsrp_log_printf ( \
		level, instance->totemsrp_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)
/*
 * Log a message followed by the textual form of an error number.
 *
 * The error text is obtained with qb_strerror_r into a local buffer of
 * LOGSYS_MAX_PERROR_MSG_LEN bytes, and the message is emitted through
 * instance->totemsrp_log_printf as the caller's fmt/args with
 * ": <error text> (<err_num>)\n" appended.  A variable named `instance`
 * must be in scope at the call site.
 */
697#define LOGSYS_PERROR(err_num, level, fmt, args...) \
698do { \
699 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
700 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
701 instance->totemsrp_log_printf ( \
702 level, instance->totemsrp_subsys_id, \
703 __FUNCTION__, __FILE__, __LINE__, \
704 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
705 } while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
722
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
752 int res = 0;
753
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
774
776
778 /* incr latest token the index */
779 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780 instance->stats.latest_token = 0;
781 else
782 instance->stats.latest_token++;
783
784 if (instance->stats.earliest_token == instance->stats.latest_token) {
785 /* we have filled up the array, start overwriting */
786 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787 instance->stats.earliest_token = 0;
788 else
789 instance->stats.earliest_token++;
790
791 instance->stats.token[instance->stats.earliest_token].rx = 0;
792 instance->stats.token[instance->stats.earliest_token].tx = 0;
793 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794 }
795
796 instance->stats.token[instance->stats.latest_token].rx = time_now;
797 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798 } else {
799 instance->stats.token[instance->stats.latest_token].tx = time_now;
800 }
801 return 0;
802}
803
804static void totempg_mtu_changed(void *context, int net_mtu)
805{
806 struct totemsrp_instance *instance = context;
807
808 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809
811 "Net MTU changed to %d, new value is %d",
812 net_mtu, instance->totem_config->net_mtu);
813}
814
815/*
816 * Exported interfaces
817 */
819 qb_loop_t *poll_handle,
820 void **srp_context,
822 totempg_stats_t *stats,
823
824 void (*deliver_fn) (
825 unsigned int nodeid,
826 const void *msg,
827 unsigned int msg_len,
829
830 void (*confchg_fn) (
832 const unsigned int *member_list, size_t member_list_entries,
833 const unsigned int *left_list, size_t left_list_entries,
834 const unsigned int *joined_list, size_t joined_list_entries,
835 const struct memb_ring_id *ring_id),
837 int waiting_trans_ack))
838{
839 struct totemsrp_instance *instance;
840 int res;
841
842 instance = malloc (sizeof (struct totemsrp_instance));
843 if (instance == NULL) {
844 goto error_exit;
845 }
846
847 totemsrp_instance_initialize (instance);
848
851
852 stats->srp = &instance->stats;
853 instance->stats.latest_token = 0;
854 instance->stats.earliest_token = 0;
855
856 instance->totem_config = totem_config;
857
858 /*
859 * Configure logging
860 */
869
870 /*
871 * Configure totem store and load functions
872 */
875
876 /*
877 * Initialize local variables for totemsrp
878 */
880
881 /*
882 * Display totem configuration
883 */
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
890 "Token warning every %d ms (%d%% of Token Timeout)",
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 } else {
899 "Token warnings disabled");
900 }
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
909
912 "downcheck (%d ms) fail to recv const (%d msgs)",
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920
922 "missed count const (%d messages)",
924
926 "send threads (%d threads)", totem_config->threads);
927
929 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931 "max_network_delay (%d ms)", totem_config->max_network_delay);
932
933
934 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935 sizeof (struct message_item), instance->threaded_mode_enabled);
936
937 sq_init (&instance->regular_sort_queue,
939
940 sq_init (&instance->recovery_sort_queue,
942
943 instance->totemsrp_poll_handle = poll_handle;
944
945 instance->totemsrp_deliver_fn = deliver_fn;
946
947 instance->totemsrp_confchg_fn = confchg_fn;
948 instance->use_heartbeat = 1;
949
950 timer_function_pause_timeout (instance);
951
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955 instance->use_heartbeat = 0;
956 }
957
958 if (instance->use_heartbeat) {
959 instance->heartbeat_timeout
962
963 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966 instance->heartbeat_timeout,
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972 instance->use_heartbeat = 0;
973 }
974 else {
976 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977 }
978 }
979
981 poll_handle,
982 &instance->totemnet_context,
984 stats->srp,
985 instance,
988 totempg_mtu_changed,
989 target_set_completed);
990 if (res == -1) {
991 goto error_exit;
992 }
993
994 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995
996 /*
997 * Must have net_mtu adjusted by totemnet_initialize first
998 */
999 cs_queue_init (&instance->new_message_queue,
1001 sizeof (struct message_item), instance->threaded_mode_enabled);
1002
1003 cs_queue_init (&instance->new_message_queue_trans,
1005 sizeof (struct message_item), instance->threaded_mode_enabled);
1006
1008 &instance->token_recv_event_handle,
1010 0,
1011 token_event_stats_collector,
1012 instance);
1014 &instance->token_sent_event_handle,
1016 0,
1017 token_event_stats_collector,
1018 instance);
1019 *srp_context = instance;
1020 return (0);
1021
1023 return (-1);
1024}
1025
1027 void *srp_context)
1028{
1029 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030
1031 memb_leave_message_send (instance);
1033 cs_queue_free (&instance->new_message_queue);
1034 cs_queue_free (&instance->new_message_queue_trans);
1035 cs_queue_free (&instance->retrans_message_queue);
1036 sq_free (&instance->regular_sort_queue);
1037 sq_free (&instance->recovery_sort_queue);
1038 free (instance);
1039}
1040
1042 void *srp_context,
1043 unsigned int nodeid,
1045{
1046 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1047 int i;
1048
1050
1051 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1052 for (i = 0; i < instance->my_proc_list_entries; i++) {
1053 if (instance->my_proc_list[i].nodeid == nodeid) {
1054 node_status->reachable = 1;
1055 }
1056 }
1057
1059}
1060
1061
1062/*
1063 * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1064 * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1065 * function.
1066 *
1067 * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1068 * and if interface was not found, -1 is returned.
1069 */
1071 void *srp_context,
1072 unsigned int nodeid,
1073 unsigned int *interface_id,
1074 struct totem_ip_address *interfaces,
1075 unsigned int interfaces_size,
1076 char ***status,
1077 unsigned int *iface_count)
1078{
1079 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1080 struct totem_ip_address *iface_ptr = interfaces;
1081 int res = 0;
1082 int i,n;
1083 int num_ifs = 0;
1084
1085 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1087
1088 for (i=0; i<INTERFACE_MAX; i++) {
1089 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1090 if (instance->totem_config->interfaces[i].configured &&
1092 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1094 iface_ptr++;
1095 if (++num_ifs > interfaces_size) {
1096 res = -2;
1097 break;
1098 }
1099 }
1100 }
1101 }
1102
1105 return (res);
1106}
1107
1109 void *srp_context,
1110 const char *cipher_type,
1111 const char *hash_type)
1112{
1113 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114 int res;
1115
1117
1118 return (res);
1119}
1120
1121
1123 void *srp_context)
1124{
1125 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126 unsigned int res;
1127
1128 res = instance->my_id.nodeid;
1129
1130 return (res);
1131}
1132
1134 void *srp_context)
1135{
1136 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1137 int res;
1138
1139 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1140
1141 return (res);
1142}
1143
1144
1145/*
1146 * Set operations for use by the membership algorithm
1147 */
1148static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1149{
1150 if (a->nodeid == b->nodeid) {
1151 return 1;
1152 }
1153 return 0;
1154}
1155
1156static void srp_addr_to_nodeid (
1157 struct totemsrp_instance *instance,
1158 unsigned int *nodeid_out,
1159 struct srp_addr *srp_addr_in,
1160 unsigned int entries)
1161{
1162 unsigned int i;
1163
1164 for (i = 0; i < entries; i++) {
1165 nodeid_out[i] = srp_addr_in[i].nodeid;
1166 }
1167}
1168
1169static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1170{
1171 struct srp_addr res;
1172
1173 res.nodeid = swab32 (in.nodeid);
1174
1175 return (res);
1176}
1177
1178static void memb_consensus_reset (struct totemsrp_instance *instance)
1179{
1180 instance->consensus_list_entries = 0;
1181}
1182
1183static void memb_set_subtract (
1184 struct srp_addr *out_list, int *out_list_entries,
1185 struct srp_addr *one_list, int one_list_entries,
1186 struct srp_addr *two_list, int two_list_entries)
1187{
1188 int found = 0;
1189 int i;
1190 int j;
1191
1192 *out_list_entries = 0;
1193
1194 for (i = 0; i < one_list_entries; i++) {
1195 for (j = 0; j < two_list_entries; j++) {
1196 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1197 found = 1;
1198 break;
1199 }
1200 }
1201 if (found == 0) {
1204 }
1205 found = 0;
1206 }
1207}
1208
1209/*
1210 * Set consensus for a specific processor
1211 */
1212static void memb_consensus_set (
1213 struct totemsrp_instance *instance,
1214 const struct srp_addr *addr)
1215{
1216 int found = 0;
1217 int i;
1218
1219 for (i = 0; i < instance->consensus_list_entries; i++) {
1220 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1221 found = 1;
1222 break; /* found entry */
1223 }
1224 }
1225 instance->consensus_list[i].addr = *addr;
1226 instance->consensus_list[i].set = 1;
1227 if (found == 0) {
1228 instance->consensus_list_entries++;
1229 }
1230 return;
1231}
1232
1233/*
1234 * Is consensus set for a specific processor
1235 */
1236static int memb_consensus_isset (
1237 struct totemsrp_instance *instance,
1238 const struct srp_addr *addr)
1239{
1240 int i;
1241
1242 for (i = 0; i < instance->consensus_list_entries; i++) {
1243 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1244 return (instance->consensus_list[i].set);
1245 }
1246 }
1247 return (0);
1248}
1249
1250/*
1251 * Is consensus agreed upon based upon consensus database
1252 */
1253static int memb_consensus_agreed (
1254 struct totemsrp_instance *instance)
1255{
1257 int token_memb_entries = 0;
1258 int agreed = 1;
1259 int i;
1260
1261 memb_set_subtract (token_memb, &token_memb_entries,
1262 instance->my_proc_list, instance->my_proc_list_entries,
1263 instance->my_failed_list, instance->my_failed_list_entries);
1264
1265 for (i = 0; i < token_memb_entries; i++) {
1266 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1267 agreed = 0;
1268 break;
1269 }
1270 }
1271
1272 if (agreed && instance->failed_to_recv == 1) {
1273 /*
1274 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1275 * will create single ring anyway.
1276 */
1277
1278 return (agreed);
1279 }
1280
1282
1283 return (agreed);
1284}
1285
1286static void memb_consensus_notset (
1287 struct totemsrp_instance *instance,
1290 struct srp_addr *comparison_list,
1292{
1293 int i;
1294
1296
1297 for (i = 0; i < instance->my_proc_list_entries; i++) {
1298 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1301 }
1302 }
1303}
1304
1305/*
1306 * Is set1 equal to set2 Entries can be in different orders
1307 */
1308static int memb_set_equal (
1309 struct srp_addr *set1, int set1_entries,
1310 struct srp_addr *set2, int set2_entries)
1311{
1312 int i;
1313 int j;
1314
1315 int found = 0;
1316
1317 if (set1_entries != set2_entries) {
1318 return (0);
1319 }
1320 for (i = 0; i < set2_entries; i++) {
1321 for (j = 0; j < set1_entries; j++) {
1322 if (srp_addr_equal (&set1[j], &set2[i])) {
1323 found = 1;
1324 break;
1325 }
1326 }
1327 if (found == 0) {
1328 return (0);
1329 }
1330 found = 0;
1331 }
1332 return (1);
1333}
1334
1335/*
1336 * Is subset fully contained in fullset
1337 */
1338static int memb_set_subset (
1339 const struct srp_addr *subset, int subset_entries,
1340 const struct srp_addr *fullset, int fullset_entries)
1341{
1342 int i;
1343 int j;
1344 int found = 0;
1345
1347 return (0);
1348 }
1349 for (i = 0; i < subset_entries; i++) {
1350 for (j = 0; j < fullset_entries; j++) {
1351 if (srp_addr_equal (&subset[i], &fullset[j])) {
1352 found = 1;
1353 }
1354 }
1355 if (found == 0) {
1356 return (0);
1357 }
1358 found = 0;
1359 }
1360 return (1);
1361}
1362/*
1363 * merge subset into fullset taking care not to add duplicates
1364 */
1365static void memb_set_merge (
1366 const struct srp_addr *subset, int subset_entries,
1367 struct srp_addr *fullset, int *fullset_entries)
1368{
1369 int found = 0;
1370 int i;
1371 int j;
1372
1373 for (i = 0; i < subset_entries; i++) {
1374 for (j = 0; j < *fullset_entries; j++) {
1375 if (srp_addr_equal (&fullset[j], &subset[i])) {
1376 found = 1;
1377 break;
1378 }
1379 }
1380 if (found == 0) {
1383 }
1384 found = 0;
1385 }
1386 return;
1387}
1388
1389static void memb_set_and_with_ring_id (
1390 struct srp_addr *set1,
1392 int set1_entries,
1393 struct srp_addr *set2,
1394 int set2_entries,
1395 struct memb_ring_id *old_ring_id,
1396 struct srp_addr *and,
1397 int *and_entries)
1398{
1399 int i;
1400 int j;
1401 int found = 0;
1402
1403 *and_entries = 0;
1404
1405 for (i = 0; i < set2_entries; i++) {
1406 for (j = 0; j < set1_entries; j++) {
1407 if (srp_addr_equal (&set1[j], &set2[i])) {
1408 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1409 found = 1;
1410 }
1411 break;
1412 }
1413 }
1414 if (found) {
1415 and[*and_entries] = set1[j];
1416 *and_entries = *and_entries + 1;
1417 }
1418 found = 0;
1419 }
1420 return;
1421}
1422
1423static void memb_set_log(
1424 struct totemsrp_instance *instance,
1425 int level,
1426 const char *string,
1427 struct srp_addr *list,
1428 int list_entries)
1429{
1430 char int_buf[32];
1431 char list_str[512];
1432 int i;
1433
1434 memset(list_str, 0, sizeof(list_str));
1435
1436 for (i = 0; i < list_entries; i++) {
1437 if (i == 0) {
1438 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1439 } else {
1440 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1441 }
1442
1443 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1444 break ;
1445 }
1447 }
1448
1449 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1450}
1451
1452static void my_leave_memb_clear(
1453 struct totemsrp_instance *instance)
1454{
1455 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1456 instance->my_leave_memb_entries = 0;
1457}
1458
1459static unsigned int my_leave_memb_match(
1460 struct totemsrp_instance *instance,
1461 unsigned int nodeid)
1462{
1463 int i;
1464 unsigned int ret = 0;
1465
1466 for (i = 0; i < instance->my_leave_memb_entries; i++){
1467 if (instance->my_leave_memb_list[i] == nodeid){
1468 ret = nodeid;
1469 break;
1470 }
1471 }
1472 return ret;
1473}
1474
1475static void my_leave_memb_set(
1476 struct totemsrp_instance *instance,
1477 unsigned int nodeid)
1478{
1479 int i, found = 0;
1480 for (i = 0; i < instance->my_leave_memb_entries; i++){
1481 if (instance->my_leave_memb_list[i] == nodeid){
1482 found = 1;
1483 break;
1484 }
1485 }
1486 if (found == 1) {
1487 return;
1488 }
1489 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1490 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1491 instance->my_leave_memb_entries++;
1492 } else {
1494 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1495 }
1496}
1497
1498
1499static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1500{
1501 assert (instance != NULL);
1502 return totemnet_buffer_alloc (instance->totemnet_context);
1503}
1504
1505static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1506{
1507 assert (instance != NULL);
1509}
1510
1511static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1512{
1513 int32_t res;
1514
1520 (void *)instance,
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1523 if (res != 0) {
1524 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1525 }
1526
1527}
1528
1529static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1530{
1531 int32_t res;
1532
1533 if (instance->my_merge_detect_timeout_outstanding == 0) {
1537 (void *)instance,
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1540 if (res != 0) {
1541 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1542 }
1543
1545 }
1546}
1547
1548static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1549{
1552}
1553
1554/*
1555 * ring_state_* is used to save and restore the sort queue
1556 * state when a recovery operation fails (and enters gather)
1557 */
1558static void old_ring_state_save (struct totemsrp_instance *instance)
1559{
1560 if (instance->old_ring_state_saved == 0) {
1561 instance->old_ring_state_saved = 1;
1562 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1563 sizeof (struct memb_ring_id));
1564 instance->old_ring_state_aru = instance->my_aru;
1567 "Saving state aru %x high seq received %x",
1568 instance->my_aru, instance->my_high_seq_received);
1569 }
1570}
1571
1572static void old_ring_state_restore (struct totemsrp_instance *instance)
1573{
1574 instance->my_aru = instance->old_ring_state_aru;
1577 "Restoring instance->my_aru %x my high seq received %x",
1578 instance->my_aru, instance->my_high_seq_received);
1579}
1580
1581static void old_ring_state_reset (struct totemsrp_instance *instance)
1582{
1584 "Resetting old ring state");
1585 instance->old_ring_state_saved = 0;
1586}
1587
1588static void reset_pause_timeout (struct totemsrp_instance *instance)
1589{
1590 int32_t res;
1591
1596 (void *)instance,
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1599 if (res != 0) {
1600 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1601 }
1602}
1603
1604static void reset_token_warning (struct totemsrp_instance *instance) {
1605 int32_t res;
1606
1611 (void *)instance,
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1614 if (res != 0) {
1615 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1616 }
1617}
1618
1619static void reset_token_timeout (struct totemsrp_instance *instance) {
1620 int32_t res;
1621
1626 (void *)instance,
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1629 if (res != 0) {
1630 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1631 }
1632
1633 if (instance->totem_config->token_warning)
1634 reset_token_warning(instance);
1635}
1636
1637static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1638 int32_t res;
1639
1644 (void *)instance,
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1647 if (res != 0) {
1648 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1649 }
1650}
1651
1652
1653static void cancel_token_warning (struct totemsrp_instance *instance) {
1655}
1656
1657static void cancel_token_timeout (struct totemsrp_instance *instance) {
1659
1660 if (instance->totem_config->token_warning)
1661 cancel_token_warning(instance);
1662}
1663
1664static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1666}
1667
1668static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1669{
1671}
1672
1673static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1674{
1675 int32_t res;
1676
1680 (void *)instance,
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1683 if (res != 0) {
1684 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1685 }
1686}
1687
1688static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1689{
1692}
1693
1694static void memb_state_consensus_timeout_expired (
1695 struct totemsrp_instance *instance)
1696{
1699
1700 instance->stats.consensus_timeouts++;
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1703
1704 memb_consensus_set (instance, &instance->my_id);
1705
1706 reset_token_timeout (instance); // REVIEWED
1707 } else {
1708 memb_consensus_notset (
1709 instance,
1712 instance->my_proc_list,
1713 instance->my_proc_list_entries);
1714
1716 instance->my_failed_list, &instance->my_failed_list_entries);
1717 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1718 }
1719}
1720
1721static void memb_join_message_send (struct totemsrp_instance *instance);
1722
1723static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1724
1725/*
1726 * Timers used for various states of the membership algorithm
1727 */
1728static void timer_function_pause_timeout (void *data)
1729{
1730 struct totemsrp_instance *instance = data;
1731
1733 reset_pause_timeout (instance);
1734}
1735
1736static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1737{
1738 old_ring_state_restore (instance);
1739 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1740 instance->stats.recovery_token_lost++;
1741}
1742
1743static void timer_function_orf_token_warning (void *data)
1744{
1745 struct totemsrp_instance *instance = data;
1747
1748 /* need to protect against the case where token_warning is set to 0 dynamically */
1749 if (instance->totem_config->token_warning) {
1751 instance->stats.token[instance->stats.latest_token].rx;
1753 "Token has not been received in %"PRIu64" ms", tv_diff);
1754 reset_token_warning(instance);
1755 } else {
1756 cancel_token_warning(instance);
1757 }
1758}
1759
1760static void timer_function_orf_token_timeout (void *data)
1761{
1762 struct totemsrp_instance *instance = data;
1763
1764 switch (instance->memb_state) {
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1771 instance->totem_config->token_timeout,
1772 instance->totem_config->consensus_timeout);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1775 instance->stats.operational_token_lost++;
1776 break;
1777
1778 case MEMB_STATE_GATHER:
1780 "The consensus timeout expired (%ums).",
1781 instance->totem_config->consensus_timeout);
1782 memb_state_consensus_timeout_expired (instance);
1783 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1784 instance->stats.gather_token_lost++;
1785 break;
1786
1787 case MEMB_STATE_COMMIT:
1789 "The token was lost in the COMMIT state.");
1790 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1791 instance->stats.commit_token_lost++;
1792 break;
1793
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1798 instance->orf_token_discard = 1;
1799 break;
1800 }
1801}
1802
1803static void timer_function_heartbeat_timeout (void *data)
1804{
1805 struct totemsrp_instance *instance = data;
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1808 timer_function_orf_token_timeout(data);
1809}
1810
1811static void memb_timer_function_state_gather (void *data)
1812{
1813 struct totemsrp_instance *instance = data;
1814 int32_t res;
1815
1816 switch (instance->memb_state) {
1819 assert (0); /* this should never happen */
1820 break;
1821 case MEMB_STATE_GATHER:
1822 case MEMB_STATE_COMMIT:
1823 memb_join_message_send (instance);
1824
1825 /*
1826 * Restart the join timeout
1827 */
1829
1833 (void *)instance,
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1836
1837 if (res != 0) {
1838 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1839 }
1840 break;
1841 }
1842}
1843
/*
 * Timer callback for the gather consensus timeout.
 * data is the totemsrp instance the timer was armed for; the work itself
 * is done by memb_state_consensus_timeout_expired.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1849
1850static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1851{
1852 unsigned int i;
1855 unsigned int range = 0;
1856 int res;
1857 void *ptr;
1858 struct mcast *mcast;
1859
1861 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1862
1863 range = instance->my_aru - SEQNO_START_MSG;
1864 /*
1865 * Move messages from recovery to regular sort queue
1866 */
1867// todo should i be initialized to 0 or 1 ?
1868 for (i = 1; i <= range; i++) {
1869 res = sq_item_get (&instance->recovery_sort_queue,
1870 i + SEQNO_START_MSG, &ptr);
1871 if (res != 0) {
1872 continue;
1873 }
1875
1876 /*
1877 * Convert recovery message into regular message
1878 */
1881 /*
1882 * Message is a recovery message encapsulated
1883 * in a new ring message
1884 */
1885 regular_message_item.mcast =
1886 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1887 regular_message_item.msg_len =
1888 recovery_message_item->msg_len - sizeof (struct mcast);
1890 } else {
1891 /*
1892 * TODO this case shouldn't happen
1893 */
1894 continue;
1895 }
1896
1898 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1899 (uint64_t)mcast->seq);
1900
1901 /*
1902 * Only add this message to the regular sort
1903 * queue if it was originated with the same ring
1904 * id as the previous ring
1905 */
1906 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1907 sizeof (struct memb_ring_id)) == 0) {
1908
1909 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1910 if (res == 0) {
1911 sq_item_add (&instance->regular_sort_queue,
1913 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1915 }
1916 }
1917 } else {
1919 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1920 }
1921 }
1922}
1923
1924/*
1925 * Change states in the state machine of the membership algorithm
1926 */
1927static void memb_state_operational_enter (struct totemsrp_instance *instance)
1928{
1930 int joined_list_entries = 0;
1931 unsigned int aru_save;
1935 unsigned int left_list[PROCESSOR_COUNT_MAX];
1936 unsigned int i;
1937 unsigned int res;
1938 char left_node_msg[1024];
1939 char joined_node_msg[1024];
1940 char failed_node_msg[1024];
1941
1942 instance->originated_orf_token = 0;
1943
1944 memb_consensus_reset (instance);
1945
1946 old_ring_state_reset (instance);
1947
1948 deliver_messages_from_recovery_to_regular (instance);
1949
1951 "Delivering to app %x to %x",
1952 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1953
1954 aru_save = instance->my_aru;
1955 instance->my_aru = instance->old_ring_state_aru;
1956
1957 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1958
1959 /*
1960 * Calculate joined and left list
1961 */
1962 memb_set_subtract (instance->my_left_memb_list,
1963 &instance->my_left_memb_entries,
1964 instance->my_memb_list, instance->my_memb_entries,
1965 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1966
1967 memb_set_subtract (joined_list, &joined_list_entries,
1968 instance->my_new_memb_list, instance->my_new_memb_entries,
1969 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1970
1971 /*
1972 * Install new membership
1973 */
1974 instance->my_memb_entries = instance->my_new_memb_entries;
1975 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1976 sizeof (struct srp_addr) * instance->my_memb_entries);
1977 instance->last_released = 0;
1978 instance->my_set_retrans_flg = 0;
1979
1980 /*
1981 * Deliver transitional configuration to application
1982 */
1983 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1984 instance->my_left_memb_entries);
1985 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1986 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1990 0, 0, &instance->my_ring_id);
1991 /*
1992 * Switch new totemsrp messages queue. Messages sent from now on are stored
1993 * in different queue so synchronization messages are delivered first. Totempg
1994 * buffers will be switched later.
1995 */
1996 instance->waiting_trans_ack = 1;
1997
1998// TODO we need to filter to ensure we only deliver those
1999// messages which are part of instance->my_deliver_memb
2000 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2001
2002 /*
2003 * Switch totempg buffers. This used to be right after
2004 * instance->waiting_trans_ack = 1;
2005 * line. This was causing problem, because there may be not yet
2006 * processed parts of messages in totempg buffers.
2007 * So when buffers were switched and recovered messages
2008 * got delivered it was not possible to assemble them.
2009 */
2011
2012 instance->my_aru = aru_save;
2013
2014 /*
2015 * Deliver regular configuration to application
2016 */
2017 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2018 instance->my_new_memb_list, instance->my_new_memb_entries);
2019 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2023 0, 0,
2025
2026 /*
2027 * The recovery sort queue now becomes the regular
2028 * sort queue. It is necessary to copy the state
2029 * into the regular sort queue.
2030 */
2031 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2032 instance->my_last_aru = SEQNO_START_MSG;
2033
2034 /* When making my_proc_list smaller, ensure that the
2035 * now non-used entries are zero-ed out. There are some suspect
2036 * assert's that assume that there is always 2 entries in the list.
2037 * These fail when my_proc_list is reduced to 1 entry (and the
2038 * valid [0] entry is the same as the 'unused' [1] entry).
2039 */
2040 memset(instance->my_proc_list, 0,
2041 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2042
2043 instance->my_proc_list_entries = instance->my_new_memb_entries;
2044 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2045 sizeof (struct srp_addr) * instance->my_memb_entries);
2046
2047 instance->my_failed_list_entries = 0;
2048 /*
2049 * TODO Not exactly to spec
2050 *
2051 * At the entry to this function all messages without a gap are
2052 * delivered.
2053 *
2054 * This code throws away messages from the last gap in the sort queue
2055 * to my_high_seq_received
2056 *
2057 * What should really happen is we should deliver all messages up to
2058 * a gap, then deliver the transitional configuration, then deliver
2059 * the messages between the first gap and my_high_seq_received, then
2060 * deliver a regular configuration, then deliver the regular
2061 * configuration
2062 *
2063 * Unfortunately totempg doesn't appear to like this operating mode
2064 * which needs more inspection
2065 */
2066 i = instance->my_high_seq_received + 1;
2067 do {
2068 void *ptr;
2069
2070 i -= 1;
2071 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2072 if (i == 0) {
2073 break;
2074 }
2075 } while (res);
2076
2077 instance->my_high_delivered = i;
2078
2079 for (i = 0; i <= instance->my_high_delivered; i++) {
2080 void *ptr;
2081
2082 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2083 if (res == 0) {
2085
2087 free (regular_message->mcast);
2088 }
2089 }
2090 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2091 instance->last_released = instance->my_high_delivered;
2092
2093 if (joined_list_entries) {
2094 int sptr = 0;
2095 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2096 for (i=0; i< joined_list_entries; i++) {
2098 }
2099 }
2100 else {
2101 joined_node_msg[0] = '\0';
2102 }
2103
2104 if (instance->my_left_memb_entries) {
2105 int sptr = 0;
2106 int sptr2 = 0;
2107 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2110 }
2111 for (i=0; i< instance->my_left_memb_entries; i++) {
2112 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2113 if (sptr2 == 0) {
2114 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2115 }
2117 }
2118 }
2119 if (sptr2 == 0) {
2120 failed_node_msg[0] = '\0';
2121 }
2122 }
2123 else {
2124 left_node_msg[0] = '\0';
2125 failed_node_msg[0] = '\0';
2126 }
2127
2128 my_leave_memb_clear(instance);
2129
2131 "entering OPERATIONAL state.");
2133 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2134 instance->my_ring_id.rep,
2135 (uint64_t)instance->my_ring_id.seq,
2138
2139 if (strlen(failed_node_msg)) {
2141 "Failed to receive the leave message.%s",
2143 }
2144
2146
2147 instance->stats.operational_entered++;
2148 instance->stats.continuous_gather = 0;
2149
2150 instance->my_received_flg = 1;
2151
2152 reset_pause_timeout (instance);
2153
2154 /*
2155 * Save ring id information from this configuration to determine
2156 * which processors are transitioning from old regular configuration
2157 * in to new regular configuration on the next configuration change
2158 */
2159 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2160 sizeof (struct memb_ring_id));
2161
2162 return;
2163}
2164
2165static void memb_state_gather_enter (
2166 struct totemsrp_instance *instance,
2168{
2169 int32_t res;
2170
2171 instance->orf_token_discard = 1;
2172
2173 instance->originated_orf_token = 0;
2174
2175 memb_set_merge (
2176 &instance->my_id, 1,
2177 instance->my_proc_list, &instance->my_proc_list_entries);
2178
2179 memb_join_message_send (instance);
2180
2181 /*
2182 * Restart the join timeout
2183 */
2185
2189 (void *)instance,
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2192 if (res != 0) {
2193 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2194 }
2195
2196 /*
2197 * Restart the consensus timeout
2198 */
2201
2205 (void *)instance,
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2208 if (res != 0) {
2209 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2210 }
2211
2212 /*
2213 * Cancel the token loss and token retransmission timeouts
2214 */
2215 cancel_token_retransmit_timeout (instance); // REVIEWED
2216 cancel_token_timeout (instance); // REVIEWED
2217 cancel_merge_detect_timeout (instance);
2218
2219 memb_consensus_reset (instance);
2220
2221 memb_consensus_set (instance, &instance->my_id);
2222
2224 "entering GATHER state from %d(%s).",
2225 gather_from, gsfrom_to_msg(gather_from));
2226
2227 instance->memb_state = MEMB_STATE_GATHER;
2228 instance->stats.gather_entered++;
2229
2231 /*
2232 * State 3 means gather, so we are continuously gathering.
2233 */
2234 instance->stats.continuous_gather++;
2235 }
2236
2237 return;
2238}
2239
2240static void timer_function_token_retransmit_timeout (void *data);
2241
/*
 * Callback invoked with the totemsrp instance as context; it sends the
 * commit token (see memb_state_commit_enter, which defers the send until
 * the token target has been set).
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *srp = context;

	memb_state_commit_token_send (srp);
}
2250
2251static void memb_state_commit_enter (
2252 struct totemsrp_instance *instance)
2253{
2254 old_ring_state_save (instance);
2255
2256 memb_state_commit_token_update (instance);
2257
2258 memb_state_commit_token_target_set (instance);
2259
2261
2263
2265
2267
2268 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2269
2270 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2271
2272 instance->token_ring_id_seq = instance->my_ring_id.seq;
2273
2275 "entering COMMIT state.");
2276
2277 instance->memb_state = MEMB_STATE_COMMIT;
2278 reset_token_retransmit_timeout (instance); // REVIEWED
2279 reset_token_timeout (instance); // REVIEWED
2280
2281 instance->stats.commit_entered++;
2282 instance->stats.continuous_gather = 0;
2283
2284 /*
2285 * reset all flow control variables since we are starting a new ring
2286 */
2287 instance->my_trc = 0;
2288 instance->my_pbl = 0;
2289 instance->my_cbl = 0;
2290 /*
2291 * commit token sent after callback that token target has been set
2292 */
2293}
2294
2295static void memb_state_recovery_enter (
2296 struct totemsrp_instance *instance,
2298{
2299 int i;
2300 int local_received_flg = 1;
2301 unsigned int low_ring_aru;
2302 unsigned int range = 0;
2303 unsigned int messages_originated = 0;
2304 const struct srp_addr *addr;
2307
2308 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2309 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2310
2312 "entering RECOVERY state.");
2313
2314 instance->orf_token_discard = 0;
2315
2316 instance->my_high_ring_delivered = 0;
2317
2318 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2319 cs_queue_reinit (&instance->retrans_message_queue);
2320
2322
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2324
2325 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2326
2327 /*
2328 * Build regular configuration
2329 */
2331 instance->totemnet_context,
2332 commit_token->addr_entries);
2333
2334 /*
2335 * Build transitional configuration
2336 */
2337 for (i = 0; i < instance->my_new_memb_entries; i++) {
2340 sizeof (struct memb_ring_id));
2341 }
2342 memb_set_and_with_ring_id (
2343 instance->my_new_memb_list,
2345 instance->my_new_memb_entries,
2346 instance->my_memb_list,
2347 instance->my_memb_entries,
2348 &instance->my_old_ring_id,
2349 instance->my_trans_memb_list,
2350 &instance->my_trans_memb_entries);
2351
2352 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2354 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2355 }
2356 for (i = 0; i < instance->my_new_memb_entries; i++) {
2358 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2360 "previous ringid (" CS_PRI_RING_ID ")",
2361 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2362
2364 "aru %x high delivered %x received flag %d",
2365 memb_list[i].aru,
2366 memb_list[i].high_delivered,
2367 memb_list[i].received_flg);
2368
2369 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2370 }
2371 /*
2372 * Determine if any received flag is false
2373 */
2374 for (i = 0; i < commit_token->addr_entries; i++) {
2375 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2376 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2377
2378 memb_list[i].received_flg == 0) {
2379 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2380 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2381 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2383 break;
2384 }
2385 }
2386 if (local_received_flg == 1) {
2387 goto no_originate;
2388 } /* Else originate messages if we should */
2389
2390 /*
2391 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2392 */
2393 for (i = 0; i < commit_token->addr_entries; i++) {
2394 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2395 instance->my_deliver_memb_list,
2396 instance->my_deliver_memb_entries) &&
2397
2398 memcmp (&instance->my_old_ring_id,
2399 &memb_list[i].ring_id,
2400 sizeof (struct memb_ring_id)) == 0) {
2401
2402 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2403
2404 low_ring_aru = memb_list[i].aru;
2405 }
2406 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2407 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2408 }
2409 }
2410 }
2411
2412 /*
2413 * Copy all old ring messages to instance->retrans_message_queue
2414 */
2416 if (range == 0) {
2417 /*
2418 * No messages to copy
2419 */
2420 goto no_originate;
2421 }
2423
2425 "copying all old ring messages from %x-%x.",
2427
2428 for (i = 1; i <= range; i++) {
2431 void *ptr;
2432 int res;
2433
2434 res = sq_item_get (&instance->regular_sort_queue,
2435 low_ring_aru + i, &ptr);
2436 if (res != 0) {
2437 continue;
2438 }
2441 memset (&message_item, 0, sizeof (struct message_item));
2442 // TODO LEAK
2443 message_item.mcast = totemsrp_buffer_alloc (instance);
2445 memset(message_item.mcast, 0, sizeof (struct mcast));
2449 message_item.mcast->system_from = instance->my_id;
2451
2455 sizeof (struct memb_ring_id));
2456 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2457 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2460 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2461 }
2463 "Originated %d messages in RECOVERY.", messages_originated);
2464 goto originated;
2465
2468 "Did not need to originate any messages in recovery.");
2469
2471 instance->my_aru = SEQNO_START_MSG;
2472 instance->my_aru_count = 0;
2473 instance->my_seq_unchanged = 0;
2475 instance->my_install_seq = SEQNO_START_MSG;
2476 instance->last_released = SEQNO_START_MSG;
2477
2478 reset_token_timeout (instance); // REVIEWED
2479 reset_token_retransmit_timeout (instance); // REVIEWED
2480
2481 instance->memb_state = MEMB_STATE_RECOVERY;
2482 instance->stats.recovery_entered++;
2483 instance->stats.continuous_gather = 0;
2484
2485 return;
2486}
2487
2489{
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491
2492 token_hold_cancel_send (instance);
2493
2494 return;
2495}
2496
2498 void *srp_context,
2499 struct iovec *iovec,
2500 unsigned int iov_len,
2501 int guarantee)
2502{
2503 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2504 int i;
2506 char *addr;
2507 unsigned int addr_idx;
2508 struct cs_queue *queue_use;
2509
2510 if (instance->waiting_trans_ack) {
2512 } else {
2513 queue_use = &instance->new_message_queue;
2514 }
2515
2516 if (cs_queue_is_full (queue_use)) {
2517 log_printf (instance->totemsrp_log_level_debug, "queue full");
2518 return (-1);
2519 }
2520
2521 memset (&message_item, 0, sizeof (struct message_item));
2522
2523 /*
2524 * Allocate pending item
2525 */
2526 message_item.mcast = totemsrp_buffer_alloc (instance);
2527 if (message_item.mcast == 0) {
2528 goto error_mcast;
2529 }
2530
2531 /*
2532 * Set mcast header
2533 */
2534 memset(message_item.mcast, 0, sizeof (struct mcast));
2539
2542
2544 message_item.mcast->system_from = instance->my_id;
2545
2546 addr = (char *)message_item.mcast;
2547 addr_idx = sizeof (struct mcast);
2548 for (i = 0; i < iov_len; i++) {
2550 addr_idx += iovec[i].iov_len;
2551 }
2552
2554
2555 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2556 instance->stats.mcast_tx++;
2557 cs_queue_item_add (queue_use, &message_item);
2558
2559 return (0);
2560
2562 return (-1);
2563}
2564
2565/*
2566 * Determine if there is room to queue a new message
2567 */
2569{
2570 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2571 int avail;
2572 struct cs_queue *queue_use;
2573
2574 if (instance->waiting_trans_ack) {
2576 } else {
2577 queue_use = &instance->new_message_queue;
2578 }
2579 cs_queue_avail (queue_use, &avail);
2580
2581 return (avail);
2582}
2583
2584/*
2585 * ORF Token Management
2586 */
2587/*
2588 * Recast message to mcast group if it is available
2589 */
2590static int orf_token_remcast (
2591 struct totemsrp_instance *instance,
2592 int seq)
2593{
2595 int res;
2596 void *ptr;
2597
2598 struct sq *sort_queue;
2599
2600 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2601 sort_queue = &instance->recovery_sort_queue;
2602 } else {
2603 sort_queue = &instance->regular_sort_queue;
2604 }
2605
2606 res = sq_in_range (sort_queue, seq);
2607 if (res == 0) {
2608 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2609 return (-1);
2610 }
2611
2612 /*
2613 * Get RTR item at seq, if not available, return
2614 */
2615 res = sq_item_get (sort_queue, seq, &ptr);
2616 if (res != 0) {
2617 return -1;
2618 }
2619
2621
2623 instance->totemnet_context,
2626
2627 return (0);
2628}
2629
2630
2631/*
2632 * Free all freeable messages from ring
2633 */
2634static void messages_free (
2635 struct totemsrp_instance *instance,
2636 unsigned int token_aru)
2637{
2639 unsigned int i;
2640 int res;
2641 int log_release = 0;
2642 unsigned int release_to;
2643 unsigned int range = 0;
2644
2646 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2647 release_to = instance->my_last_aru;
2648 }
2649 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2650 release_to = instance->my_high_delivered;
2651 }
2652
2653 /*
2654 * Ensure we don't try to release before an already released point
2655 */
2656 if (sq_lt_compare (release_to, instance->last_released)) {
2657 return;
2658 }
2659
2660 range = release_to - instance->last_released;
2662
2663 /*
2664 * Release retransmit list items if group aru indicates they are transmitted
2665 */
2666 for (i = 1; i <= range; i++) {
2667 void *ptr;
2668
2669 res = sq_item_get (&instance->regular_sort_queue,
2670 instance->last_released + i, &ptr);
2671 if (res == 0) {
2673 totemsrp_buffer_release (instance, regular_message->mcast);
2674 }
2675 sq_items_release (&instance->regular_sort_queue,
2676 instance->last_released + i);
2677
2678 log_release = 1;
2679 }
2680 instance->last_released += range;
2681
2682 if (log_release) {
2684 "releasing messages up to and including %x", release_to);
2685 }
2686}
2687
2688static void update_aru (
2689 struct totemsrp_instance *instance)
2690{
2691 unsigned int i;
2692 int res;
2693 struct sq *sort_queue;
2694 unsigned int range;
2695 unsigned int my_aru_saved = 0;
2696
2697 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698 sort_queue = &instance->recovery_sort_queue;
2699 } else {
2700 sort_queue = &instance->regular_sort_queue;
2701 }
2702
2703 range = instance->my_high_seq_received - instance->my_aru;
2704
2705 my_aru_saved = instance->my_aru;
2706 for (i = 1; i <= range; i++) {
2707
2708 void *ptr;
2709
2710 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2711 /*
2712 * If hole, stop updating aru
2713 */
2714 if (res != 0) {
2715 break;
2716 }
2717 }
2718 instance->my_aru += i - 1;
2719}
2720
2721/*
2722 * Multicasts pending messages onto the ring (requires orf_token possession)
2723 */
2724static int orf_token_mcast (
2725 struct totemsrp_instance *instance,
2726 struct orf_token *token,
2728{
2729 struct message_item *message_item = 0;
2730 struct cs_queue *mcast_queue;
2731 struct sq *sort_queue;
2733 struct mcast *mcast;
2734 unsigned int fcc_mcast_current;
2735
2736 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2738 sort_queue = &instance->recovery_sort_queue;
2739 reset_token_retransmit_timeout (instance); // REVIEWED
2740 } else {
2741 if (instance->waiting_trans_ack) {
2743 } else {
2744 mcast_queue = &instance->new_message_queue;
2745 }
2746
2747 sort_queue = &instance->regular_sort_queue;
2748 }
2749
2751 if (cs_queue_is_empty (mcast_queue)) {
2752 break;
2753 }
2754 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2755
2756 message_item->mcast->seq = ++token->seq;
2757 message_item->mcast->this_seqno = instance->global_seqno++;
2758
2759 /*
2760 * Build IO vector
2761 */
2762 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2765
2767
2768 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2769
2770 /*
2771 * Add message to retransmit queue
2772 */
2773 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2774
2776 instance->totemnet_context,
2779
2780 /*
2781 * Delete item from pending queue
2782 */
2783 cs_queue_item_remove (mcast_queue);
2784
2785 /*
2786 * If messages mcasted, deliver any new messages to totempg
2787 */
2788 instance->my_high_seq_received = token->seq;
2789 }
2790
2791 update_aru (instance);
2792
2793 /*
2794 * Return 1 if more messages are available for single node clusters
2795 */
2796 return (fcc_mcast_current);
2797}
2798
2799/*
2800 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2801 * Modifies orf_token's rtr to include retransmits required by this process
2802 */
2803static int orf_token_rtr (
2804 struct totemsrp_instance *instance,
2805 struct orf_token *orf_token,
2806 unsigned int *fcc_allowed)
2807{
2808 unsigned int res;
2809 unsigned int i, j;
2810 unsigned int found;
2811 struct sq *sort_queue;
2812 struct rtr_item *rtr_list;
2813 unsigned int range = 0;
2814 char retransmit_msg[1024];
2815 char value[64];
2816
2817 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2818 sort_queue = &instance->recovery_sort_queue;
2819 } else {
2820 sort_queue = &instance->regular_sort_queue;
2821 }
2822
2824
2825 strcpy (retransmit_msg, "Retransmit List: ");
2828 "Retransmit List %d", orf_token->rtr_list_entries);
2829 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2830 sprintf (value, "%x ", rtr_list[i].seq);
2832 }
2833 strcat (retransmit_msg, "");
2835 "%s", retransmit_msg);
2836 }
2837
2838 /*
2839 * Retransmit messages on orf_token's RTR list from RTR queue
2840 */
2841 for (instance->fcc_remcast_current = 0, i = 0;
2843
2844 /*
2845 * If this retransmit request isn't from this configuration,
2846 * try next rtr entry
2847 */
2848 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2849 sizeof (struct memb_ring_id)) != 0) {
2850
2851 i += 1;
2852 continue;
2853 }
2854
2855 res = orf_token_remcast (instance, rtr_list[i].seq);
2856 if (res == 0) {
2857 /*
2858 * Multicasted message, so no need to copy to new retransmit list
2859 */
2862 memmove (&rtr_list[i], &rtr_list[i + 1],
2863 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2864
2865 instance->stats.mcast_retx++;
2866 instance->fcc_remcast_current++;
2867 } else {
2868 i += 1;
2869 }
2870 }
2872
2873 /*
2874 * Add messages to retransmit to RTR list
2875 * but only retry if there is room in the retransmit list
2876 */
2877
2878 range = orf_token->seq - instance->my_aru;
2880
2882 (i <= range); i++) {
2883
2884 /*
2885 * Ensure message is within the sort queue range
2886 */
2887 res = sq_in_range (sort_queue, instance->my_aru + i);
2888 if (res == 0) {
2889 break;
2890 }
2891
2892 /*
2893 * Find if a message is missing from this processor
2894 */
2895 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2896 if (res == 0) {
2897 /*
2898 * Determine how many times we have missed receiving
2899 * this sequence number. sq_item_miss_count increments
2900 * a counter for the sequence number. The miss count
2901 * will be returned and compared. This allows time for
2902 * delayed multicast messages to be received before
2903 * declaring the message is missing and requesting a
2904 * retransmit.
2905 */
2906 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2908 continue;
2909 }
2910
2911 /*
2912 * Determine if missing message is already in retransmit list
2913 */
2914 found = 0;
2915 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2916 if (instance->my_aru + i == rtr_list[j].seq) {
2917 found = 1;
2918 }
2919 }
2920 if (found == 0) {
2921 /*
2922 * Missing message not found in current retransmit list so add it
2923 */
2925 &instance->my_ring_id, sizeof (struct memb_ring_id));
2928 }
2929 }
2930 }
2931 return (instance->fcc_remcast_current);
2932}
2933
2934static void token_retransmit (struct totemsrp_instance *instance)
2935{
2936 instance->stats.orf_token_tx++;
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * If no mcast or token has been received within the token
2944 * retransmit period, retransmit the regular token to the
2945 * next processor
2946 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
3000 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3021 instance->stats.orf_token_tx++;
3023 orf_token,
3025
3026 return (res);
3027}
3028
3029static int token_hold_cancel_send (struct totemsrp_instance *instance)
3030{
3032
3033 /*
3034 * Only cancel if the token is currently held
3035 */
3036 if (instance->my_token_held == 0) {
3037 return (0);
3038 }
3039 instance->my_token_held = 0;
3040
3041 /*
3042 * Build message
3043 */
3050 sizeof (struct memb_ring_id));
3052
3053 instance->stats.token_hold_cancel_tx++;
3054
3056 sizeof (struct token_hold_cancel));
3057
3058 return (0);
3059}
3060
3061static int orf_token_send_initial (struct totemsrp_instance *instance)
3062{
3063 struct orf_token orf_token;
3064 int res;
3065
3070 orf_token.header.nodeid = instance->my_id.nodeid;
3075 instance->my_set_retrans_flg = 1;
3076
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less than high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3284 struct srp_addr *addr;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3315
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3318
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3323}
3324
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
3376 if (instance->totem_config->send_join_timeout) {
3377 // coverity[DC.WEAK_CRYPTO:SUPPRESS] random is not used in a security context
3378 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3379 }
3380
3381 instance->stats.memb_join_tx++;
3382
3384 instance->totemnet_context,
3385 memb_join,
3386 addr_idx);
3387}
3388
3389static void memb_leave_message_send (struct totemsrp_instance *instance)
3390{
3391 char memb_join_data[40000];
3392 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3393 char *addr;
3394 unsigned int addr_idx;
3397 size_t msg_len;
3398
3400 "sending join/leave message");
3401
3402 /*
3403 * add us to the failed list, and remove us from
3404 * the members list
3405 */
3406 memb_set_merge(
3407 &instance->my_id, 1,
3408 instance->my_failed_list, &instance->my_failed_list_entries);
3409
3410 memb_set_subtract (active_memb, &active_memb_entries,
3411 instance->my_proc_list, instance->my_proc_list_entries,
3412 &instance->my_id, 1);
3413
3414 msg_len = sizeof(struct memb_join) +
3415 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3416
3417 if (msg_len > sizeof(memb_join_data)) {
3419 "memb_leave message too long. Ignoring message.");
3420
3421 return ;
3422 }
3423
3429
3430 memb_join->ring_seq = instance->my_ring_id.seq;
3433 memb_join->system_from = instance->my_id;
3434
3435 // TODO: CC Maybe use the actual join send routine.
3436 /*
3437 * This mess adds the joined and failed processor lists into the join
3438 * message
3439 */
3440 addr = (char *)memb_join;
3441 addr_idx = sizeof (struct memb_join);
3442 memcpy (&addr[addr_idx],
3445 sizeof (struct srp_addr));
3446 addr_idx +=
3448 sizeof (struct srp_addr);
3449 memcpy (&addr[addr_idx],
3450 instance->my_failed_list,
3451 instance->my_failed_list_entries *
3452 sizeof (struct srp_addr));
3453 addr_idx +=
3454 instance->my_failed_list_entries *
3455 sizeof (struct srp_addr);
3456
3457
3458 if (instance->totem_config->send_join_timeout) {
3459 // coverity[DC.WEAK_CRYPTO:SUPPRESS] random is not used in a security context
3460 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3461 }
3462 instance->stats.memb_join_tx++;
3463
3465 instance->totemnet_context,
3466 memb_join,
3467 addr_idx);
3468}
3469
3470static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3471{
3473
3481 sizeof (struct memb_ring_id));
3483
3484 instance->stats.memb_merge_detect_tx++;
3487 sizeof (struct memb_merge_detect));
3488}
3489
3490static void memb_ring_id_set (
3491 struct totemsrp_instance *instance,
3492 const struct memb_ring_id *ring_id)
3493{
3494
3495 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3496}
3497
3499 void *srp_context,
3500 void **handle_out,
3502 int delete,
3503 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3504 const void *data)
3505{
3506 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3507 struct token_callback_instance *callback_handle;
3508
3509 token_hold_cancel_send (instance);
3510
3511 callback_handle = malloc (sizeof (struct token_callback_instance));
3512 if (callback_handle == 0) {
3513 return (-1);
3514 }
3515 *handle_out = (void *)callback_handle;
3516 qb_list_init (&callback_handle->list);
3517 callback_handle->callback_fn = callback_fn;
3518 callback_handle->data = (void *) data;
3519 callback_handle->callback_type = type;
3520 callback_handle->delete = delete;
3521 switch (type) {
3523 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3524 break;
3526 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3527 break;
3528 }
3529
3530 return (0);
3531}
3532
3534{
3535 struct token_callback_instance *h;
3536
3537 if (*handle_out) {
3539 qb_list_del (&h->list);
3540 free (h);
3541 h = NULL;
3542 *handle_out = 0;
3543 }
3544}
3545
3546static void token_callbacks_execute (
3547 struct totemsrp_instance *instance,
3549{
3550 struct qb_list_head *list, *tmp_iter;
3551 struct qb_list_head *callback_listhead = 0;
3553 int res;
3554 int del;
3555
3556 switch (type) {
3559 break;
3562 break;
3563 default:
3564 assert (0);
3565 }
3566
3570 if (del == 1) {
3571 qb_list_del (list);
3572 }
3573
3577 /*
3578 * This callback failed to execute, try it again on the next token
3579 */
3580 if (res == -1 && del == 1) {
3582 } else if (del) {
3584 }
3585 }
3586}
3587
3588/*
3589 * Flow control functions
3590 */
3591static unsigned int backlog_get (struct totemsrp_instance *instance)
3592{
3593 unsigned int backlog = 0;
3594 struct cs_queue *queue_use = NULL;
3595
3596 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3597 if (instance->waiting_trans_ack) {
3599 } else {
3600 queue_use = &instance->new_message_queue;
3601 }
3602 } else
3603 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3604 queue_use = &instance->retrans_message_queue;
3605 }
3606
3607 if (queue_use != NULL) {
3608 backlog = cs_queue_used (queue_use);
3609 }
3610
3611 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3612 return (backlog);
3613}
3614
3615static int fcc_calculate (
3616 struct totemsrp_instance *instance,
3617 struct orf_token *token)
3618{
3619 unsigned int transmits_allowed;
3620 unsigned int backlog_calc;
3621
3623
3624 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3625 transmits_allowed = instance->totem_config->window_size - token->fcc;
3626 }
3627
3628 instance->my_cbl = backlog_get (instance);
3629
3630 /*
3631 * Only do backlog calculation if there is a backlog otherwise
3632 * we would result in div by zero
3633 */
3634 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3635 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3636 (token->backlog + instance->my_cbl - instance->my_pbl);
3637 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3638 transmits_allowed = backlog_calc;
3639 }
3640 }
3641
3642 return (transmits_allowed);
3643}
3644
3645/*
3646 * don't overflow the RTR sort queue
3647 */
3648static void fcc_rtr_limit (
3649 struct totemsrp_instance *instance,
3650 struct orf_token *token,
3651 unsigned int *transmits_allowed)
3652{
3655 assert (check >= 0);
3656 if (sq_lt_compare (instance->last_released +
3658 instance->totem_config->window_size,
3659
3660 token->seq)) {
3661
3662 *transmits_allowed = 0;
3663 }
3664}
3665
3666static void fcc_token_update (
3667 struct totemsrp_instance *instance,
3668 struct orf_token *token,
3669 unsigned int msgs_transmitted)
3670{
3671 token->fcc += msgs_transmitted - instance->my_trc;
3672 token->backlog += instance->my_cbl - instance->my_pbl;
3673 instance->my_trc = msgs_transmitted;
3674 instance->my_pbl = instance->my_cbl;
3675}
3676
3677/*
3678 * Sanity checkers
3679 */
3680static int check_orf_token_sanity(
3681 const struct totemsrp_instance *instance,
3682 const void *msg,
3683 size_t msg_len,
3684 size_t max_msg_len,
3686{
3687 int rtr_entries;
3688 const struct orf_token *token = (const struct orf_token *)msg;
3689 size_t required_len;
3690
3691 if (msg_len > max_msg_len) {
3693 "Received orf_token message is too long... ignoring.");
3694
3695 return (-1);
3696 }
3697
3698 if (msg_len < sizeof(struct orf_token)) {
3700 "Received orf_token message is too short... ignoring.");
3701
3702 return (-1);
3703 }
3704
3707 } else {
3709 }
3710
3713 "Received orf_token message rtr_entries is corrupted... ignoring.");
3714
3715 return (-1);
3716 }
3717
3718 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3719 if (msg_len < required_len) {
3721 "Received orf_token message is too short... ignoring.");
3722
3723 return (-1);
3724 }
3725
3726 return (0);
3727}
3728
3729static int check_mcast_sanity(
3730 struct totemsrp_instance *instance,
3731 const void *msg,
3732 size_t msg_len,
3734{
3735
3736 if (msg_len < sizeof(struct mcast)) {
3738 "Received mcast message is too short... ignoring.");
3739
3740 return (-1);
3741 }
3742
3743 return (0);
3744}
3745
3746static int check_memb_merge_detect_sanity(
3747 struct totemsrp_instance *instance,
3748 const void *msg,
3749 size_t msg_len,
3751{
3752
3753 if (msg_len < sizeof(struct memb_merge_detect)) {
3755 "Received memb_merge_detect message is too short... ignoring.");
3756
3757 return (-1);
3758 }
3759
3760 return (0);
3761}
3762
3763static int check_memb_join_sanity(
3764 struct totemsrp_instance *instance,
3765 const void *msg,
3766 size_t msg_len,
3768{
3769 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3770 unsigned int proc_list_entries;
3771 unsigned int failed_list_entries;
3772 size_t required_len;
3773
3774 if (msg_len < sizeof(struct memb_join)) {
3776 "Received memb_join message is too short... ignoring.");
3777
3778 return (-1);
3779 }
3780
3781 proc_list_entries = mj_msg->proc_list_entries;
3782 failed_list_entries = mj_msg->failed_list_entries;
3783
3787 }
3788
3792 "Received memb_join message list_entries exceeds the maximum "
3793 "allowed value... ignoring.");
3794
3795 return (-1);
3796 }
3797
3798 required_len = sizeof(struct memb_join) +
3800 if (msg_len < required_len) {
3802 "Received memb_join message is too short... ignoring.");
3803
3804 return (-1);
3805 }
3806
3807 return (0);
3808}
3809
3810static int check_memb_commit_token_sanity(
3811 struct totemsrp_instance *instance,
3812 const void *msg,
3813 size_t msg_len,
3815{
3816 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3817 unsigned int addr_entries;
3818 size_t required_len;
3819
3820 if (msg_len < sizeof(struct memb_commit_token)) {
3822 "Received memb_commit_token message is too short... ignoring.");
3823
3824 return (-1);
3825 }
3826
3827 addr_entries = mct_msg->addr_entries;
3830 }
3831
3832 required_len = sizeof(struct memb_commit_token) +
3834 if (msg_len < required_len) {
3836 "Received memb_commit_token message is too short... ignoring.");
3837
3838 return (-1);
3839 }
3840
3841 return (0);
3842}
3843
3844static int check_token_hold_cancel_sanity(
3845 struct totemsrp_instance *instance,
3846 const void *msg,
3847 size_t msg_len,
3849{
3850
3851 if (msg_len < sizeof(struct token_hold_cancel)) {
3853 "Received token_hold_cancel message is too short... ignoring.");
3854
3855 return (-1);
3856 }
3857
3858 return (0);
3859}
3860
3861/*
3862 * Message Handlers
3863 */
3864
3865#ifdef GIVEINFO
3866uint64_t tv_old;
3867#endif
3868/*
3869 * message handler called when TOKEN message type received
3870 */
3871static int message_handler_orf_token (
3872 struct totemsrp_instance *instance,
3873 const void *msg,
3874 size_t msg_len,
3876{
3877 char token_storage[1500];
3878 char token_convert[1500];
3879 struct orf_token *token = NULL;
3880 int forward_token;
3881 unsigned int transmits_allowed;
3882 unsigned int mcasted_retransmit;
3883 unsigned int mcasted_regular;
3884 unsigned int last_aru;
3885
3886#ifdef GIVEINFO
3889
3891 tv_diff = tv_current - tv_old;
3892 tv_old = tv_current;
3893
3895 "Time since last token %0.4f ms", tv_diff / (float)QB_TIME_NS_IN_MSEC);
3896#endif
3897
3898 if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
3899 endian_conversion_needed) == -1) {
3900 return (0);
3901 }
3902
3903 if (instance->orf_token_discard) {
3904 return (0);
3905 }
3906#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3908 return (0);
3909 }
3910#endif
3911
3913 orf_token_endian_convert ((struct orf_token *)msg,
3914 (struct orf_token *)token_convert);
3915 msg = (struct orf_token *)token_convert;
3916 }
3917
3918 /*
3919 * Make copy of token and retransmit list in case we have
3920 * to flush incoming messages from the kernel queue
3921 */
3922 token = (struct orf_token *)token_storage;
3923 memcpy (token, msg, sizeof (struct orf_token));
3924 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3925 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3926
3927
3928 /*
3929 * Handle merge detection timeout
3930 */
3931 if (token->seq == instance->my_last_seq) {
3932 start_merge_detect_timeout (instance);
3933 instance->my_seq_unchanged += 1;
3934 } else {
3935 cancel_merge_detect_timeout (instance);
3936 cancel_token_hold_retransmit_timeout (instance);
3937 instance->my_seq_unchanged = 0;
3938 }
3939
3940 instance->my_last_seq = token->seq;
3941
3942#ifdef TEST_RECOVERY_MSG_COUNT
3943 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3944 return (0);
3945 }
3946#endif
3947 instance->flushing = 1;
3949 instance->flushing = 0;
3950
3951 /*
3952 * Determine if we should hold (in reality drop) the token
3953 */
3954 instance->my_token_held = 0;
3955 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3956 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3957 instance->my_token_held = 1;
3958 } else {
3959 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3960 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3961 instance->my_token_held = 1;
3962 }
3963 }
3964
3965 /*
3966 * Hold onto token when there is no activity on ring and
3967 * this processor is the ring rep
3968 */
3969 forward_token = 1;
3970 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3971 if (instance->my_token_held) {
3972 forward_token = 0;
3973 }
3974 }
3975
3976 switch (instance->memb_state) {
3977 case MEMB_STATE_COMMIT:
3978 /* Discard token */
3979 break;
3980
3982 messages_free (instance, token->aru);
3983 /*
3984 * Do NOT add break, this case should also execute code in gather case.
3985 */
3986
3987 case MEMB_STATE_GATHER:
3988 /*
3989 * DO NOT add break, we use different free mechanism in recovery state
3990 */
3991
3993 /*
3994 * Discard tokens from another configuration
3995 */
3996 if (memcmp (&token->ring_id, &instance->my_ring_id,
3997 sizeof (struct memb_ring_id)) != 0) {
3998
3999 if ((forward_token)
4000 && instance->use_heartbeat) {
4001 reset_heartbeat_timeout(instance);
4002 }
4003 else {
4004 cancel_heartbeat_timeout(instance);
4005 }
4006
4007 return (0); /* discard token */
4008 }
4009
4010 /*
4011 * Discard retransmitted tokens
4012 */
4013 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4014 return (0); /* discard token */
4015 }
4016
4017 /*
4018 * Token is valid so trigger callbacks
4019 */
4020 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4021
4022 last_aru = instance->my_last_aru;
4023 instance->my_last_aru = token->aru;
4024
4025 transmits_allowed = fcc_calculate (instance, token);
4026 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4027
4029 instance->my_token_held == 1 &&
4030 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4031 instance->my_token_held = 0;
4032 forward_token = 1;
4033 }
4034
4035 fcc_rtr_limit (instance, token, &transmits_allowed);
4036 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4037/*
4038if (mcasted_regular) {
4039printf ("mcasted regular %d\n", mcasted_regular);
4040printf ("token seq %d\n", token->seq);
4041}
4042*/
4043 fcc_token_update (instance, token, mcasted_retransmit +
4045
4046 if (sq_lt_compare (instance->my_aru, token->aru) ||
4047 instance->my_id.nodeid == token->aru_addr ||
4048 token->aru_addr == 0) {
4049
4050 token->aru = instance->my_aru;
4051 if (token->aru == token->seq) {
4052 token->aru_addr = 0;
4053 } else {
4054 token->aru_addr = instance->my_id.nodeid;
4055 }
4056 }
4057 if (token->aru == last_aru && token->aru_addr != 0) {
4058 instance->my_aru_count += 1;
4059 } else {
4060 instance->my_aru_count = 0;
4061 }
4062
4063 /*
4064 * We really don't follow specification there. In specification, OTHER nodes
4065 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4066 * to failed list (so node never mark itself as failed)
4067 */
4068 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4069 token->aru_addr == instance->my_id.nodeid) {
4070
4072 "FAILED TO RECEIVE");
4073
4074 instance->failed_to_recv = 1;
4075
4076 memb_set_merge (&instance->my_id, 1,
4077 instance->my_failed_list,
4078 &instance->my_failed_list_entries);
4079
4080 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4081 } else {
4082 instance->my_token_seq = token->token_seq;
4083 token->token_seq += 1;
4084
4085 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4086 /*
4087 * instance->my_aru == instance->my_high_seq_received means this processor
4088 * has recovered all messages it can recover
4089 * (ie: its retrans queue is empty)
4090 */
4091 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4092
4093 if (token->retrans_flg == 0) {
4094 token->retrans_flg = 1;
4095 instance->my_set_retrans_flg = 1;
4096 }
4097 } else
4098 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4099 token->retrans_flg = 0;
4100 instance->my_set_retrans_flg = 0;
4101 }
4103 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4104 token->retrans_flg, instance->my_set_retrans_flg,
4105 cs_queue_is_empty (&instance->retrans_message_queue),
4106 instance->my_retrans_flg_count, token->aru);
4107 if (token->retrans_flg == 0) {
4108 instance->my_retrans_flg_count += 1;
4109 } else {
4110 instance->my_retrans_flg_count = 0;
4111 }
4112 if (instance->my_retrans_flg_count == 2) {
4113 instance->my_install_seq = token->seq;
4114 }
4116 "install seq %x aru %x high seq received %x",
4117 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4118 if (instance->my_retrans_flg_count >= 2 &&
4119 instance->my_received_flg == 0 &&
4120 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4121 instance->my_received_flg = 1;
4122 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4123 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4124 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4125 }
4126 if (instance->my_retrans_flg_count >= 3 &&
4127 sq_lte_compare (instance->my_install_seq, token->aru)) {
4128 instance->my_rotation_counter += 1;
4129 } else {
4130 instance->my_rotation_counter = 0;
4131 }
4132 if (instance->my_rotation_counter == 2) {
4134 "retrans flag count %x token aru %x install seq %x aru %x %x",
4135 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4136 instance->my_aru, token->seq);
4137
4138 memb_state_operational_enter (instance);
4139 instance->my_rotation_counter = 0;
4140 instance->my_retrans_flg_count = 0;
4141 }
4142 }
4143
4145 token_send (instance, token, forward_token);
4146
4147#ifdef GIVEINFO
4149 tv_diff = tv_current - tv_old;
4150 tv_old = tv_current;
4152 "I held %0.4f ms",
4153 tv_diff / (float)QB_TIME_NS_IN_MSEC);
4154#endif
4155 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4156 messages_deliver_to_app (instance, 0,
4157 instance->my_high_seq_received);
4158 }
4159
4160 /*
4161 * Deliver messages after token has been transmitted
4162 * to improve performance
4163 */
4164 reset_token_timeout (instance); // REVIEWED
4165 reset_token_retransmit_timeout (instance); // REVIEWED
4166 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4167 instance->my_token_held == 1) {
4168
4169 start_token_hold_retransmit_timeout (instance);
4170 }
4171
4172 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4173 }
4174 break;
4175 }
4176
4177 if ((forward_token)
4178 && instance->use_heartbeat) {
4179 reset_heartbeat_timeout(instance);
4180 }
4181 else {
4182 cancel_heartbeat_timeout(instance);
4183 }
4184
4185 return (0);
4186}
4187
/*
 * Hand messages stored in instance->regular_sort_queue to
 * instance->totemsrp_deliver_fn.
 *
 * instance  - totemsrp instance whose regular sort queue is read
 * skip      - 0: stop at the first missing queue item (hole);
 *             non-zero: step over holes and also over messages whose
 *             system_from is not in instance->my_deliver_memb_list
 * end_point - sequence number up to which delivery is attempted; the loop
 *             runs for (end_point - instance->my_high_delivered) steps
 *
 * NOTE(review): this listing has lost several lines of the function (for
 * example the declarations of sort_queue_item_p and aligned_system_from,
 * the second argument line of sq_in_range/sq_item_get and the heads of the
 * log calls). The comments below describe only what is still visible.
 */
4188static void messages_deliver_to_app (
4189 struct totemsrp_instance *instance,
4190 int skip,
4191 unsigned int end_point)
4192{
4194 unsigned int i;
4195 int res;
4196 struct mcast *mcast_in;
4197 struct mcast mcast_header;
4198 unsigned int range = 0;
4200 unsigned int my_high_delivered_stored = 0;
4202
/* Number of sequence numbers between the last delivered one and end_point */
4203 range = end_point - instance->my_high_delivered;
4204
4205 if (range) {
4207 "Delivering %x to %x", instance->my_high_delivered,
4208 end_point);
4209 }
4212
4213 /*
4214 * Deliver messages in order from rtr queue to pending delivery queue
4215 */
4216 for (i = 1; i <= range; i++) {
4217
4218 void *ptr = 0;
4219
4220 /*
4221 * If out of range of sort queue, stop assembly
4222 */
4223 res = sq_in_range (&instance->regular_sort_queue,
4225 if (res == 0) {
4226 break;
4227 }
4228
4229 res = sq_item_get (&instance->regular_sort_queue,
4231 /*
4232 * If hole, stop assembly
4233 */
4234 if (res != 0 && skip == 0) {
4235 break;
4236 }
4237
4239
/* Only reachable with res != 0 when skip is set: step over the hole */
4240 if (res != 0) {
4241 continue;
4242
4243 }
4244
4246
4247 mcast_in = sort_queue_item_p->mcast;
4248 assert (mcast_in != (struct mcast *)0xdeadbeef);
4249
/*
 * Work on a host byte order copy of the header: converted when the stored
 * magic differs from TOTEM_MH_MAGIC, plain copy otherwise.
 */
4251 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4253 mcast_endian_convert (mcast_in, &mcast_header);
4254 } else {
4255 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4256 }
4257
4258 aligned_system_from = mcast_header.system_from;
4259
4260 /*
4261 * Skip messages not originated in instance->my_deliver_memb
4262 */
4263 if (skip &&
4264 memb_set_subset (&aligned_system_from,
4265 1,
4266 instance->my_deliver_memb_list,
4267 instance->my_deliver_memb_entries) == 0) {
4268
4270
4271 continue;
4272 }
4273
4274 /*
4275 * Message found
4276 */
4278 "Delivering MCAST message with seq %x to pending delivery queue",
4279 mcast_header.seq);
4280
4281 /*
4282 * Message is locally originated multicast
4283 */
/*
 * The callback receives the sender nodeid, a pointer just past the
 * struct mcast header and the length without that header.
 */
4284 instance->totemsrp_deliver_fn (
4285 mcast_header.header.nodeid,
4286 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4287 sort_queue_item_p->msg_len - sizeof (struct mcast),
4289 }
4290}
4291
4292/*
4293 * recv message handler called when MCAST message type received
4294 *
 * The message is sanity checked, its header is brought into host byte
 * order, and the target sort queue is chosen from header.encapsulated
 * (recovery queue for MESSAGE_ENCAPSULATED, regular queue otherwise).
 * A message whose ring id differs from instance->my_ring_id is handled by
 * the membership state switch and never stored. A message of the current
 * ring is copied into a freshly allocated buffer and added to the sort
 * queue when its sequence number is in range and not yet present.
 *
 * Returns 0 in all visible paths except -1 when totemsrp_buffer_alloc
 * returns NULL.
 *
 * NOTE(review): several lines are absent from this listing (the last
 * parameter, declarations of sort_queue_item and aligned_system_from, the
 * head of the endian conversion "if", two case labels and some call
 * argument lines).
 */
4295static int message_handler_mcast (
4296 struct totemsrp_instance *instance,
4297 const void *msg,
4298 size_t msg_len,
4300{
4302 struct sq *sort_queue;
4303 struct mcast mcast_header;
4305
/* Invalid messages are dropped silently (the handler still returns 0) */
4306 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4307 return (0);
4308 }
4309
4311 mcast_endian_convert (msg, &mcast_header);
4312 } else {
4313 memcpy (&mcast_header, msg, sizeof (struct mcast));
4314 }
4315
4316 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4317 sort_queue = &instance->recovery_sort_queue;
4318 } else {
4319 sort_queue = &instance->regular_sort_queue;
4320 }
4321
4322 assert (msg_len <= FRAME_SIZE_MAX);
4323
/* Test hook: randomly drop the given percentage of multicast messages */
4324#ifdef TEST_DROP_MCAST_PERCENTAGE
4325 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4326 return (0);
4327 }
4328#endif
4329
4330 /*
4331 * If the message is foreign execute the switch below
4332 */
4333 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4334 sizeof (struct memb_ring_id)) != 0) {
4335
4336 aligned_system_from = mcast_header.system_from;
4337
4338 switch (instance->memb_state) {
4340 memb_set_merge (
4342 instance->my_proc_list, &instance->my_proc_list_entries);
4343 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4344 break;
4345
4346 case MEMB_STATE_GATHER:
/* Re-enter gather only when the sender is not yet in my_proc_list */
4347 if (!memb_set_subset (
4349 1,
4350 instance->my_proc_list,
4351 instance->my_proc_list_entries)) {
4352
4353 memb_set_merge (&aligned_system_from, 1,
4354 instance->my_proc_list, &instance->my_proc_list_entries);
4355 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4356 return (0);
4357 }
4358 break;
4359
4360 case MEMB_STATE_COMMIT:
4361 /* discard message */
4362 instance->stats.rx_msg_dropped++;
4363 break;
4364
4366 /* discard message */
4367 instance->stats.rx_msg_dropped++;
4368 break;
4369 }
/* A foreign message is never queued, whatever the state */
4370 return (0);
4371 }
4372
4374 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4375 mcast_header.ring_id.rep,
4376 (uint64_t)mcast_header.ring_id.seq,
4377 mcast_header.seq);
4378
4379 /*
4380 * Add mcast message to rtr queue if not already in rtr queue
4381 * otherwise free io vectors
4382 */
4383 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4384 sq_in_range (sort_queue, mcast_header.seq) &&
4385 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4386
4387 /*
4388 * Allocate new multicast memory block
4389 */
4390// TODO LEAK
4391 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4392 if (sort_queue_item.mcast == NULL) {
4393 return (-1); /* error here is corrected by the algorithm */
4394 }
/* The queue keeps its own copy of the whole message as received */
4395 memcpy (sort_queue_item.mcast, msg, msg_len);
4396 sort_queue_item.msg_len = msg_len;
4397
/* Track the highest sequence number seen so far */
4398 if (sq_lt_compare (instance->my_high_seq_received,
4399 mcast_header.seq)) {
4400 instance->my_high_seq_received = mcast_header.seq;
4401 }
4402
4403 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4404 }
4405
4406 update_aru (instance);
/* Delivery without hole skipping, only while operational */
4407 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4408 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4409 }
4410
4411/* TODO remove from retrans message queue for old ring in recovery state */
4412 return (0);
4413}
4414
/*
 * recv message handler for merge detect messages.
 *
 * After the sanity check and conversion to host byte order the message is
 * ignored when its ring id equals instance->my_ring_id. Otherwise the
 * sender is merged into my_proc_list and gather state is entered, depending
 * on the current membership state (see the switch below). Always returns 0.
 *
 * NOTE(review): this listing has lost the last parameter line, the local
 * declarations, the head of the endian conversion "if", the memcpy head,
 * the assignment of aligned_system_from and two case labels.
 */
4415static int message_handler_memb_merge_detect (
4416 struct totemsrp_instance *instance,
4417 const void *msg,
4418 size_t msg_len,
4420{
4423
4424 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4425 return (0);
4426 }
4427
4429 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4430 } else {
4432 sizeof (struct memb_merge_detect));
4433 }
4434
4435 /*
4436 * do nothing if this is a merge detect from this configuration
4437 */
4438 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4439 sizeof (struct memb_ring_id)) == 0) {
4440
4441 return (0);
4442 }
4443
4445
4446 /*
4447 * Execute merge operation
4448 */
4449 switch (instance->memb_state) {
4451 memb_set_merge (&aligned_system_from, 1,
4452 instance->my_proc_list, &instance->my_proc_list_entries);
4453 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4454 break;
4455
4456 case MEMB_STATE_GATHER:
/* Only a sender that is not yet in my_proc_list restarts gather */
4457 if (!memb_set_subset (
4459 1,
4460 instance->my_proc_list,
4461 instance->my_proc_list_entries)) {
4462
4463 memb_set_merge (&aligned_system_from, 1,
4464 instance->my_proc_list, &instance->my_proc_list_entries);
4465 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4466 return (0);
4467 }
4468 break;
4469
4470 case MEMB_STATE_COMMIT:
4471 /* do nothing in commit */
4472 break;
4473
4475 /* do nothing in recovery */
4476 break;
4477 }
4478 return (0);
4479}
4480
/*
 * Process a join message that is already in host byte order.
 *
 * instance  - totemsrp instance
 * memb_join - join message; its proc list and failed list are compared
 *             with and merged into instance->my_proc_list and
 *             instance->my_failed_list
 *
 * Visible outcomes: the message is discarded while instance->flushing is
 * set, consensus may lead to creating a commit token and entering commit
 * state, or the lists are merged and gather state is entered. At "out",
 * gather state is also entered when nothing above entered it and the
 * instance is still operational.
 *
 * NOTE(review): many lines are absent from this listing (local
 * declarations, the setup of proc_list/failed_list/aligned_system_from,
 * several "if" heads and call argument lines), so the braces below do not
 * balance as shown. The comments cover only what is visible.
 */
4481static void memb_join_process (
4482 struct totemsrp_instance *instance,
4483 const struct memb_join *memb_join)
4484{
4485 struct srp_addr *proc_list;
4486 struct srp_addr *failed_list;
4487 int gather_entered = 0;
4491
4495
/* Trace the received lists next to the local ones */
4496 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4497 memb_set_log(instance, instance->totemsrp_log_level_trace,
4498 "proclist", proc_list, memb_join->proc_list_entries);
4499 memb_set_log(instance, instance->totemsrp_log_level_trace,
4501 memb_set_log(instance, instance->totemsrp_log_level_trace,
4502 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4503 memb_set_log(instance, instance->totemsrp_log_level_trace,
4504 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4505
/*
 * While flushing, the message is dropped; for a LEAVE message the last
 * entry of its failed list is still passed to my_leave_memb_set.
 */
4507 if (instance->flushing) {
4510 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4512 if (memb_join->failed_list_entries > 0) {
4513 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4514 }
4515 } else {
4517 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4518 }
4519 return;
4520 } else {
4523 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4524 if (memb_join->failed_list_entries > 0) {
4525 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4526 }
4527 }
4528 }
4529
4530 }
4531
/* Case 1: sender's proc and failed lists are equal to the local ones */
4532 if (memb_set_equal (proc_list,
4534 instance->my_proc_list,
4535 instance->my_proc_list_entries) &&
4536
4537 memb_set_equal (failed_list,
4539 instance->my_failed_list,
4540 instance->my_failed_list_entries)) {
4541
4543 memb_consensus_set (instance, &aligned_system_from);
4544 }
4545
/*
 * Consensus after this node failed to receive: the lists are reset so
 * that my_proc_list holds only my_id, then commit state is entered.
 */
4546 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4547 instance->failed_to_recv = 0;
4548 instance->my_proc_list[0] = instance->my_id;
4549 instance->my_proc_list_entries = 1;
4550 instance->my_failed_list_entries = 0;
4551
4552 memb_state_commit_token_create (instance);
4553
4554 memb_state_commit_enter (instance);
4555 return;
4556 }
/* Otherwise the commit token is created only if memb_lowest_in_config holds */
4557 if (memb_consensus_agreed (instance) &&
4558 memb_lowest_in_config (instance)) {
4559
4560 memb_state_commit_token_create (instance);
4561
4562 memb_state_commit_enter (instance);
4563 } else {
4564 goto out;
4565 }
4566 } else
/* Case 2: sender's lists are subsets of the local ones, nothing to merge */
4567 if (memb_set_subset (proc_list,
4569 instance->my_proc_list,
4570 instance->my_proc_list_entries) &&
4571
4572 memb_set_subset (failed_list,
4574 instance->my_failed_list,
4575 instance->my_failed_list_entries)) {
4576
4577 goto out;
4578 } else
/* Case 3: the sender is already in the local failed list */
4579 if (memb_set_subset (&aligned_system_from, 1,
4580 instance->my_failed_list, instance->my_failed_list_entries)) {
4581
4582 goto out;
4583 } else {
/* Case 4: merge the sender's information and restart gather */
4584 memb_set_merge (proc_list,
4586 instance->my_proc_list, &instance->my_proc_list_entries);
4587
4588 if (memb_set_subset (
4589 &instance->my_id, 1,
4591
4592 memb_set_merge (
4594 instance->my_failed_list, &instance->my_failed_list_entries);
4595 } else {
4596 if (memb_set_subset (
4598 instance->my_memb_list,
4599 instance->my_memb_entries)) {
4600
4601 if (memb_set_subset (
4603 instance->my_failed_list,
4604 instance->my_failed_list_entries) == 0) {
4605
4606 memb_set_merge (failed_list,
4608 instance->my_failed_list, &instance->my_failed_list_entries);
4609 } else {
/* Only failed entries outside my_memb_list are merged here */
4610 memb_set_subtract (fail_minus_memb,
4614 instance->my_memb_list,
4615 instance->my_memb_entries);
4616
4617 memb_set_merge (fail_minus_memb,
4619 instance->my_failed_list,
4620 &instance->my_failed_list_entries);
4621 }
4622 }
4623 }
4624 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4625 gather_entered = 1;
4626 }
4627
4628out:
/* A join seen while operational enters gather unless already done above */
4629 if (gather_entered == 0 &&
4630 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4631
4632 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4633 }
4634}
4635
4636static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4637{
4638 int i;
4639 struct srp_addr *in_proc_list;
4640 struct srp_addr *in_failed_list;
4641 struct srp_addr *out_proc_list;
4642 struct srp_addr *out_failed_list;
4643
4644 out->header.magic = TOTEM_MH_MAGIC;
4645 out->header.version = TOTEM_MH_VERSION;
4646 out->header.type = in->header.type;
4647 out->header.nodeid = swab32 (in->header.nodeid);
4648 out->system_from = srp_addr_endian_convert(in->system_from);
4649 out->proc_list_entries = swab32 (in->proc_list_entries);
4650 out->failed_list_entries = swab32 (in->failed_list_entries);
4651 out->ring_seq = swab64 (in->ring_seq);
4652
4653 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4654 in_failed_list = in_proc_list + out->proc_list_entries;
4655 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4656 out_failed_list = out_proc_list + out->proc_list_entries;
4657
4658 for (i = 0; i < out->proc_list_entries; i++) {
4659 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4660 }
4661 for (i = 0; i < out->failed_list_entries; i++) {
4662 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4663 }
4664}
4665
/*
 * Convert a commit token sent by a node of the opposite byte order into
 * host byte order.
 *
 * in  - token as received; addr_entries addresses follow the fixed part
 *       (end_of_commit_token) and the per member entries follow those
 * out - destination token with the same layout
 *
 * NOTE(review): this listing has lost the declarations of in_memb_list and
 * out_memb_list and two lines inside the member entry conversion (the
 * right hand side of the ring_id.seq assignment is among them).
 */
4666static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4667{
4668 int i;
4669 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4670 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4673
/* Local magic/version, type copied unchanged, the rest byte swapped */
4674 out->header.magic = TOTEM_MH_MAGIC;
4675 out->header.version = TOTEM_MH_VERSION;
4676 out->header.type = in->header.type;
4677 out->header.nodeid = swab32 (in->header.nodeid);
4678 out->token_seq = swab32 (in->token_seq);
4679 out->ring_id.rep = swab32(in->ring_id.rep);
4680 out->ring_id.seq = swab64 (in->ring_id.seq);
4681 out->retrans_flg = swab32 (in->retrans_flg);
4682 out->memb_index = swab32 (in->memb_index);
4683 out->addr_entries = swab32 (in->addr_entries);
4684
/*
 * The member entries start right behind the address array; the converted
 * count in out is used because the one in in is in foreign byte order.
 */
4685 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4686 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4687 for (i = 0; i < out->addr_entries; i++) {
4688 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4689
4690 /*
4691 * Only convert the memb entry if it has been set
4692 */
4693 if (in_memb_list[i].ring_id.rep != 0) {
4694 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4695
4696 out_memb_list[i].ring_id.seq =
4699 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4700 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4701 }
4702 }
4703}
4704
4705static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4706{
4707 int i;
4708
4709 out->header.magic = TOTEM_MH_MAGIC;
4710 out->header.version = TOTEM_MH_VERSION;
4711 out->header.type = in->header.type;
4712 out->header.nodeid = swab32 (in->header.nodeid);
4713 out->seq = swab32 (in->seq);
4714 out->token_seq = swab32 (in->token_seq);
4715 out->aru = swab32 (in->aru);
4716 out->ring_id.rep = swab32(in->ring_id.rep);
4717 out->aru_addr = swab32(in->aru_addr);
4718 out->ring_id.seq = swab64 (in->ring_id.seq);
4719 out->fcc = swab32 (in->fcc);
4720 out->backlog = swab32 (in->backlog);
4721 out->retrans_flg = swab32 (in->retrans_flg);
4722 out->rtr_list_entries = swab32 (in->rtr_list_entries);
4723 for (i = 0; i < out->rtr_list_entries; i++) {
4724 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4725 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4726 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4727 }
4728}
4729
4730static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4731{
4732 out->header.magic = TOTEM_MH_MAGIC;
4733 out->header.version = TOTEM_MH_VERSION;
4734 out->header.type = in->header.type;
4735 out->header.nodeid = swab32 (in->header.nodeid);
4736 out->header.encapsulated = in->header.encapsulated;
4737
4738 out->seq = swab32 (in->seq);
4739 out->this_seqno = swab32 (in->this_seqno);
4740 out->ring_id.rep = swab32(in->ring_id.rep);
4741 out->ring_id.seq = swab64 (in->ring_id.seq);
4742 out->node_id = swab32 (in->node_id);
4743 out->guarantee = swab32 (in->guarantee);
4744 out->system_from = srp_addr_endian_convert(in->system_from);
4745}
4746
4747static void memb_merge_detect_endian_convert (
4748 const struct memb_merge_detect *in,
4749 struct memb_merge_detect *out)
4750{
4751 out->header.magic = TOTEM_MH_MAGIC;
4752 out->header.version = TOTEM_MH_VERSION;
4753 out->header.type = in->header.type;
4754 out->header.nodeid = swab32 (in->header.nodeid);
4755 out->ring_id.rep = swab32(in->ring_id.rep);
4756 out->ring_id.seq = swab64 (in->ring_id.seq);
4757 out->system_from = srp_addr_endian_convert (in->system_from);
4758}
4759
/*
 * Decide whether a join message can be ignored; the visible caller uses
 * this only in the operational state.
 *
 * Returns 1 when the message is to be ignored, 0 when it has to be
 * processed.
 *
 * NOTE(review): this listing has lost the lines that set up proc_list,
 * failed_list, ring_seq and aligned_system_from as well as the second line
 * of the first memb_set_subset call, so the set my_id is tested against is
 * not visible.
 */
4760static int ignore_join_under_operational (
4761 struct totemsrp_instance *instance,
4762 const struct memb_join *memb_join)
4763{
4764 struct srp_addr *proc_list;
4765 struct srp_addr *failed_list;
4766 unsigned long long ring_seq;
4768
4773
4774 if (memb_set_subset (&instance->my_id, 1,
4776 return (1);
4777 }
4778
4779 /*
4780 * In operational state, my_proc_list is exactly the same as
4781 * my_memb_list.
4782 */
/* Ignore a current member whose ring_seq is lower than my_ring_id.seq */
4783 if ((memb_set_subset (&aligned_system_from, 1,
4784 instance->my_memb_list, instance->my_memb_entries)) &&
4785 (ring_seq < instance->my_ring_id.seq)) {
4786 return (1);
4787 }
4788
4789 return (0);
4790}
4791
/*
 * recv message handler for join messages.
 *
 * The message is sanity checked and, if needed, converted into the stack
 * buffer memb_join_convert. It is then passed to memb_join_process
 * depending on the membership state (see the switch below). Always
 * returns 0.
 *
 * NOTE(review): this listing has lost the last parameter line, a local
 * declaration, the head of the endian conversion "if", the assignment of
 * aligned_system_from, the body of the token_ring_id_seq "if" and two case
 * labels.
 */
4792static int message_handler_memb_join (
4793 struct totemsrp_instance *instance,
4794 const void *msg,
4795 size_t msg_len,
4797{
4798 const struct memb_join *memb_join;
/*
 * NOTE(review): alloca (msg_len) runs before check_memb_join_sanity, so the
 * stack allocation size comes from a length that has not been checked yet
 * at this point - confirm the caller bounds msg_len.
 */
4799 struct memb_join *memb_join_convert = alloca (msg_len);
4801
4802 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4803 return (0);
4804 }
4805
4808 memb_join_endian_convert (msg, memb_join_convert);
4809
4810 } else {
/* No conversion needed: use the received buffer directly */
4811 memb_join = msg;
4812 }
4813
4815
4816 /*
4817 * If the process paused because it wasn't scheduled in a timely
4818 * fashion, flush the join messages because they may be queued
4819 * entries
4820 */
4821 if (pause_flush (instance)) {
4822 return (0);
4823 }
4824
4825 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4827 }
4828 switch (instance->memb_state) {
4830 if (!ignore_join_under_operational (instance, memb_join)) {
4831 memb_join_process (instance, memb_join);
4832 }
4833 break;
4834
4835 case MEMB_STATE_GATHER:
4836 memb_join_process (instance, memb_join);
4837 break;
4838
4839 case MEMB_STATE_COMMIT:
/*
 * Only a sender from my_new_memb_list with ring_seq not lower than
 * my_ring_id.seq is processed; gather state is entered afterwards.
 */
4840 if (memb_set_subset (&aligned_system_from,
4841 1,
4842 instance->my_new_memb_list,
4843 instance->my_new_memb_entries) &&
4844
4845 memb_join->ring_seq >= instance->my_ring_id.seq) {
4846
4847 memb_join_process (instance, memb_join);
4848 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4849 }
4850 break;
4851
/* Same condition as above; memb_recovery_state_token_loss runs before gather */
4853 if (memb_set_subset (&aligned_system_from,
4854 1,
4855 instance->my_new_memb_list,
4856 instance->my_new_memb_entries) &&
4857
4858 memb_join->ring_seq >= instance->my_ring_id.seq) {
4859
4860 memb_join_process (instance, memb_join);
4861 memb_recovery_state_token_loss (instance);
4862 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4863 }
4864 break;
4865 }
4866 return (0);
4867}
4868
/*
 * recv message handler for commit tokens.
 *
 * The token is sanity checked and copied (converted if needed) into
 * memb_commit_token_convert, then handled depending on the membership
 * state (see the switch below). Always returns 0.
 *
 * NOTE(review): this listing has lost the last parameter line, most local
 * declarations, the head of the endian conversion "if", the setup of
 * memb_commit_token and addr, the head of the test drop "if", two case
 * labels and parts of the conditions in the gather and commit cases.
 */
4869static int message_handler_memb_commit_token (
4870 struct totemsrp_instance *instance,
4871 const void *msg,
4872 size_t msg_len,
4874{
4878 int sub_entries;
4879
4880 struct srp_addr *addr;
4881
4883 "got commit token");
4884
4885 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4886 return (0);
4887 }
4888
4890 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4891 } else {
4892 memcpy (memb_commit_token_convert, msg, msg_len);
4893 }
4896
4897#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4899 return (0);
4900 }
4901#endif
4902 switch (instance->memb_state) {
4904 /* discard token */
4905 break;
4906
4907 case MEMB_STATE_GATHER:
/* sub = my_proc_list without the entries of my_failed_list */
4908 memb_set_subtract (sub, &sub_entries,
4909 instance->my_proc_list, instance->my_proc_list_entries,
4910 instance->my_failed_list, instance->my_failed_list_entries);
4911
/*
 * When the token's address list equals sub (further condition lines are
 * missing here), the token is stored and commit state is entered.
 */
4912 if (memb_set_equal (addr,
4914 sub,
4915 sub_entries) &&
4916
4918 memcpy (instance->commit_token, memb_commit_token, msg_len);
4919 memb_state_commit_enter (instance);
4920 }
4921 break;
4922
4923 case MEMB_STATE_COMMIT:
4924 /*
4925 * If retransmitted commit tokens are sent on this ring
4926 * filter them out and only enter recovery once the
4927 * commit token has traversed the array. This is
4928 * determined by :
4929 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4930 */
4931 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4933 memb_state_recovery_enter (instance, memb_commit_token);
4934 }
4935 break;
4936
/* Only the node whose nodeid equals my_ring_id.rep sends the initial ORF token */
4938 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4939
4940 /* Filter out duplicated tokens */
4941 if (instance->originated_orf_token) {
4942 break;
4943 }
4944
4945 instance->originated_orf_token = 1;
4946
4948 "Sending initial ORF token");
4949
4950 // TODO convert instead of initiate
4951 orf_token_send_initial (instance);
4952 reset_token_timeout (instance); // REVIEWED
4953 reset_token_retransmit_timeout (instance); // REVIEWED
4954 }
4955 break;
4956 }
4957 return (0);
4958}
4959
/*
 * recv message handler for token hold cancel messages.
 *
 * A message that fails the sanity check or belongs to a different ring is
 * ignored. For a message of the current ring my_seq_unchanged is reset
 * and, when this node's nodeid equals my_ring_id.rep,
 * timer_function_token_retransmit_timeout is called directly.
 * Always returns 0.
 *
 * NOTE(review): the last parameter line is missing from this listing;
 * endian_conversion_needed is used below, so it is presumably that
 * parameter - confirm against the original file.
 */
4960static int message_handler_token_hold_cancel (
4961 struct totemsrp_instance *instance,
4962 const void *msg,
4963 size_t msg_len,
4965{
4966 const struct token_hold_cancel *token_hold_cancel = msg;
4967
4968 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4969 return (0);
4970 }
4971
/*
 * NOTE(review): ring_id is compared as received, no endian conversion of
 * the message is visible in this handler.
 */
4972 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4973 sizeof (struct memb_ring_id)) == 0) {
4974
4975 instance->my_seq_unchanged = 0;
4976 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4977 timer_function_token_retransmit_timeout (instance);
4978 }
4979 }
4980 return (0);
4981}
4982
/*
 * Check the common message header of a received message.
 *
 * context     - the totemsrp instance
 * msg         - received message
 * msg_len     - length of msg in bytes
 * system_from - address of the sender, used for log output only
 *
 * Returns 0 when the message is at least as long as the header and has an
 * acceptable magic and the supported version, -1 otherwise. For a bad
 * magic a guess about the sending software is logged.
 *
 * NOTE(review): this listing has lost the heads of the log calls and the
 * second half of the magic number condition.
 */
4983static int check_message_header_validity(
4984 void *context,
4985 const void *msg,
4986 unsigned int msg_len,
4987 const struct sockaddr_storage *system_from)
4988{
4989 struct totemsrp_instance *instance = context;
4990 const struct totem_message_header *message_header = msg;
4991 const char *guessed_str;
4992 const char *msg_byte = msg;
4993
/* The header fields read below need at least a complete header */
4994 if (msg_len < sizeof (struct totem_message_header)) {
4996 "Message received from %s is too short... Ignoring %u.",
4997 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4998 return (-1);
4999 }
5000
5001 if (message_header->magic != TOTEM_MH_MAGIC &&
5003 /*
5004 * We've received either Knet, old version of Corosync,
5005 * or something else. Do some guessing to display (hopefully)
5006 * helpful message
5007 */
5008 guessed_str = NULL;
5009
5010 if (message_header->magic == 0xFFFF) {
5011 /*
5012 * Corosync 2.2 used header with two UINT8_MAX
5013 */
5014 guessed_str = "Corosync 2.2";
5015 } else if (message_header->magic == 0xFEFE) {
5016 /*
5017 * Corosync 2.3+ used header with two UINT8_MAX - 1
5018 */
5019 guessed_str = "Corosync 2.3+";
5020 } else if (msg_byte[0] == 0x01) {
5021 /*
5022 * Knet has stable1 with first byte of message == 1
5023 */
5024 guessed_str = "unencrypted Kronosnet";
/*
 * NOTE(review): msg_byte is plain char, whose signedness is implementation
 * defined; the ">= 0" part only filters anything where char is signed.
 */
5025 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
5026 /*
5027 * Unencrypted Corosync 1.x/OpenAIS has first byte
5028 * 0-5. Collision with Knet (but still worth the try)
5029 */
5030 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5031 } else {
5032 /*
5033 * Encrypted Kronosnet packet has a hash at the end of
5034 * the packet and nothing specific at the beginning of the
5035 * packet (just encrypted data).
5036 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5037 * is in the beginning of the packet.
5038 *
5039 * So it's not possible to reliably detect either of them.
5040 */
5041 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5042 }
5043
5045 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5047 guessed_str);
5048
5049 return (-1);
5050 }
5051
5052 if (message_header->version != TOTEM_MH_VERSION) {
5054 "Message received from %s has unsupported version %u... Ignoring",
5056 message_header->version);
5057
5058 return (-1);
5059 }
5060
5061 return (0);
5062}
5063
5064
5066 void *context,
5067 const void *msg,
5068 unsigned int msg_len,
5069 const struct sockaddr_storage *system_from)
5070{
5071 struct totemsrp_instance *instance = context;
5072 const struct totem_message_header *message_header = msg;
5073
5074 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5075 return -1;
5076 }
5077
5078 switch (message_header->type) {
5080 instance->stats.orf_token_rx++;
5081 break;
5082 case MESSAGE_TYPE_MCAST:
5083 instance->stats.mcast_rx++;
5084 break;
5086 instance->stats.memb_merge_detect_rx++;
5087 break;
5089 instance->stats.memb_join_rx++;
5090 break;
5092 instance->stats.memb_commit_token_rx++;
5093 break;
5095 instance->stats.token_hold_cancel_rx++;
5096 break;
5097 default:
5099 "Message received from %s has wrong type... ignoring %d.\n",
5101 (int)message_header->type);
5102
5103 instance->stats.rx_msg_dropped++;
5104 return 0;
5105 }
5106 /*
5107 * Handle incoming message
5108 */
5110 instance,
5111 msg,
5112 msg_len,
5113 message_header->magic != TOTEM_MH_MAGIC);
5114}
5115
5117 void *context,
5118 const struct totem_ip_address *interface_addr,
5119 unsigned short ip_port,
5120 unsigned int iface_no)
5121{
5122 struct totemsrp_instance *instance = context;
5123 int res;
5124
5126
5128 instance->totemnet_context,
5130 ip_port,
5131 iface_no);
5132
5133 return (res);
5134}
5135
5136/* Contrary to its name, this only gets called when the interface is enabled */
5138 void *context,
5139 const struct totem_ip_address *iface_addr,
5140 unsigned int iface_no)
5141{
5142 struct totemsrp_instance *instance = context;
5143 int num_interfaces;
5144 int i;
5145 int res = 0;
5146
5147 if (!instance->my_id.nodeid) {
5148 instance->my_id.nodeid = iface_addr->nodeid;
5149 }
5151
5152 if (instance->iface_changes++ == 0) {
5153 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5154 /*
5155 * Increase the ring_id sequence number. This doesn't follow specification.
5156 * Solves problem with restarted leader node (node with lowest nodeid) before
5157 * rest of the cluster forms new membership and guarantees unique ring_id for
5158 * new singleton configuration.
5159 */
5160 instance->my_ring_id.seq++;
5161
5162 instance->token_ring_id_seq = instance->my_ring_id.seq;
5163 log_printf (
5164 instance->totemsrp_log_level_debug,
5165 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5166 instance->my_ring_id.rep,
5167 (uint64_t)instance->my_ring_id.seq);
5168
5169 if (instance->totemsrp_service_ready_fn) {
5170 instance->totemsrp_service_ready_fn ();
5171 }
5172
5173 }
5174
5175 num_interfaces = 0;
5176 for (i = 0; i < INTERFACE_MAX; i++) {
5177 if (instance->totem_config->interfaces[i].configured) {
5179 }
5180 }
5181
5182 if (instance->iface_changes >= num_interfaces) {
5183 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5184 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5185 assert(instance->totem_config->orig_interfaces != NULL);
5187
5189
5190 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5191 free(instance->totem_config->orig_interfaces);
5192 }
5193 return res;
5194}
5195
5197 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5198}
5199
5201 void *context,
5202 void (*totem_service_ready) (void))
5203{
5204 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5205
5207}
5208
5210 void *context,
5211 const struct totem_ip_address *member,
5212 int iface_no)
5213{
5214 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5215 int res;
5216
5217 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5218
5219 return (res);
5220}
5221
5223 void *context,
5224 const struct totem_ip_address *member,
5225 int iface_no)
5226{
5227 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5228 int res;
5229
5230 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5231
5232 return (res);
5233}
5234
5236{
5237 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5238
5239 instance->threaded_mode_enabled = 1;
5240}
5241
/*
 * Clear the waiting_trans_ack flag of the totemsrp instance passed as
 * context.
 *
 * NOTE(review): one statement after the assignment is missing from this
 * listing.
 */
5242void totemsrp_trans_ack (void *context)
5243{
5244 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5245
5246 instance->waiting_trans_ack = 0;
5248}
5249
5250
5252{
5253 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5254 int res;
5255
5257 return (res);
5258}
5259
5261{
5262 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5263 int res;
5264
5266 return (res);
5267}
5268
/*
 * Reset the statistics of the totemsrp instance passed as context by
 * zeroing instance->stats.
 *
 * NOTE(review): the conditional block following the memset is missing from
 * this listing except for its closing brace, so how flags is used is not
 * visible here.
 */
5269void totemsrp_stats_clear (void *context, int flags)
5270{
5271 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5272
5273 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5276 }
5277}
5278
/*
 * Run the ORF token timeout handler right away for the given totemsrp
 * context instead of waiting for the timer to expire.
 */
void totemsrp_force_gather (void *context)
{
	timer_function_orf_token_timeout (context);
}
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:268
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:220
unsigned int heartbeat_failures_allowed
Definition totem.h:214
unsigned int token_timeout
Definition totem.h:182
unsigned int window_size
Definition totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
unsigned int miss_count_const
Definition totem.h:242
struct totem_interface * interfaces
Definition totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition totem.h:248
unsigned int fail_to_recv_const
Definition totem.h:202
unsigned int merge_timeout
Definition totem.h:198
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:252
unsigned int token_retransmits_before_loss_const
Definition totem.h:190
unsigned int max_network_delay
Definition totem.h:216
unsigned int seqno_unchanged_const
Definition totem.h:204
unsigned int consensus_timeout
Definition totem.h:196
unsigned int threads
Definition totem.h:212
unsigned int send_join_timeout
Definition totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:256
unsigned int token_retransmit_timeout
Definition totem.h:186
unsigned int token_warning
Definition totem.h:184
unsigned int join_timeout
Definition totem.h:192
unsigned int token_hold_timeout
Definition totem.h:188
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
void(*) in log_level_security)
Definition totem.h:110
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
void(*) enum memb_stat memb_state)
Definition totemsrp.c:446
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:292
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:266
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition totemnet.c:317
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5137
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
unsigned int seq
Definition totemsrp.c:2
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5279
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5200
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3498
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5235
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5196
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5269
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5116
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5242
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5260
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3533
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5065
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5209
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5251
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5222
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89