SQL Server Service Broker
服务体系结构消息类型 — 定义应用程序间交换的消息的名称。还可以选择是否验证消息。
约定 — 指定给定会话中的消息方向和消息类型。队列 — 存储消息。此存储机制使服务间可以进行异步通信。Service Broker 队列还有其他优点,比如自动锁定同一个会话组中的消息。服务 — 是可寻址的会话端点。Service Broker 消息从一个服务发送到另一个服务。服务指定一个队列来保存消息,还指定一些约定,约定指明该服务可作为“目标”。约定向服务提供一组定义完善的消息类型。
处理的先决条件.
USE master;
GOALTER DATABASE 目标数据库
SET ENABLE_BROKER;GO
-- 如果上面的操作执行后,长时间无反应,有死机的嫌疑,尝试下面的语句。
ALTER DATABASE 目标数据库 SET NEW_BROKER WITH ROLLBACK IMMEDIATE;GO
ALTER DATABASE 目标数据库 SET ENABLE_BROKER;GO
USE 目标数据库;
GO
-------------------- Hello World 的例子 --------------------
-- 创建 SayHelloMessage 消息类型.
-- 该消息类型,不做数据验证的处理.CREATE MESSAGE TYPE SayHelloMessage VALIDATION = None;GO-- 创建 约定 SayHelloContract
-- 定义了,发送/接收方.-- 都是用这个消息类型.CREATE CONTRACT SayHelloContract ( SayHelloMessage SENT BY ANY);GO -- 创建发送/接收队列CREATE QUEUE SayHelloSendQueue;CREATE QUEUE SayHelloReceiveQueue;GO -- 创建发起方服务 SayHelloSendService-- 该服务使用 SayHelloSendQueue 队列-- 由于未指定约定名称,因而其他服务不可将此服务用作目标服务。CREATE SERVICE SayHelloSendService ON QUEUE SayHelloSendQueue;GO -- 创建目标服务 SayHelloReceiveService-- 该服务使用 SayHelloReceiveQueue 队列-- 使用 SayHelloContract 约定CREATE SERVICE SayHelloReceiveService ON QUEUE SayHelloReceiveQueue ([SayHelloContract]);GO
-- 测试发送.
BEGIN -- 定义发送的句柄. DECLARE @InitDlgHandle UNIQUEIDENTIFIER;-- 定义变量.
DECLARE @MyMessage NVARCHAR(100); -- 设置发送消息的内容. SET @MyMessage = N'Hello World!'-- 开始事务处理.
BEGIN TRANSACTION; -- 定义消息发送处理. BEGIN DIALOG @InitDlgHandle FROM SERVICE -- 定义发送服务. SayHelloSendService TO SERVICE -- 定义接收服务. N'SayHelloReceiveService' ON CONTRACT -- 定义使用的约定 SayHelloContract WITH -- 不加密. ENCRYPTION = OFF; -- 发送消息. SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [SayHelloMessage] ( @MyMessage );-- 输出接收到的消息.PRINT '我发送了:' + @MyMessage; -- 提交事务. COMMIT TRANSACTION;ENDGO -- 测试接收,处理,并反馈.BEGIN -- 接收句柄. DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER; -- 接收到的数据. DECLARE @RecvReqMsg NVARCHAR(100); -- 接收到的数据类型名称. DECLARE @RecvReqMsgName sysname; -- 开始事务处理. BEGIN TRANSACTION; -- 尝试从 SayHelloReceiveQueue 队列 接收消息. WAITFOR ( RECEIVE TOP(1) @RecvReqDlgHandle = conversation_handle, @RecvReqMsg = message_body, @RecvReqMsgName = message_type_name FROM SayHelloReceiveQueue ), TIMEOUT 1000;-- 如果接收到的消息类型名为 SayHelloMessage
-- 那么进行处理. IF @RecvReqMsgName = N'SayHelloMessage' BEGIN -- 定义准备用于返回的消息. DECLARE @ReplyMsg NVARCHAR(100); -- 简单设置. SELECT @ReplyMsg = '~' + @RecvReqMsg + '~';-- 调试输出.PRINT '我接收到:' + @RecvReqMsg + "; 我将反馈:" + @ReplyMsg; -- 发送反馈消息. SEND ON CONVERSATION @RecvReqDlgHandle MESSAGE TYPE [SayHelloMessage] (@ReplyMsg); END CONVERSATION @RecvReqDlgHandle; END; -- 提交事务. COMMIT TRANSACTION;ENDGO
-- 测试获取处理结果.
BEGIN DECLARE @RecvReplyMsg NVARCHAR(100); DECLARE @RecvReplyDlgHandle UNIQUEIDENTIFIER; -- 开始事务处理. BEGIN TRANSACTION; -- 尝试从 SayHelloReceiveQueue 队列 接收消息. WAITFOR ( RECEIVE TOP(1) @RecvReplyDlgHandle = conversation_handle, @RecvReplyMsg = message_body FROM SayHelloSendQueue ), TIMEOUT 1000; END CONVERSATION @RecvReplyDlgHandle;-- 输出接收到的消息.PRINT '我接收到反馈:' + @RecvReplyMsg; -- 提交事务. COMMIT TRANSACTION;ENDGO-------------------- Hello World 内部激活的例子 --------------------
-- 专门用于处理消息的存储过程.
CREATE PROCEDURE SayHelloQueueProcASBEGIN -- 接收句柄. DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER; -- 接收到的数据. DECLARE @RecvReqMsg NVARCHAR(100); -- 接收到的数据类型名称. DECLARE @RecvReqMsgName sysname; -- 循环处理. WHILE (1=1) BEGIN -- 开始事务处理. BEGIN TRANSACTION; -- 尝试从 SayHelloReceiveQueue 队列 接收消息. WAITFOR ( RECEIVE TOP(1) @RecvReqDlgHandle = conversation_handle, @RecvReqMsg = message_body, @RecvReqMsgName = message_type_name FROM SayHelloReceiveQueue ), TIMEOUT 5000;-- 判断有没有获取到消息.
IF (@@ROWCOUNT = 0) BEGIN -- 如果没有接收到消息 -- 回滚事务. ROLLBACK TRANSACTION; -- 跳出循环. BREAK; END-- 如果接收到的消息类型名为 SayHelloMessage
-- 那么进行处理. IF @RecvReqMsgName = N'SayHelloMessage' BEGIN -- 定义准备用于返回的消息. DECLARE @ReplyMsg NVARCHAR(100); -- 简单设置. SELECT @ReplyMsg = '~' + @RecvReqMsg + '~';-- 调试输出.PRINT '我接收到:' + @RecvReqMsg + "; 我将反馈:" + @ReplyMsg;-- 发送反馈消息.
SEND ON CONVERSATION @RecvReqDlgHandle MESSAGE TYPE [SayHelloMessage] (@ReplyMsg); END CONVERSATION @RecvReqDlgHandle; END; -- 提交事务. COMMIT TRANSACTION; ENDENDGO
-- 更改目标队列以指定内部激活
-- 也就是当有消息发送到 SayHelloReceiveQueue 队列的时候.-- 自动调用 SayHelloQueueProc 存储过程 进行处理.ALTER QUEUE SayHelloReceiveQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = SayHelloQueueProc, MAX_QUEUE_READERS = 10, EXECUTE AS SELF );GO
-- 由于消息已经处于自动处理的方式。
-- 测试发送 并 接收.BEGIN -- 定义发送的句柄. DECLARE @InitDlgHandle UNIQUEIDENTIFIER;-- 定义变量.
DECLARE @MyMessage NVARCHAR(100); -- 设置发送消息的内容. SET @MyMessage = N'Hello World!'-- 开始事务处理.
BEGIN TRANSACTION; -- 定义消息发送处理. BEGIN DIALOG @InitDlgHandle FROM SERVICE -- 定义发送服务. SayHelloSendService TO SERVICE -- 定义接收服务. N'SayHelloReceiveService' ON CONTRACT -- 定义使用的约定 SayHelloContract WITH -- 不加密. ENCRYPTION = OFF; -- 发送消息. SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [SayHelloMessage] ( @MyMessage );-- 输出接收到的消息.PRINT '我发送了:' + @MyMessage; -- 提交事务. COMMIT TRANSACTION; -- 等待 5 秒. WAITFOR DELAY '00:00:05'; DECLARE @RecvReplyMsg NVARCHAR(100); DECLARE @RecvReplyDlgHandle UNIQUEIDENTIFIER; -- 开始事务处理. BEGIN TRANSACTION; -- 尝试从 SayHelloReceiveQueue 队列 接收消息. WAITFOR ( RECEIVE TOP(1) @RecvReplyDlgHandle = conversation_handle, @RecvReplyMsg = message_body FROM SayHelloSendQueue ), TIMEOUT 1000; END CONVERSATION @RecvReplyDlgHandle;-- 输出接收到的消息.PRINT '我接收到反馈:' + @RecvReplyMsg; -- 提交事务. COMMIT TRANSACTION;END
GO我发送了:Hello World!
(1 行受影响)
我接收到反馈:~Hello World!~