Создание распределенных приложений на основе
сервера очередей Microsoft Message Queue Server
Алексей Шуленин
Введение
Настоящая статья служит продолжением обзора
средств создания распределенных приложений на
базе серверов промежуточного слоя корпорации
Microsoft. В нашей книге [1] мы рассмотрели основные
вопросы построения компонентов бизнес-логики в
виде ActiveX-объектов под управлением Microsoft Transaction
Server (MTS) и обеспечения плотной целостности (tight
consistency) в транзакциях на уровне приложения при
помощи протокола двухфазной фиксации OLE 2PC.
Необходимо иметь в виду, что принцип плотной
целостности предъявляет довольно жесткие
требования к работе приложений и качеству
каналов связи, так как для успешного завершения
распределенной 2PC-транзакции все ее участники
как минимум должны быть доступны. На практике это
обычно бывает возможно лишь в пределах одной
локальной сети. Рассмотрим крупное предприятие,
имеющее множество отделений и филиалов,
взаимодействующих друг с другом. Внутри каждого
отделения (читай: ЛВС) мы можем поставить один или
несколько Microsoft Transaction Server,
которые будут прекрасно управлять уровнем
бизнес-логики, мультиплексируя обращения к
локальному хранилищу. Однако этого недостаточно.
Чтобы организовать асинхронное межсетевое
взаимодействие, нам потребуется некий
инструмент промежуточного хранения и адресной
доставки. Пока приложение-получатель недоступно,
этот инструмент накапливает внутри себя
адресованные получателю сообщения (например,
запросы к серверу баз данных или вызовы
компонентов) и доставляет их ему по оптимальному
маршруту с соблюдением порядка поступления, как
только последний проявит себя как доступный. В
западной литературе подобные средства часто
выступают под аббревиатурой MOM — message-oriented middleware.
В октбре 1997 года Microsoft дебютировала на этом рынке,
выпустив сервер очередей Microsoft Message Queue Server (MSMQ).
Как гласит программистский фольклор, лучшим
средством преобразования стека в очередь
является автомат Калашникова. Другим, не менее
эффективным средством управления очередями,
по-видимому, стоит признать Microsoft Message Queue Server. Он
так же неприхотлив в обращении (легок в
администрировании), компактен, надежен и по своим
функциональным характеристикам отличается
производительностью и мощностью. Дополнительным
преимуществом MSMQ является его относительно
невысокая стоимость. По сути, он
распространяется бесплатно в составе NT. В работе
[2] для сравнения приводятся цены на некоторые
аналогичные продукты других производителей,
например IBM MQSeries или NEONet. Полный обзор этих и
других достоинств MSMQ дается в
работе [3], поэтому мы не будем сейчас подробно на
них останавливаться. В рамках настоящей статьи
нас будут интересовать практические вопросы
построения асинхронных распределенных
приложений на базе сервера очередей. Для
разработки средствами C и С++
MSMQ предлагает прикладные программные интерфейсы
MSMQ API. Он также содержит набор управляющих
объектов ActiveX, практически эквивалентных по
своей функциональности упомянутому API, которые
позволяют использовать MSMQ из средств разработки,
поддерживающих компоненты ActiveX: Microsoft Visual Basic,
Microsoft Visual Java, Borland Delphi и т.д. MSMQ обладает
дополнительными интерфейсами к RPC и MAPI, в
частности, к Microsoft Exchange Server.
Мы постараемся проиллюстрировать некоторые из
перечисленных возможностей на примере вполне
конкретного приложения. Рассмотрим задачу
организации обслуживания удаленных клиентов
типа системы “клиент-банк”. Схема работы
приложения и задействованное в нем программное
обеспечение приводятся на рис.1. Мы имеем два
сервера баз данных Microsoft SQL Server 6.5, моделирующих
два различных банка. Клиент по электронной почте
Microsoft Exchange посылает в первый банк поручение на
перевод определенной суммы со счета первого
банка на счет во втором банке (филиале). Приняв
это поручение, первый банк инициирует
распределенную транзакцию, состоящую из двух
операций — снятия суммы со счета в своей базе и
зачисления на счет во второй (которая находится
под управлением второго сервера). Случай
синхронного взаимодействия между серверами баз
данных был рассмотрен в [1]. Если одна из операций
завершалась неуспешно, то MTS откатывал целиком
всю транзакцию. Добавление в эту схему MSMQ дает
возможность изменить сценарий работы. Если
второй сервер временно недоступен (например, при
обрыве канала связи), первый сервер посылает
предупреждающее сообщение клиенту и, если тот
согласен ждать, транзакция попадает в очередь и
выполняется по восстановлении связи. Рабочие
места операционистов в обоих банках
организованы так, что они видят поступающие
соответственно от клиента или от первого банка
запросы и имеют возможность согласиться на
изменение остатка по счету или отказаться. В
случае любого отказа вся транзакция также
откатывается. Клиент получает сообщение о
результате выполнения своего перевода.
Данное приложение было написано на VB 5.0 c целью
демонстрации работы MTS и MSMQ на Windows NT Enterprise Edition
Launch 24/11/97. Оно носит чисто модельный характер,
никоим образом не претендуя на функциональную
полноту и законченность. Тем не менее некоторые
его фрагменты позволят нам вкратце обрисовать
основные принципы работы с ActiveX-интерфейсами MSMQ и
MSMQ Exchange Connector.
Операции над очередями
Установим на первый сервер MSMQ как Primary Enterprise
Controller (PEC). PEC хранит информацию о конфигурации
предприятия (Enterprise), необходимую для оптимальной
маршрутизации сообщений. PEC автоматически
является Primary Site Controller (PSC) для своего сайта. При
установке MSMQ Server в качестве PEC или PSC создается
база данных MQIS на Microsoft SQL Server
для хранения информации об очередях, компьютерах
и других служебных данных. Microsoft SQL Server может быть
приобретен отдельно, либо используется его
ограниченная версия, входящая в состав поставки
MSMQ Server. Read-only-реплику базы PSC содержит также Backup Site
Controller (BSC) внутри данного сайта. Наличие BSC не
является обязательным условием организации
сайта и диктуется соображениями
производительности и надежности. Как PSC, так и BSC
функционируют также в качестве MSMQ Routing Server.
С точки зрения MSMQ второй сервер будет являться
клиентом первого, для чего установим на нем MSMQ
Independent Client (независимый клиент). В отличие от
простого или сетевого клиента независимый
клиент MSMQ обладает возможностью работать в
сайтах без PSC.
Рассмотрим в качестве примера создание очереди
MSMQ с помощью Visual Basic. Установка MSMQ Server, Client или
Independent Client автоматически регистрирует на
компьютере библиотеку поддержки ActiveX-объектов
Microsoft Message Queue Object Library (mqoa.dll), на которую мы можем
сослаться в нашем проекте. Очередь представляет
собой некий репозитарий сообщений,
местоположение и доступ к которому определяются
через MSMQ Information Service. Очереди делятся на
глобальные (public) и частные (private). Глобальные
очереди содержатся в информационном хранилище
MSMQ, тиражируются в пределах всей области,
находящейся под управлением одного PEC, и,
следовательно, теоретически могут принадлежать
любому MSMQ внутри этой области. Частные очереди
хранятся на том локальном компьютере, где они
были определены, не тиражируются и требуют для
доступа указания полного имени. К общим
свойствам очереди относятся путь (Pathname), метка
(Label), идентификатор (ID) и идентификатор типа (Type ID).
Путь состоит из имени компьютера, информации о
том, является очередь глобальной или частной, и
имени очереди. Например, ntalexejsanyqueue — глобальная
очередь с именем anyqueue, относящаяся к компьютеру
ntalexejs. .PRIVATE$anyqueue1 — частная очередь с именем
anyqueue1, находящаяся на локальной машине. Путь
используется при создании очереди и говорит MSMQ о
том, где хранить сообщения, принадлежащие
очереди, и где ее регистрировать. Метка является
произвольным кратким описанием очереди,
например, MyQueueForResponse; идентификатор — это GUID
(global unique identifier), однозначно
определяющий конкретную очередь как уникальную
сущность. Написанная нами процедура CreateMQPublQue
будет создавать глобальную очередь с меткой
strQueLbl на компьютере strMachineName (по умолчанию на
локальной машине). Последний входной параметр
blnIsQueForExchToMQ информирует MSMQ о типе очереди: обычная
или предназначенная для связи с Microsoft Exchange (по
умолчанию обычная).
Sub CreateMQPublQue (strQueLbl As String, Optional strMachineName As
String = “.”, _
Optional blnIsQueForExchToMQ As Boolean)
Начинаем с того, что создаем
новый объект MSMQQueueInfo и получаем ссылку на него в
переменную qinfo. MSMQQueueInfo является одним из
компонентов ActiveX-интерфейса MSMQ, который играет по
отношению к очереди примерно такую же роль, как
file handle по отношению к файлу.
Dim qinfo As New MSMQQueueInfo
Задаем основные свойства новой
очереди, которые были охарактеризованы выше:
путь, метку и т.д. По умолчанию очередь будет
относиться к локальному компьютеру, так как
strMachineName=”.”. Метка очереди в нашем случае будет
автоматически являться ее именем.
qinfo.PathName = strMachineName & “” & strQueLbl
qinfo.Label = strQueLbl
Идентификатор типа очереди (ServiceTypeGuid) требуется
в тех ситуациях, когда необходимо в явной форме
указать MSMQ, что данная очередь будет содержать
сообщения, требующие особого вида обработки,
например используется провайдером транспорта
MAPI. В нашем случае, если мы хотим, чтобы эта
очередь предназначалась для приема почты от MS
Exchange Server, то идентификатор типа должен быть равен
{5EADC0D0-7182-11CF-A8FF-0020AFB8FB50}. Именно это значение
содержит константа MSMQMAIL_SERVICE_MAIL.
If blnIsQueForExchToMQ Then
qinfo.ServiceTypeGuid = MSMQMAIL_SERVICE_MAIL
End If
On Error GoTo ErrHandler
После того как все необходимые свойства заданы,
нам остается собственно ее создать, что мы и
сделаем вызовом метода Create объекта MSMQQueueInfo.
qinfo.Create
Exit Sub
ErrHandler:
MsgBox “Не могу создать очередь. Ошибка: “ &
Str$(Err.Number) & vbNewLine & “По причине: “ & Err.Description
End Sub
Давайте испытаем ее в действии:
вставим в Sub Main() вызов
CreateMQPublQue “FromExchToMQ”, , True
запустим Message Queue Explorer (если он
запущен, просто нажмем на кнопку Refresh) и
посмотрим, что произошло в информационном
хранилище (рис. 2):
Вызовем нажатием правой кнопки мыши
контекстное меню очереди FromExchToMQ и выберем пункт
Properties, который покажет нам ее основные свойства
(рис. 3).
Те же самые глобальные очереди, которые
показывает Message Queue Explorer, но в гораздо менее
читабельном виде, можно получить из любого
клиента MS SQL Server (разумеется, при наличии
соответствующих прав), послав к базе данных MQIS
запрос select * from Queue.
В рамках первого знакомства с MSMQ мы не ставим
своей целью дать исчерпывающее описание
решительно всех свойств и методов решительно
всех объектов ActiveX-интерфейса MSMQ. Например, при
разработке рассматриваемого демонстрационного
приложения нам не надо было ограничивать размер
очереди (Quota), устанавливать ее базовый приоритет
(BasePriority), журналировать поступающие сообщения
(Journal) и многое другое, но это не значит, что таких
возможностей в MSMQ нет. В связи с этим
рекомендуется иногда заглядывать в
документацию, потому что там можно найти много
полезного, что не понадобилось в данном случае
нам, но может очень пригодиться для решения
серьезных задач.
Итак, ActiveX-объект MSMQQueueInfo является главным
средством для обращения к очередям. При
формировании новой очереди мы создавали его
заново, при выполнении операций над уже
существующей очередью нам нужно научиться
получать на него ссылку, если известно одно или
несколько свойств очереди: идентификатор, метка,
идентификатор типа, время создания и т.д. Для
дальнейших задач нам наиболее часто понадобится
получать MSMQQueueInfo по метке
очереди, поэтому лучше оформить эту
последовательность действий в виде функции
Function FindQueOnLabel(strQueLbl As String) As MSMQQueueInfo
Dim query As New MSMQQuery
MSMQQuery является еще одним
встроенным ActiveX-объектом MSMQ и предназначается
для запрашивания информационного хранилища MQIS о
существующих глобальных очередях по известным
реквизитам. Результатом запроса является
коллекция объектов MSMQQueueInfo, чьи свойства
удовлетворяют условию поиска. Определим
переменную qinfos как коллекцию MSMQQueueInfo:
Dim qinfos As MSMQQueueInfos
и выполним поиск по метке
очереди:
Set qinfos = query.LookupQueue(Label:=strQueLbl)
Вернемся на начало коллекции:
qinfos.Reset
Получим первый из результатов:
Dim qinfo As MSMQQueueInfo
Set qinfo = qinfos.Next
Если по нашему запросу очередей
не найдено, выдаем соответствующее
предупреждение:
If qinfo Is Nothing Then
MsgBox “Очереди с лэйблом “ & strQueLbl & “ не
найдено”
в противном случае возвращаем
указатель на первую из подходящих очередей:
Else
Set FindQueOnLabel = qinfo
End If
End Function
Теперь мы можем очень просто
решить задачу удаления очереди с известной
меткой:
Sub DeleteMQPublQue(strQueLbl As String)
FindQueOnLabel(strQueLbl).Delete
End Sub
В рассматриваемом приложении
каждая глобальная очередь обозначалась своей
меткой, отличной от меток других глобальных
очередей. Возникает вопрос: что делать, если
несколько очередей имеют совпадающие метки
(такое в принципе возможно). Ответ простой — надо
проводить поиск по свойству (например, GUID) или
комбинации свойств, уникально определяющих
очередь. Набор параметров метода LookupQueue объекта
MSMQQuery позволяет это сделать. Кстати, из текста
примера с функцией FindQueOnLabel очевидно вытекает,
как обработать не одну, а целую группу очередей,
обладающих каким-то общим
признаком. Для этого нужно слегка изменить
вышеприведенный фрагмент кода, чтобы операции
выполнялись не над первой очередью из
результатов запроса, а над всей коллекцией.
Предположим, по какой-то причине нам настолько
неприятны очереди c Type ID=
{BBD97DE0-CB4F-11CF-8E62-00AA006B4F2F}, что мы хотим их удалять
всякий раз перед началом работы программы. Нет
ничего проще — давайте вставим в обработку
события Form_OnLoad (или UserDocument_Initialize и т.п.) следующий
код:
Dim query As New MSMQQuery
Dim qinfos As MSMQQueueInfos
Set qinfos =
query.LookupQueue(ServiceTypeGuid:=”{BBD97DE0-CB4F-11CF-8E62-0AA006B4F2F}”)
qinfos.Reset
Dim qinfo As MSMQQueueInfo
Set qinfo = qinfos.Next
While Not qinfo Is Nothing
qinfo.Delete
Set qinfo = qinfos.Next
Wend
Удобно, хотя и не всегда верно, представлять
себе отображение изменений состояния очередей
как реакцию на административные сообщения,
которыми обмениваются серверы и клиенты MSMQ, или
как распространение транзакций в распределенной
базе данных MQIS. Поясним сказанное на примере.
Пусть имеется MSMQ Server в качестве PSC и связанный с
ним MSMQ Client. Даже если в данный момент связь между
ними отсутствует, мы всегда можем создать или
удалить очередь на клиенте непосредственно с
сервера, но эти изменения станут заметны с
клиента, только когда связь восстановится и он
получит доступ к своему PSC, а точнее к его базе
данных. Убедиться, что тот или иной компьютер
доступен, можно, например, выполнив MQPing из MSMQ
Explorer.
Операции над сообщениями
Сообщение представляет собой достаточно общее
понятие, поэтому, как и всякую философскую
категорию, его лучше определить через
совокупность присущих ему проявлений.
Пользовательское сообщение помимо своего
идентификатора характеризуется набором свойств,
описывающих адресата (очередь назначения),
отправителя (машина, пользователь), содержание
(метка, тело письма, специфичные для приложения
данные). Сообщение также содержит служебную
информацию, позволяющую определить особенности
доставки (гарантированная или экспресс,
приоритет, алгоритм шифрования), обработки
(административная очередь и очередь ответа),
данные постфактум (время доставки и отправления)
и другие свойства. Административные очереди
используются для получения уведомлений о
доставке пользовательского сообщения в очередь
назначения, его последующего прочтения
принимающим приложением и т.д. Время доставки и
приема можно ограничить свойствами MaxTimeToReachQueue и
MaxTimeToReceive. Если оно истечет раньше, чем сообщение
будет принято или прочитано, в административную
очередь будет послано отрицательное
уведомление. Отправкой уведомлений занимается
MSMQ. Очереди ответа используются для того, чтобы
указать приложению-приемнику, куда именно оно
должно направить ответ на принятое сообщение.
Наличие очереди ответа в принятом сообщении не
обязывает приложение-приемник слать ответ
непременно в эту очередь. Оно может вообще не
отвечать — это дело его совести. Тем не менее,
если мы не просто извещаем принимающее
приложение, а хотим от него что-то взамен, логично
указать ему, где именно мы будем ждать ответа,
потому что само оно может не догадаться. В MSMQ не
существует заранее предопределенных
административных очередей и очередей ответа. Это
обычные очереди, которые создаются примерно так,
как было показано в предыдущем пункте, о чем
должно позаботиться само приложение. В целях
дополнительной безопасности помимо встроенных
возможностей MSMQ по шифрованию сообщения можно
использовать сертификаты для аутентификации
отправителей. MSMQ поддерживает как внешние
(полученные от организации, занимающейся выдачей
сертификатов: AT&T, InternetMCI Mail, VeriSign и др.), так и
внутренние (то есть выданные MSMQ) сертификаты.
Более того, в свойствах очереди (закладка Advanced
или qinfo.Authenticate) можно явно указать, что все
сообщения, уровень аутентификации которых не
соответствует уровню аутентификации очереди,
должны быть хладнокровно отвергнуты.
Рассмотрим некоторые из перечисленных свойств
на примере простой пользовательской функции,
которая находит первую из очередей с меткой strQueLbl
(выше мы договорились, что метки очередей в нашей
задаче являются уникальными в масштабах всего
предприятия независимо от того, на каком
компьютере они созданы) и посылает в нее
сообщение с меткой strMsgLbl и телом varBodyText. Очередь
ответа является необязательным параметром и по
умолчанию отсутствует. Другим необязательным
параметром служит специфичная для приложения
информация, которой может быть любая переменная
типа Long.
Sub SendMQMsg(strQueLbl As String, strMsgLbl As String, _
varBodyText As Variant, Optional strRespQueLbl As String = “”, Optional lngAppSpec As
Long)
Dim q As MSMQQueue
Используем определенную нами
выше функцию FindQueOnLabel для получения MSMQQueueInfo
очереди по ее метке и открываем очередь для
отправки (MQ_SEND_ACCESS). Второй параметр метода Open
служит для разделения доступа к одной очереди
между процессами и при отправке может быть
только MQ_DENY_NONE
Set q = FindQueOnLabel(strQueLbl).Open(MQ_SEND_ACCESS, MQ_DENY_NONE)
Создаем новый объект “сообщение”. MSMQMessage
входит в ActiveX-интерфейс сервера сообщений.
Dim msg As New MSMQMessage
Задаем свойства сообщения. Обычно, когда в одну
очередь поступает много сообщений,
предусматривающих различный характер
прикладной обработки, программист может
использовать Label и AppSpecific в качестве
отличительных признаков типа сообщения. Поясним
наглядно. Пусть некая процедура занимается
обработкой выписки по какому-то заранее
известному счету. Отводить по очереди на каждого
клиента в общем-то неэкономно (хотя в принципе
ограничений нет), поэтому выписки по всем счетам
падают в общую очередь приема. В этом случае
разумно вынести номер счета в метку сообщения,
чтобы наша процедура не отвлекалась на чтение
тела сообщения, если этот счет ей заведомо
неинтересен.
msg.Label = strMsgLbl
msg.AppSpecific = lngAppSpec
msg.Body = varBodyText
Если требуется ответ, указываем
MSMQQueueInfo очереди, в которой мы были бы счастливы
его видеть:
If strRespQueLbl <> “” Then
Set msg.ResponseQueueInfo = FindQueOnLabel(strRespQueLbl)
End If
Сообщение msg сформировано.
Можно смело отослать его в ранее открытую для
отправки очередь q:
msg.Send q
По умолчанию все сообщения отсылаются (и
принимаются) в контексте текущей транзакции
Microsoft Transaction Server. Если соблюдение транзакционной
целостности не требуется для вашего приложения,
вы можете сэкономить время и ресурсы, отключив
участие в транзакции параметром MQ_NO_TRANSACTION. Если
текущая транзакция началась не на MTS, а на
каком-то внешнем XA-совместимом координаторе
транзакций, вы можете указать
это MSMQ c помощью объекта MSMQTransaction.
q.Close
End Sub
Проверим нашу процедуру в
действии. Создадим на втором сервере
(независимом клиенте) очередь с именем InteractionQueue:
CreateMQPublQue “InteractionQueue”, “ntalexejs”
и пошлем в нее простенькое
сообщение
SendMQMsg “InteractionQueue”, “Проба”, “Hello,
World”
Зайдем в MSMQ Explorer и посмотрим
содержание очереди InteractionQueue (если она еще не
видна, нажмите кнопку Refresh). Мы увидим, что в нее
пришло новое сообщение с меткой “Проба”. С
помощью правой кнопки мыши вызовем меню “Properties”
для этого сообщения и выберем, например, закладку
“Body”. Как легко убедиться, тело сообщения
содержит тот самый текст (Hello, World), который был
нами отправлен (рис. 4). Если у нас нет
соответствующих прав по отношению к очереди или
тому компьютеру, где она находится, ее содержание
будет для нас недоступно и вместо списка
пришедших сообщений мы получим строку “Cannot open
queue…”
Обратите внимание на то, что msg.Body имеет тип Variant.
Это означает, что в качестве тела сообщения можно
передавать не только текстовую строку, но и любой
другой тип, который может быть упакован в простой
Variant: число, валюта, дата, байтовый массив. В общем
случае им является любой ActiveX-объект,
характеризующийся перманентным состоянием, то
есть поддерживающий интерфейсы IPersist* (IPersistStream
или IPersistStorage), например документ MS Office. Если
принимающее сообщение ничего заранее не знает о
теле поступившего сообщения, оно может
использовать функцию TypeName
для проверки простых типов. В Visual Basic нет функции
QueryInterface для проверки поддерживаемых объектом
интерфейсов, поэтому рекомендуется использовать
конструкцию if TypeOf … is … then. Рассмотрим в качестве
примера передачу в сообщении результатов
запроса к базе данных в виде Recordset. В нашем
простеньком приложении “клиент-банк” состояние
банка описывается базой данных Bank, по своей
структуре очень похожей на ту, которая была нами
построена в [1]. В частности, в нее входит таблица
Oper, в которой ведется учет совершенных операций.
Она имеет поля account_deb (счет по дебету), account_cre (счет
по кредиту), сумма сделки, дата, номер операции и
т.д. Тогда выписку по счету, скажем, 10001 легко
получить запросом select * from Oper where account_deb=’10001’ or account_cre=’10001’. Воспользуемся
Remote Data Service (RDS), в недавнем прошлом — Advanced Data
Connector (ADC). RDS кэширует результаты запроса на
клиенте и обеспечивает столь необходимую нам
перманентность состояния. Отметим в Project->References использование в проекте Microsoft Remote
Data Services 1.5 Library (msadco.dll). Тогда средствами RDS имеем:
Dim oADS As New RDS.DataSpace
Dim oADF As Object
Cоздаем встроенный
бизнес-объект доступа к данным на локальном
сервере:
Set oADF = oADS.CreateObject(“RDSServer.DataFactory”, “”)
Dim oRDS_DC As New RDS.DataControl
Обойдемся соединением без DSN и с
ходу выполним запрос:
oRDS_DC.SourceRecordset = oADF.query(“driver={SQL
Server};server=;uid=sa;pwd=;database=bank”,
“select * from Oper where account_deb=’10001’ or account_cre=’10001’ “)
SendMQMsg “InteractionQueue”, “10001”, oRDS_DC.Recordset
Нам осталось принять сообщение
и убедиться, что все дошло нормально.
Для приема сообщений объект MSMQQueue располагает
методами Peek, PeekCurrent, PeekNext и Receive, ReceiveCurrent. Отличие
между Peek* и Receive* состоит в том, что первая группа
методов просто читает пришедшие сообщения без их
удаления из очереди, а методы второй группы
вдобавок удаляют только что прочитанное
сообщение. Методы Peek и Receive читают
первое сообщение в очереди, PeekCurrent и ReceiveCurrent —
текущее, на которое указывает внутренний курсор
очереди. PeekNext передвигает курсор на следующее
сообщение в очереди и прочитывает его оттуда.
Если сообщения имеют одинаковый приоритет
доставки, то под первым, или верхним, здесь
понимается пришедшее последним. Напишем функцию,
читающую первое сообщение из первой попавшейся
очереди с меткой strQueLbl.
Function SyncReceiveMQMsg(strQueLbl As String, lngTimeout As Long, _
blnDeleteMsgFromQue As Boolean, blnWantBody As Boolean) As MSMQMessage
Параметр lngTimeout определяет
время (в секундах), которое необходимо ждать, если
очередь пуста. 0 означает не ждать нисколько, –1 —
ждать, пока хоть что-то не придет в эту очередь. В
зависимости от значения параметра blnDeleteMsgFromQue мы
будем читать с удалением (Receive) или, прочитав,
оставлять в очереди (Peek). Последний параметр
позволяет прочитать сообщение, не читая тела,
сэкономив на этом время в случае
последовательного поиска в очереди
интересующего нас сообщения, например, по метке,
особенно если содержания сообщений достаточно
объемны.
Dim q As MSMQQueue
Set q = FindQueOnLabel(strQueLbl).Open(MQ_RECEIVE_ACCESS, MQ_DENY_NONE)
Второй параметр метода Open
позволяет ограничить доступ к открываемой на
прием очереди. Если он равен MQ_DENY_RECEIVE_SHARE, то при
попытке открыть очередь, уже открытую для чтения
другим процессом, генерируется ошибка
MQ_ERROR_SHARING_VIOLATION.
If blnDeleteMsgFromQue Then
Set SyncReceiveMQMsg = q.Receive(WantBody:=blnWantBody, ReceiveTimeout:=lngTimeout)
Else
Set SyncReceiveMQMsg = q.Peek(WantBody:=blnWantBody, ReceiveTimeout:=lngTimeout)
End If
q.Close
End Function
Используем эту функцию для
приема ранее отправленного объекта Recordset:
Dim msg As MSMQMessage
Set msg = SyncReceiveMQMsg(“InteractionQueue”, 0, True, True)
Просмотрев отладчиком свойства
msg.Body, можно убедиться, что они тождественны
RDS_DC.Recordset. Конечно, чтобы передать набор записей,
мы могли бы использовать массивы, строки, разбить
передачу на несколько сообщений и т.д., но
передача объекта выглядит более изящно, так как
избавляет нас от необходимости дополнительных
преобразований при отправке и приеме. Тело
принятого сообщения представляет собой готовый
объект, который в дальнейшем мы можем
использовать непосредственно. Например,
присвоить аналогичному свойству объекта RDS:
Dim oRDS_DC1 As New RDS.DataControl
oRDS_DC1.SourceRecordset = msg.Body
или ActiveX Data Objects
(ADO) 1.5:
Dim oADORS As ADODB.Recordset
Set oADORS = msg.Body
или населить им какой-нибудь визуальный
элемент управления, например Sheridan Databound Control:
<OBJECT ID=”GRID” WIDTH=500 HEIGHT=300
Datasrc=”#oRDS_DC1"
CODEBASE=”http://<%=Request.ServerVariables(“SERVER_NAME”)%>/MSADC/Samples/ssdatb32.cab”
CLASSID=”CLSID:AC05DC80-7DF1-11d0-839E-00A024A94B3A”>
В приведенном выше примере
прием сообщения блокирует дальнейшее выполнение
до тех пор, пока сообщение не будет прочитано или
пока не истечет время ожидания его получения
(ReceiveTimeout). Наряду с этим ActiveX-интерфейс MSMQ
предоставляет возможность асинхронного приема с
помощью объекта MSMQEvent, который генерирует
событие Arrived в момент поступления сообщения в
определенную очередь. Предположим, сценарий
работы предусматривает при получении
несертифицированного или еще по какой-либо
причине подозрительного сообщения от клиента
оповещение администратора и перезапуск
компьютера, с которого это сообщение было
отправлено. Как известно, в Win32 API существует
функция InitiateSystemShutdown:
Private Declare Function InitiateSystemShutdown Lib “advapi32”
Alias “InitiateSystemShutdownA”
(ByVal lpMachineName As String, ByVal lpMessage As String, _
ByVal dwTimeoutInSecsToDisplayMsg As Long, _
ByVal bForceAppsClosed As Long, ByVal bRebootAfterShutdown As Long) As Long,
при наличии соответствующих
прав позволяющая делать Logoff/Restart/Shutdown удаленной
машины, но мы поступим иначе. В целях
демонстрации асинхронного приема мы пошлем
клиенту особое сообщение, приняв которое, тот
перезапустится. В этом случае нам достаточно
функции локального выхода из системы ExitWindowsEx.
Ниже приводится приблизительный клиентский код,
который вызывает перезапуск при получении
сообщения с меткой “Shutdown”.
Ссылаемся на внешнюю функцию и объявляем
требующиеся ей константы
Private Declare Function Lib “user32” (ByVal dwOptions As Long,
ByVal dwReserved As Long) As Long
Private Const EWX_Logoff As Long = 0
Private Const EWX_Shutdown As Long = 1
Private Const EWX_Reboot As Long = 2
Private Const EWX_Force As Long = 4
Private Const EWX_PowerOff As Long = 8
Определяем очередь приема и переменную qevent
класса MSMQEvent, инициирующего событие приема.
Dim qRcv As MSMQQueue
Dim WithEvents qevent As MSMQEvent
Сразу по загрузке формы
Private Sub Form_Load()
мы инстанциируем qevent, так как New
не может использоваться одновременно с WithEvents:
Set qevent = New MSMQEvent
открываем очередь с меткой InteractionQueue на прием:
Set qRcv =
FindQueOnLabel(“InteractionQueue”).Open(MQ_RECEIVE_ACCESS, MQ_DENY_NONE)
и включаем обработку события приема для данной
очереди.
qRcv.EnableNotification event:=qevent, Cursor:=MQMSG_CURRENT,
ReceiveTimeout:=-1
В качестве параметров, кроме
ссылки на объект MSMQEvent, передаются уже известные
нам данные: положение курсора в очереди
сообщений и время ожидания получения. Если по
этой позиции курсора в очереди уже есть
сообщение или оно приходит туда в течение
времени ReceiveTimeout, включается событие Arrived. Если
нет — то нет. В любом случае, чтобы повторить,
приходится снова вызывать EnableNotification. В нашем
случае, если бы мы хотели сделать непрерывную
асинхронную обработку поступающих сообщений, мы
должны были бы еще раз написать q.EnableNotification qevent,
MQMSG_CURRENT, —1 в конце процедуры обработки события
qevent_Arrived. EnableNotification не рекомендуется включать в
начале qevent_Arrived, потому что в очередь может
поступить новое сообщение, вызвав новое событие
Arrived до того, как вы успеете обработать
предыдущее. Последствия в этом случае
непредсказуемы, но гарантированно неприятны.
End Sub
Наш обработчик события прихода будет крайне
незамысловат. Он мгновенно (ReceiveTimeout:=0) читает
поступившее сообщение — зачем ждать, если мы
знаем, что оно уже есть, — и проверяет его метку.
Если метка совпадает со словом “Shutdown”,
производится выгрузка текущего пользователя
(EWX_Logoff) с принудительным завершением работы
приложений (EWX_Force), то есть без выдачи
соответствующего предупреждения будут
завершены даже приложения, имеющие
несохраненные данные. Ради
простоты здесь не приводится проверка
отправителя cообщения и коррекция привилегий
процесса. Предполагается, что процесс обладает
SeShutdownPrivilege.
Private Sub qevent_Arrived(ByVal q As Object, ByVal Cursor As Long)
В параметре q передается ссылка
на очередь, куда пришло сообщение. Это, в
частности, полезно, если для нескольких очередей
используется один обработчик.
Dim msg As MSMQMessage
Set msg = q.ReceiveCurrent(ReceiveTimeout:=0)
If msg.Label = “Shutdown” Then
ExitWindowsEx (EWX_Logoff or EWX_Force), &HFFFF
End If
End Sub
Если теперь запустить это
приложение и послать откуда-либо в очередь
InteractionQueue сообщение с меткой Shutdown
SendMQMsg “InteractionQueue”, “Shutdown”, “Это
конец”,
произойдет выгрузка текущего
пользователя на компьютере, где было запущено
данное приложение.
Как уже упоминалось, одна и та же процедура
обработки события может применяться к разным
очередям (q1.EnableNotification qevent,…; q2.EnableNotification qevent,…).
Естественно, можно объявить несколько различных
переменных класса MSMQEvent и назначить разным
очередям разные обработчики. Поскольку от
момента генерации события прихода сообщения до
того, как вы соберетесь с ним что-то сделать, его,
благодаря другим клиентам, может уже не
оказаться, рекомендуется убедиться, что
пришедшее сообщение все еще там, особенно если
при открытии очереди используется опция MQ_DENY_NONE.
Литература
Шуленин А. Microsoft Transaction Server // Решения Microsoft
№ 2,3, 1997 г. (КомпьютерПресс №11, 1997 г.)
David S. Linthicum, Get the Message // DBMS, Nov. 1997 (Vol.10, Number 12)
Артемов Д. Microsoft Message Queue Server // Решения Microsoft №3, 1997
г. (КомпьютерПресс №11, 1997 г.)
Шуленин А. Microsoft SQL Server 6.5 Enterprise Edition // Решения
Microsoft № 4, 1997 г. (КомпьютерПресс №12, 1997 г.)
|