/usr/include/asterisk/stasis.h is in asterisk-dev 1:13.18.3~dfsg-1ubuntu4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 | /*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef _ASTERISK_STASIS_H
#define _ASTERISK_STASIS_H
/*! \file
*
* \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
* detailed documentation.
*
* \author David M. Lee, II <dlee@digium.com>
* \since 12
*
* \page stasis Stasis Message Bus API
*
* \par Intro
*
* The Stasis Message Bus is a loosely typed mechanism for distributing messages
* within Asterisk. It is designed to be:
* - Loosely coupled; new message types can be added in seperate modules.
* - Easy to use; publishing and subscribing are straightforward operations.
*
* There are three main concepts for using the Stasis Message Bus:
* - \ref stasis_message
* - \ref stasis_topic
* - \ref stasis_subscription
*
* \par stasis_message
*
* Central to the Stasis Message Bus is the \ref stasis_message, the messages
* that are sent on the bus. These messages have:
* - a type (as defined by a \ref stasis_message_type)
* - a value - a \c void pointer to an AO2 object
* - a timestamp when it was created
*
* Once a \ref stasis_message has been created, it is immutable and cannot
* change. The same goes for the value of the message (although this cannot be
* enforced in code). Messages themselves are reference-counted, AO2 objects,
* along with their values. By being both reference counted and immutable,
* messages can be shared throughout the system without any concerns for
* threading.
*
* The type of a message is defined by an instance of \ref stasis_message_type,
* which can be created by calling stasis_message_type_create(). Message types
* are named, which is useful in debugging. It is recommended that the string
* name for a message type match the name of the struct that's stored in the
* message. For example, name for \ref stasis_cache_update's message type is \c
* "stasis_cache_update".
*
* \par stasis_topic
*
* A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
* subscribed, and \ref stasis_message's may be published. Any message published
* to the topic is dispatched to all of its subscribers. The topic itself may be
* named, which is useful in debugging.
*
* Topics themselves are reference counted objects. Since topics are referred to
* by their subscibers, they will not be freed until all of their subscribers
* have unsubscribed. Topics are also thread safe, so no worries about
* publishing/subscribing/unsubscribing to a topic concurrently from multiple
* threads. It's also designed to handle the case of unsubscribing from a topic
* from within the subscription handler.
*
* \par Forwarding
*
* There is one special case of topics that's worth noting: forwarding
* messages. It's a fairly common use case to want to forward all the messages
* published on one topic to another one (for example, an aggregator topic that
* publishes all the events from a set of other topics). This can be
* accomplished easily using stasis_forward_all(). This sets up the forwarding
* between the two topics, and returns a \ref stasis_subscription, which can be
* unsubscribed to stop the forwarding.
*
* \par Caching
*
* Another common use case is to want to cache certain messages that are
* published on the bus. Usually these events are snapshots of the current state
* in the system, and it's desirable to query that state from the cache without
* locking the original object. It's also desirable for subscribers of the
* caching topic to receive messages that have both the old cache value and the
* new value being put into the cache. For this, we have stasis_cache_create()
* and stasis_caching_topic_create(), providing them with the topic which
* publishes the messages that you wish to cache, and a function that can
* identify cacheable messages.
*
* The \ref stasis_cache is designed so that it may be shared amongst several
* \ref stasis_caching_topic objects. This allows you to have individual caching
* topics per-object (i.e. so you can subscribe to updates for a single object),
* and still have a single cache to query for the state of all objects. While a
* cache may be shared amongst different message types, such a usage is probably
* not a good idea.
*
* The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
* It's a thread safe container, so freely use the stasis_cache_get() and
* stasis_cache_dump() to query the cache.
*
* The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
* message is wrapped in a \ref stasis_cache_update message which provides the
* old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
* (or \c NULL if the entry was removed from the cache). A
* stasis_cache_clear_create() message must be sent to the topic in order to
* remove entries from the cache.
*
* In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
* call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
* stasis_caching_topic will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
*
* The \ref stasis_cache object is a normal AO2 managed object, which can be
* release with ao2_cleanup().
*
* \par stasis_subscriber
*
* Any topic may be subscribed to by simply providing stasis_subscribe() the
* \ref stasis_topic to subscribe to, a handler function and \c void pointer to
* data that is passed back to the handler. Invocations on the subscription's
* handler are serialized, but different invocations may occur on different
* threads (this usually isn't important unless you use thread locals or
* something similar).
*
* In order to stop receiving messages, call stasis_unsubscribe() with your \ref
* stasis_subscription. Due to cyclic references, the \ref
* stasis_subscription will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
*
* \par Shutdown
*
* Subscriptions have two options for unsubscribing, depending upon the context
* in which you need to unsubscribe.
*
* If your subscription is owned by a module, and you must unsubscribe from the
* module_unload() function, then you'll want to use the
* stasis_unsubscribe_and_join() function. This will block until the final
* message has been received on the subscription. Otherwise, there's the danger
* of invoking the callback function after it has been unloaded.
*
* If your subscription is owned by an object, then your object should have an
* explicit shutdown() function, which calls stasis_unsubscribe(). In your
* subscription handler, when the stasis_subscription_final_message() has been
* received, decrement the refcount on your object. In your object's destructor,
* you may assert that stasis_subscription_is_done() to validate that the
* subscription's callback will no longer be invoked.
*
* \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
* an object's destructor. While code that does this may work most of the time,
* it's got one big downside. There's a general assumption that object
* destruction is non-blocking. If you block the destruction waiting for the
* subscription to complete, there's the danger that the subscription may
* process a message which will bump the refcount up by one. Then it does
* whatever it does, decrements the refcount, which then proceeds to re-destroy
* the object. Now you've got hard to reproduce bugs that only show up under
* certain loads.
*/
#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
#include "asterisk/event.h"
/*! @{ */
/*!
* \brief Metadata about a \ref stasis_message.
* \since 12
*/
struct stasis_message_type;
/*!
* \brief Opaque type for a Stasis message.
* \since 12
*/
struct stasis_message;
/*!
* \brief Opaque type for a Stasis subscription.
* \since 12
*/
struct stasis_subscription;
/*!
* \brief Structure containing callbacks for Stasis message sanitization
*
* \note If either callback is implemented, both should be implemented since
* not all callers may have access to the full snapshot.
*/
struct stasis_message_sanitizer {
/*!
* \brief Callback which determines whether a channel should be sanitized from
* a message based on the channel's unique ID
*
* \param channel_id The unique ID of the channel
*
* \retval non-zero if the channel should be left out of the message
* \retval zero if the channel should remain in the message
*/
int (*channel_id)(const char *channel_id);
/*!
* \brief Callback which determines whether a channel should be sanitized from
* a message based on the channel's snapshot
*
* \param snapshot A snapshot generated from the channel
*
* \retval non-zero if the channel should be left out of the message
* \retval zero if the channel should remain in the message
*/
int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
/*!
* \brief Callback which determines whether a channel should be sanitized from
* a message based on the channel
*
* \param chan The channel to be checked
*
* \retval non-zero if the channel should be left out of the message
* \retval zero if the channel should remain in the message
*/
int (*channel)(const struct ast_channel *chan);
};
/*!
* \brief Virtual table providing methods for messages.
* \since 12
*/
struct stasis_message_vtable {
/*!
* \brief Build the JSON representation of the message.
*
* May be \c NULL, or may return \c NULL, to indicate no representation.
* The returned object should be ast_json_unref()'ed.
*
* \param message Message to convert to JSON string.
* \param sanitize Snapshot sanitization callback.
*
* \return Newly allocated JSON message.
* \return \c NULL on error.
* \return \c NULL if JSON format is not supported.
*/
struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
/*!
* \brief Build the AMI representation of the message.
*
* May be \c NULL, or may return \c NULL, to indicate no representation.
* The returned object should be ao2_cleanup()'ed.
*
* \param message Message to convert to AMI string.
* \return Newly allocated \ref ast_manager_event_blob.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
struct ast_manager_event_blob *(*to_ami)(
struct stasis_message *message);
/*!
* \since 12.3.0
* \brief Build the \ref ast_event representation of the message.
*
* May be \c NULL, or may return \c NULL, to indicate no representation.
* The returned object should be free'd.
*
* \param message Message to convert to an \ref ast_event.
* \return Newly allocated \ref ast_event.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
struct ast_event *(*to_event)(
struct stasis_message *message);
};
/*!
* \brief Return code for Stasis message type creation attempts
*/
enum stasis_message_type_result {
STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
};
/*!
* \brief Create a new message type.
*
* \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
* with it.
*
* \param name Name of the new type.
* \param vtable Virtual table of message methods. May be \c NULL.
* \param[out] result The location where the new message type will be placed
*
* \note Stasis message type creation may be declined if the message type is disabled
*
* \returns A stasis_message_type_result enum
* \since 12
*/
enum stasis_message_type_result stasis_message_type_create(const char *name,
struct stasis_message_vtable *vtable, struct stasis_message_type **result);
/*!
* \brief Gets the name of a given message type
* \param type The type to get.
* \return Name of the type.
* \return \c NULL if \a type is \c NULL.
* \since 12
*/
const char *stasis_message_type_name(const struct stasis_message_type *type);
/*!
* \brief Check whether a message type is declined
*
* \param name The name of the message type to check
*
* \retval zero The message type is not declined
* \retval non-zero The message type is declined
*/
int stasis_message_type_declined(const char *name);
/*!
* \brief Create a new message.
*
* This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
* with it. Messages are also immutable, and must not be modified after they
* are initialized. Especially the \a data in the message.
*
* \param type Type of the message
* \param data Immutable data that is the actual contents of the message
*
* \return New message
* \return \c NULL on error
*
* \since 12
*/
struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
/*!
* \brief Create a new message for an entity.
*
* This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
* with it. Messages are also immutable, and must not be modified after they
* are initialized. Especially the \a data in the message.
*
* \param type Type of the message
* \param data Immutable data that is the actual contents of the message
* \param eid What entity originated this message. (NULL for aggregate)
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \retval New message
* \retval \c NULL on error
*
* \since 12.2.0
*/
struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
/*!
* \brief Get the entity id for a \ref stasis_message.
* \since 12.2.0
*
* \param msg Message to get eid.
*
* \retval Entity id of \a msg
* \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
*/
const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
/*!
* \brief Get the message type for a \ref stasis_message.
* \param msg Message to type
* \return Type of \a msg
* \return \c NULL if \a msg is \c NULL.
* \since 12
*/
struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
/*!
* \brief Get the data contained in a message.
* \param msg Message.
* \return Immutable data pointer
* \return \c NULL if msg is \c NULL.
* \since 12
*/
void *stasis_message_data(const struct stasis_message *msg);
/*!
* \brief Get the time when a message was created.
* \param msg Message.
* \return Pointer to the \a timeval when the message was created.
* \return \c NULL if msg is \c NULL.
* \since 12
*/
const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
/*!
* \brief Build the JSON representation of the message.
*
* May return \c NULL, to indicate no representation. The returned object should
* be ast_json_unref()'ed.
*
* \param msg Message to convert to JSON string.
* \param sanitize Snapshot sanitization callback.
*
* \return Newly allocated string with JSON message.
* \return \c NULL on error.
* \return \c NULL if JSON format is not supported.
*/
struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
/*!
* \brief Build the AMI representation of the message.
*
* May return \c NULL, to indicate no representation. The returned object should
* be ao2_cleanup()'ed.
*
* \param msg Message to convert to AMI.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg);
/*!
* \brief Determine if the given message can be converted to AMI.
*
* \param msg Message to see if can be converted to AMI.
*
* \retval 0 Cannot be converted
* \retval non-zero Can be converted
*/
int stasis_message_can_be_ami(struct stasis_message *msg);
/*!
* \brief Build the \ref AstGenericEvents representation of the message.
*
* May return \c NULL, to indicate no representation. The returned object should
* be disposed of via \ref ast_event_destroy.
*
* \param msg Message to convert to AMI.
* \return \c NULL on error.
* \return \c NULL if AMI format is not supported.
*/
struct ast_event *stasis_message_to_event(struct stasis_message *msg);
/*! @} */
/*! @{ */
/*!
* \brief A topic to which messages may be posted, and subscribers, well, subscribe
* \since 12
*/
struct stasis_topic;
/*!
* \brief Create a new topic.
* \param name Name of the new topic.
* \return New topic instance.
* \return \c NULL on error.
* \since 12
*/
struct stasis_topic *stasis_topic_create(const char *name);
/*!
* \brief Return the name of a topic.
* \param topic Topic.
* \return Name of the topic.
* \return \c NULL if topic is \c NULL.
* \since 12
*/
const char *stasis_topic_name(const struct stasis_topic *topic);
/*!
* \brief Publish a message to a topic's subscribers.
* \param topic Topic.
* \param message Message to publish.
*
* This call is asynchronous and will return immediately upon queueing
* the message for delivery to the topic's subscribers.
*
* \since 12
*/
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
/*!
* \brief Publish a message to a topic's subscribers, synchronizing
* on the specified subscriber
* \param sub Subscription to synchronize on.
* \param message Message to publish.
*
* The caller of stasis_publish_sync will block until the specified
* subscriber completes handling of the message.
*
* All other subscribers to the topic the \ref stasis_subpscription
* is subscribed to are also delivered the message; this delivery however
* happens asynchronously.
*
* \since 12.1.0
*/
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
/*! @} */
/*! @{ */
/*!
* \brief Callback function type for Stasis subscriptions.
* \param data Data field provided with subscription.
* \param message Published message.
* \since 12
*/
typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
/*!
* \brief Stasis subscription callback function that does nothing.
*
* \note This callback should be used for events are not directly processed, but need
* to be generated so data can be retrieved from cache later. Subscriptions with this
* callback can be released with \ref stasis_unsubscribe, even during module unload.
*
* \since 13.5
*/
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message);
/*!
* \brief Create a subscription.
*
* In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
* up this reference), the subscription must be explicitly unsubscribed from its
* topic using stasis_unsubscribe().
*
* The invocations of the callback are serialized, but may not always occur on
* the same thread. The invocation order of different subscriptions is
* unspecified.
*
* \param topic Topic to subscribe to.
* \param callback Callback function for subscription messages.
* \param data Data to be passed to the callback, in addition to the message.
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
/*!
* \brief Create a subscription whose callbacks occur on a thread pool
*
* In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
* up this reference), the subscription must be explicitly unsubscribed from its
* topic using stasis_unsubscribe().
*
* The invocations of the callback are serialized, but will almost certainly not
* always happen on the same thread. The invocation order of different subscriptions
* is unspecified.
*
* Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
* dispatch items to its \c callback. This form of subscription should be used
* when many subscriptions may be made to the specified \c topic.
*
* \param topic Topic to subscribe to.
* \param callback Callback function for subscription messages.
* \param data Data to be passed to the callback, in addition to the message.
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12.8.0
*/
struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
/*!
* \brief Cancel a subscription.
*
* Note that in an asynchronous system, there may still be messages queued or
* in transit to the subscription's callback. These will still be delivered.
* There will be a final 'SubscriptionCancelled' message, indicating the
* delivery of the final message.
*
* \param subscription Subscription to cancel.
* \return \c NULL for convenience
* \since 12
*/
struct stasis_subscription *stasis_unsubscribe(
struct stasis_subscription *subscription);
/*!
* \brief Set the high and low alert water marks of the stasis subscription.
* \since 13.10.0
*
* \param subscription Pointer to a stasis subscription
* \param low_water New queue low water mark. (-1 to set as 90% of high_water)
* \param high_water New queue high water mark.
*
* \retval 0 on success.
* \retval -1 on error (water marks not changed).
*/
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
long low_water, long high_water);
/*!
* \brief Block until the last message is processed on a subscription.
*
* This function will not return until the \a subscription's callback for the
* stasis_subscription_final_message() completes. This allows cleanup routines
* to run before unblocking the joining thread.
*
* \param subscription Subscription to block on.
* \since 12
*/
void stasis_subscription_join(struct stasis_subscription *subscription);
/*!
* \brief Returns whether \a subscription has received its final message.
*
* Note that a subscription is considered done even while the
* stasis_subscription_final_message() is being processed. This allows cleanup
* routines to check the status of the subscription.
*
* \param subscription Subscription.
* \return True (non-zero) if stasis_subscription_final_message() has been
* received.
* \return False (zero) if waiting for the end.
*/
int stasis_subscription_is_done(struct stasis_subscription *subscription);
/*!
* \brief Cancel a subscription, blocking until the last message is processed.
*
* While normally it's recommended to stasis_unsubscribe() and wait for
* stasis_subscription_final_message(), there are times (like during a module
* unload) where you have to wait for the final message (otherwise you'll call
* a function in a shared module that no longer exists).
*
* \param subscription Subscription to cancel.
* \return \c NULL for convenience
* \since 12
*/
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
struct stasis_forward;
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
*
* Note that the \a topic parameter of the invoked callback will the be the
* \a topic the message was sent to, not the topic the subscriber subscribed to.
*
* \param from_topic Topic to forward.
* \param to_topic Destination topic of forwarded messages.
* \return New forwarding subscription.
* \return \c NULL on error.
* \since 12
*/
struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
/*!
* \brief Get the unique ID for the subscription.
*
* \param sub Subscription for which to get the unique ID.
* \return Unique ID for the subscription.
* \since 12
*/
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
/*!
* \brief Returns whether a subscription is currently subscribed.
*
* Note that there may still be messages queued up to be dispatched to this
* subscription, but the stasis_subscription_final_message() has been enqueued.
*
* \param sub Subscription to check
* \return False (zero) if subscription is not subscribed.
* \return True (non-zero) if still subscribed.
*/
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
/*!
* \brief Determine whether a message is the final message to be received on a subscription.
*
* \param sub Subscription on which the message was received.
* \param msg Message to check.
* \return zero if the provided message is not the final message.
* \return non-zero if the provided message is the final message.
* \since 12
*/
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
/*! \addtogroup StasisTopicsAndMessages
* @{
*/
/*!
* \brief Holds details about changes to subscriptions for the specified topic
* \since 12
*/
struct stasis_subscription_change {
AST_DECLARE_STRING_FIELDS(
AST_STRING_FIELD(uniqueid); /*!< The unique ID associated with this subscription */
AST_STRING_FIELD(description); /*!< The description of the change to the subscription associated with the uniqueid */
);
struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
};
/*!
* \brief Gets the message type for subscription change notices
* \return The stasis_message_type for subscription change notices
* \since 12
*/
struct stasis_message_type *stasis_subscription_change_type(void);
/*! @} */
/*! @{ */
/*!
* \brief Pool for topic aggregation
*/
struct stasis_topic_pool;
/*!
* \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
* \param pooled_topic Topic to which messages will be routed
* \return the new stasis_topic_pool
* \return \c NULL on failure
*/
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
/*!
* \brief Find or create a topic in the pool
* \param pool Pool for which to get the topic
* \param topic_name Name of the topic to get
* \return The already stored or newly allocated topic
* \return \c NULL if the topic was not found and could not be allocated
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
/*! @} */
/*! \addtogroup StasisTopicsAndMessages
* @{
*/
/*!
* \brief Message type for cache update messages.
* \return Message type for cache update messages.
* \since 12
*/
struct stasis_message_type *stasis_cache_update_type(void);
/*!
* \brief Cache update message
* \since 12
*/
struct stasis_cache_update {
/*! \brief Convenience reference to snapshot type */
struct stasis_message_type *type;
/*! \brief Old value from the cache */
struct stasis_message *old_snapshot;
/*! \brief New value */
struct stasis_message *new_snapshot;
};
/*!
* \brief Message type for clearing a message from a stasis cache.
* \since 12
*/
struct stasis_message_type *stasis_cache_clear_type(void);
/*! @} */
/*! @{ */
/*!
* \brief A message cache, for use with \ref stasis_caching_topic.
* \since 12
*/
struct stasis_cache;
/*! Cache entry used for calculating the aggregate snapshot. */
struct stasis_cache_entry;
/*!
* \brief A topic wrapper, which caches certain messages.
* \since 12
*/
struct stasis_caching_topic;
/*!
* \brief Callback extract a unique identity from a snapshot message.
*
* This identity is unique to the underlying object of the snapshot, such as the
* UniqueId field of a channel.
*
* \param message Message to extract id from.
* \return String representing the snapshot's id.
* \return \c NULL if the message_type of the message isn't a handled snapshot.
* \since 12
*/
typedef const char *(*snapshot_get_id)(struct stasis_message *message);
/*!
* \brief Callback to calculate the aggregate cache entry.
* \since 12.2.0
*
* \param entry Cache entry to calculate a new aggregate snapshot.
* \param new_snapshot The shapshot that is being updated.
*
* \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
* if a new aggregate could not be calculated because of error.
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \return New aggregate-snapshot calculated on success.
* Caller has a reference on return.
*/
typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
/*!
* \brief Callback to publish the aggregate cache entry message.
* \since 12.2.0
*
* \details
* Once an aggregate message is calculated. This callback publishes the
* message so subscribers will know the new value of an aggregated state.
*
* \param topic The aggregate message may be published to this topic.
* It is the topic to which the cache itself is subscribed.
* \param aggregate The aggregate shapshot message to publish.
*
* \note It is up to the function to determine if there is a better topic
* the aggregate message should be published over.
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \return Nothing
*/
typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
/*!
* \brief Get the aggregate cache entry snapshot.
* \since 12.2.0
*
* \param entry Cache entry to get the aggregate snapshot.
*
* \note A reference is not given to the returned pointer so don't unref it.
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \retval Aggregate-snapshot in cache.
* \retval NULL if not present.
*/
struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
/*!
* \brief Get the local entity's cache entry snapshot.
* \since 12.2.0
*
* \param entry Cache entry to get the local entity's snapshot.
*
* \note A reference is not given to the returned pointer so don't unref it.
*
* \retval Internal-snapshot in cache.
* \retval NULL if not present.
*/
struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
/*!
* \brief Get a remote entity's cache entry snapshot by index.
* \since 12.2.0
*
* \param entry Cache entry to get a remote entity's snapshot.
* \param idx Which remote entity's snapshot to get.
*
* \note A reference is not given to the returned pointer so don't unref it.
*
* \retval Remote-entity-snapshot in cache.
* \retval NULL if not present.
*/
struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
/*!
* \brief Create a cache.
*
* This is the backend store for a \ref stasis_caching_topic. The cache is
* thread safe, allowing concurrent reads and writes.
*
* The returned object is AO2 managed, so ao2_cleanup() when you're done.
*
* \param id_fn Callback to extract the id from a snapshot message.
*
* \retval New cache indexed by \a id_fn.
* \retval \c NULL on error
*
* \since 12
*/
struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
/*!
* \brief Create a cache.
*
* This is the backend store for a \ref stasis_caching_topic. The cache is
* thread safe, allowing concurrent reads and writes.
*
* The returned object is AO2 managed, so ao2_cleanup() when you're done.
*
* \param id_fn Callback to extract the id from a snapshot message.
* \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
* \param aggregate_publish_fn Callback to publish the aggregate cache entry.
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \retval New cache indexed by \a id_fn.
* \retval \c NULL on error
*
* \since 12.2.0
*/
struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
/*!
* \brief Create a topic which monitors and caches messages from another topic.
*
* The idea is that some topics publish 'snapshots' of some other object's state
* that should be cached. When these snapshot messages are received, the cache
* is updated, and a stasis_cache_update() message is forwarded, which has both
* the original snapshot message and the new message.
*
* The returned object is AO2 managed, so ao2_cleanup() when done with it.
*
* \param original_topic Topic publishing snapshot messages.
* \param cache Backend cache in which to keep snapshots.
* \return New topic which changes snapshot messages to stasis_cache_update()
* messages, and forwards all other messages from the original topic.
* \return \c NULL on error
* \since 12
*/
struct stasis_caching_topic *stasis_caching_topic_create(
struct stasis_topic *original_topic, struct stasis_cache *cache);
/*!
* \brief Unsubscribes a caching topic from its upstream topic.
*
* This function returns immediately, so be sure to cleanup when
* stasis_subscription_final_message() is received.
*
* \param caching_topic Caching topic to unsubscribe
* \return \c NULL for convenience
* \since 12
*/
struct stasis_caching_topic *stasis_caching_unsubscribe(
struct stasis_caching_topic *caching_topic);
/*!
* \brief Unsubscribes a caching topic from its upstream topic, blocking until
* all messages have been forwarded.
*
* See stasis_unsubscriben_and_join() for more info on when to use this as
* opposed to stasis_caching_unsubscribe().
*
* \param caching_topic Caching topic to unsubscribe
* \return \c NULL for convenience
* \since 12
*/
struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
* \param caching_topic The caching topic.
* \return The topic that publishes cache update events, along with passthrough
* events from the underlying topic.
* \return \c NULL if \a caching_topic is \c NULL.
* \since 12
*/
struct stasis_topic *stasis_caching_get_topic(
struct stasis_caching_topic *caching_topic);
/*!
* \brief A message which instructs the caching topic to remove an entry from
* its cache.
*
* \param message Message representative of the cache entry that should be
* cleared. This will become the data held in the
* stasis_cache_clear message.
*
* \return Message which, when sent to a \ref stasis_caching_topic, will clear
* the item from the cache.
* \return \c NULL on error.
* \since 12
*/
struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
/*!
* \brief Retrieve an item from the cache for the ast_eid_default entity.
*
* The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
*
* \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
*
* \retval Message from the cache.
* \retval \c NULL if message is not found.
*
* \since 12
*/
struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
/*!
* \brief Retrieve an item from the cache for a specific entity.
*
* The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
*
* \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
* \param eid Specific entity id to retrieve. NULL for aggregate.
*
* \note An aggregate message is a combined representation of the local
* and remote entities publishing the message data. e.g., An aggregate
* device state represents the combined device state from the local and
* any remote entities publishing state for a device. e.g., An aggregate
* MWI message is the old/new MWI counts accumulated from the local and
* any remote entities publishing to a mailbox.
*
* \retval Message from the cache.
* \retval \c NULL if message is not found.
*
* \since 12.2.0
*/
struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
/*!
* \brief Retrieve all matching entity items from the cache.
* \since 12.2.0
*
* \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
*
* \retval Container of matching items found.
* \retval \c NULL if error.
*/
struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
/*!
* \brief Dump cached items to a subscription for the ast_eid_default entity.
*
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
*
* \retval ao2_container containing all matches (must be unreffed by caller)
* \retval \c NULL on allocation error
*
* \since 12
*/
struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
/*!
* \brief Dump cached items to a subscription for a specific entity.
* \since 12.2.0
*
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
* \param eid Specific entity id to retrieve. NULL for aggregate.
*
* \retval ao2_container containing all matches (must be unreffed by caller)
* \retval \c NULL on allocation error
*/
struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
/*!
* \brief Dump all entity items from the cache to a subscription.
* \since 12.2.0
*
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
*
* \retval ao2_container containing all matches (must be unreffed by caller)
* \retval \c NULL on allocation error
*/
struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
/*!
* \brief Object type code for multi user object snapshots
*/
enum stasis_user_multi_object_snapshot_type {
STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */
STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */
STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */
};
/*! \brief Number of snapshot types */
#define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
/*!
* \brief Message type for custom user defined events with multi object blobs
* \return The stasis_message_type for user event
* \since 12.3.0
*/
struct stasis_message_type *ast_multi_user_event_type(void);
/*!
* \brief Create a stasis multi object blob
* \since 12.3.0
*
* \details
* Multi object blob can store a combination of arbitrary json values
* (the blob) and also snapshots of various other system objects (such
* as channels, bridges, etc) for delivery through a stasis message.
* The multi object blob is first created, then optionally objects
* are added to it, before being attached to a message and delivered
* to stasis topic.
*
* \param blob Json blob
*
* \note When used for an ast_multi_user_event_type message, the
* json blob should contain at minimum {eventname: name}.
*
* \retval ast_multi_object_blob* if succeeded
* \retval NULL if creation failed
*/
struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
/*!
* \brief Add an object to a multi object blob previously created
* \since 12.3.0
*
* \param multi The multi object blob previously created
* \param type Type code for the object such as channel, bridge, etc.
* \param object Snapshot object of the type supplied to typename
*
* \return Nothing
*/
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
/*!
* \brief Create and publish a stasis message blob on a channel with it's snapshot
* \since 12.3.0
*
* \details
* For compatibility with app_userevent, this creates a multi object
* blob message, attaches the channel snapshot to it, and publishes it
* to the channel's topic.
*
* \param chan The channel to snapshot and publish event to
* \param type The message type
* \param blob A json blob to publish with the snapshot
*
* \return Nothing
*/
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
/*! @} */
/*! @{ */
/*!
* \internal
* \brief Log a message about invalid attempt to access a type.
*/
void stasis_log_bad_type_access(const char *name);
/*!
* \brief Boiler-plate messaging macro for defining public message types.
*
* \code
* STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
* .to_event = foo_to_event,
* );
* \endcode
*
* \param name Name of message type.
* \param ... Virtual table methods for messages of this type.
* \since 12
*/
#define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
static struct stasis_message_vtable _priv_ ## name ## _v = { \
__VA_ARGS__ \
}; \
static struct stasis_message_type *_priv_ ## name; \
struct stasis_message_type *name(void) { \
if (_priv_ ## name == NULL) { \
stasis_log_bad_type_access(#name); \
} \
return _priv_ ## name; \
}
/*!
* \brief Boiler-plate messaging macro for defining local message types.
*
* \code
* STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
* .to_ami = foo_to_ami,
* .to_json = foo_to_json,
* .to_event = foo_to_event,
* );
* \endcode
*
* \param name Name of message type.
* \param ... Virtual table methods for messages of this type.
* \since 12
*/
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
static struct stasis_message_vtable _priv_ ## name ## _v = { \
__VA_ARGS__ \
}; \
static struct stasis_message_type *_priv_ ## name; \
static struct stasis_message_type *name(void) { \
if (_priv_ ## name == NULL) { \
stasis_log_bad_type_access(#name); \
} \
return _priv_ ## name; \
}
/*!
* \brief Boiler-plate messaging macro for initializing message types.
*
* \code
* if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
* return -1;
* }
* \endcode
*
* \param name Name of message type.
* \return 0 if initialization is successful.
* \return Non-zero on failure.
* \since 12
*/
#define STASIS_MESSAGE_TYPE_INIT(name) \
({ \
ast_assert(_priv_ ## name == NULL); \
stasis_message_type_create(#name, \
&_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
})
/*!
* \brief Boiler-plate messaging macro for cleaning up message types.
*
* Note that if your type is defined in core instead of a loadable module, you
* should call message type cleanup from an ast_register_cleanup() handler
* instead of an ast_register_atexit() handler.
*
* The reason is that during an immediate shutdown, loadable modules (which may
* refer to core message types) are not unloaded. While the atexit handlers are
* run, there's a window of time where a module subscription might reference a
* core message type after it's been cleaned up. Which is bad.
*
* \param name Name of message type.
* \since 12
*/
#define STASIS_MESSAGE_TYPE_CLEANUP(name) \
({ \
ao2_cleanup(_priv_ ## name); \
_priv_ ## name = NULL; \
})
/*! @} */
/*! @{ */
/*!
* \brief Initialize the Stasis subsystem.
* \return 0 on success.
* \return Non-zero on error.
* \since 12
*/
int stasis_init(void);
/*! @} */
/*! @{ */
/*!
* \internal
* \brief called by stasis_init() for cache initialization.
* \return 0 on success.
* \return Non-zero on error.
* \since 12
*/
int stasis_cache_init(void);
/*!
* \internal
* \brief called by stasis_init() for config initialization.
* \return 0 on success.
* \return Non-zero on error.
* \since 12
*/
int stasis_config_init(void);
/*! @} */
/*!
* \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
*
* This group contains the topics, messages and corresponding message types
* found within Asterisk.
*/
#endif /* _ASTERISK_STASIS_H */
|