Интеграция приложений на основе WebSphere MQ

Примеры работы механизмов публикация-подписка


Теперь после знакомства с технологией публикация-подписка следует рассмотреть работу модели Publish/Subscribe на простых примерах. Для этого понадобиться инсталлировать SupportPacs MA0C: WebSphere MQ (WebSphere MQ) Publish/Subscribe с сайта ИБМ:http://www-306.ibm.com/software/integration/support/supportpacs/category.html

После этого можно стартовать брокер на менеджере очередей командой:

strmqbrk -m QMgrName

Для отображения состояния брокера можно использовать команду dspmqbrk:

dspmqbrk -m QMgrName

В ответ появиться следующее сообщение:

WebSphere MQ message broker for queue manager QMgrName running

Теперь брокер готов получать команды от издателей и подписчиков.

На каждом менеджере может быть стартован только один брокер.

В менеджере есть необходимые системные очереди, которые можно увидеть командой

runmqsc QMgrName display qlocal(SYSTEM.BROKER.*) end

Следует сразу отметить, что завершение работы брокера осуществляется командой endmqbrk перед окончанием работы менеджера: endmqbrk -m QMgrName.

Работу издателя можно продемонстрировать с помощью программы amqsgama, предложенной в SupportPacs MA0C в качестве теста. Эта программа издателя из перечня спортивных тем для подписки (табл.10.1) помещает на брокер сообщения о футбольных матчах (Sport/Soccer/Event/ - в таблице данная тема выделена курсивом) и проверяет ответы брокера.

Таблица 10.1. Возможные спортивные темы для подписки

спорт/футбол/* спорт/теннис/* спорт/баскетбол/*

спорт/футбол/расписание игр спорт/футбол/события спорт/футбол/обзоры



Формат запуска программы:

amqsgama TeamName1 TeamName2 QMgrName

Результаты работы программы, моделирующей случайным образом забивание голов той или иной командой, выглядит следующим образом (рис. 10.3):


Рис. 10.3.  Результаты работы программы издателя

Для работы программы необходимо создание очереди: SAMPLE.BROKER.RESULTS.STREAM.

Именно в эту очередь поступают сообщения от издателя. Необходимо также, чтобы был запущен брокер. Программа подписчика должна стартовать раньше, чем программа издателя amqsgama, чтобы отобразить результаты игры полностью. Все используемые функции в программе служат для подключения к менеджеру брокера и публикации событий о начале матча, окончании матча и забивание гола.


Блочная структура программы выглядит следующим образом.
Подключение к менеджеру брокера (MQCONN) Открытие очереди потока брокера (MQOPEN) Инициализация таймера матча Генерация MQRFH для публикации события о начале матча Добавление имен команд в данные Помещение публикации в очередь потоков Начало цикла по времени матча: засыпание на случайный период попытка забить гол (50% вероятность) генерация публикации о забитом голе (RFH для ScoreUpdate) случайный выбор команды, забившей гол добавление имени команды в данные для публикации помещение публикации в очередь потоков Окончание цикла по времени матча Генерация MQRFH для публикации события о конце матча Добавление имен команд для публикации Помещение публикации в очередь потоков Закрытие очереди потока брокера (MQCLOSE) Отключение от менеджера брокера (MQDISC)
Программа amqsgama имеет следующий код:
Листинг 10.1. Программа amqsgama (html, txt)
В качестве комментария следует отметить, что функция BuildMQRFHeader формирует значения по умолчанию для заголовка MQRFH, устанавливает параметры format и CCSID пользовательских данных. В строку NameValueString добавляются команды, тема и опции для публикации и она выравнивается на 16-ти байтовую границу. StrucLength в MQRFH устанавливается как общая длина. Входными параметрами функции являются pStart – начало блока сообщения, TopicType[] – строка с именем темы. Входным и выходным параметром одновременно является pDataLength – размер блока сообщения при входе и размер выходного блока информации.
Функция PutPublication формирует сообщение для вывода в очередь брокера с помощью команды MQPUT. Входными параметрами функции являются hConn – идентификатор менеджера для команды MQHCONN, hObj – идентификатор очереди, pMessage – идентификатор на начало блока сообщения, messageLength – длина данных в сообщении. Выходными параметрами функции являются pCompCode и pReason – коды завершения команды MQPUT.
Работу подписчика можно продемонстрировать с помощью программы amqsresa из состава SupportPacs MA0C, которая подписывается у брокера на заданную тему (футбол) и получает сообщения от брокера. Формат запуска программы:


amqsresa QMgrName
где QmgrName – имя менеджера очередей, на котором запущен брокер.
Необходимая тема подписки (TOPIC = "Sport/Soccer/*") и очереди для подписчика заданы в теле программы (эти параметры рекомендуется выносить в командную строку или файл инициализации для создания универсальных программ – примеч. автора).
Для работы программы необходимо создание очередей:
runmqsc QMgrName define qlocal(SAMPLE.BROKER.RESULTS.STREAM) define qlocal(RESULTS.SERVICE.SAMPLE.QUEUE) define qlocal(SYSTEM.BROKER.CONTROL.QUEUE) end
Результаты работы программы amqsresa, получающей сообщения от брокера, выглядят следующим образом (рис. 10.4):

Рис. 10.4.  Результаты работы программы подписчика
Объем программы подписчика amqsresa более 2000 строк (в том числе комментариев – 40%) и это не позволяет привести ее в данной лекции. Стоит ограничиться лишь кратким алгоритмом.
Подключение к менеджеру брокера (MQCONN ) Открытие очереди потока брокера и подписчика (MQOPEN ) Генерация MQRFH и подписка на все события Ожидание появления сообщений в очереди подписчика (до 3 минут) Извлечение из очереди MQGET всех публикаций Отбор публикации по теме "Sport/Soccer/*” Обработка и отображение результатов публикации в зависимости от событий (начало матча, конец матча, изменение счета) Выход из цикла по концу матча или по таймеру Закрытие очереди потока брокера и подписчика (MQCLOSE) Отключение от менеджера брокера (MQDISC)
В комментарии к алгоритму следует отметить, что подписка на все события (в алгоритме выделено курсивом) вряд ли объяснима, скорее это сделано в учебных целях. подписчику нет необходимости получать все публикации, поэтому в команде регистрации подписчика (RegSub) целесообразно осуществлять подписку сразу на заданную тему.
В заключение лекции можно привести график времени доставки публикации (мсек) в зависимости от количества подписчиков (рис.10.5), полученный на компьютере RISC/6000, 200MHz, 1 GB RAM с операционной системой AIX 4.3.0 [20]. Этот график показывает, что механизм Publish/Subscribe обеспечивает более высокую производительность, чем приложения, созданные на основе классического подхода с помощью MQI интерфейса и Distribution List.

Рис. 10.5.  Зависимость времени доставки публикации от количества подписчиков


Блочная структура программы выглядит следующим образом.
Подключение к менеджеру брокера (MQCONN) Открытие очереди потока брокера (MQOPEN) Инициализация таймера матча Генерация MQRFH для публикации события о начале матча Добавление имен команд в данные Помещение публикации в очередь потоков Начало цикла по времени матча: засыпание на случайный период попытка забить гол (50% вероятность) генерация публикации о забитом голе (RFH для ScoreUpdate) случайный выбор команды, забившей гол добавление имени команды в данные для публикации помещение публикации в очередь потоков Окончание цикла по времени матча Генерация MQRFH для публикации события о конце матча Добавление имен команд для публикации Помещение публикации в очередь потоков Закрытие очереди потока брокера (MQCLOSE) Отключение от менеджера брокера (MQDISC)
Программа amqsgama имеет следующий код:
/*************************************************************************************/ /* Имя программы: AMQSGAMA */ /* Описание: Основанная на модели Publish/Subscribe программа */ /* моделирует результаты футбольного матча и */ /* отправляет их от издателя к брокеру */ /* Statement: Licensed Materials - Property of IBM */ /* SupportPac MA0E */ /* (C) Copyright IBM Corp. 1999 */ /*************************************************************************************/ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <cmqc.h> /* MQI */ #include <cmqpsc.h> /* MQI Publish/Subscribe */ #include <windows.h> #if MQAT_DEFAULT == MQAT_WINDOWS_NT #define msSleep(time) \ Sleep(time) #elif MQAT_DEFAULT == MQAT_UNIX #define msSleep(time) \ { \ struct timeval tval; \ tval.tv_sec = time / 1000; \ tval.tv_usec = (time % 1000) * 1000; \ select(0, NULL, NULL, NULL, &tval); \ } #endif #define STREAM "SAMPLE.BROKER.RESULTS.STREAM" #define TOPIC_PREFIX "Sport/Soccer/Event/" #define MATCH_STARTED "MatchStarted" #define MATCH_ENDED "MatchEnded" #define SCORE_UPDATE "ScoreUpdate" #define MATCH_LENGTH 30000 /* 30 Second match length */ #define REAL_TIME_RATIO 333 #define AVERAGE_NUM_OF_GOALS 5 #define DEFAULT_MESSAGE_SIZE 512 /* Maximum buffer size for a message */


static const MQRFH DefaultMQRFH = {MQRFH_DEFAULT}; typedef struct { MQCHAR32 Team1; MQCHAR32 Team2; } Match_Teams, *pMatch_Teams; void BuildMQRFHeader( PMQBYTE pStart , PMQLONG pDataLength , MQCHAR TopicType[] ); void PutPublication( MQHCONN hConn , MQHOBJ hObj , PMQBYTE pMessage , MQLONG messageLength , PMQLONG pCompCode , PMQLONG pReason );
int main(int argc, char **argv) { MQHCONN hConn = MQHC_UNUSABLE_HCONN; MQHOBJ hObj = MQHO_UNUSABLE_HOBJ; MQLONG CompCode; MQLONG Reason; MQOD od = { MQOD_DEFAULT }; MQLONG Options; PMQBYTE pMessageBlock = NULL; MQLONG messageLength; MQLONG timeRemaining; MQLONG delay; PMQCHAR pScoringTeam; pMatch_Teams pTeams; MQCHAR32 team1; MQCHAR32 team2; char QMName[MQ_Q_MGR_NAME_LENGTH+1] = ""; MQLONG randomNumber; MQLONG ConnReason;
/* Проверка аргументов программы */ if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) ) { printf("Usage: amqsgam team1 team2 <QManager>\n"); printf(" Maximum 31 characters per team name,\n"); printf(" no spaces or '\"' characters allowed.\n"); exit(0); } else { strcpy(team1, argv[1]); strcpy(team2, argv[2]); } /* Использовать default queue manager или заданный в зависимости от наличия аргумена */ if (argc > 3) strcpy(QMName, argv[3]); MQCONN( QMName, &hConn, &CompCode, &ConnReason ); if( CompCode == MQCC_FAILED ) { printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason); } else if( ConnReason == MQRC_ALREADY_CONNECTED ) { CompCode = MQCC_OK; } if( CompCode == MQCC_OK ) { strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH); Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING; MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n", od.ObjectName, CompCode, Reason); } }
if( CompCode == MQCC_OK ) { srand( (unsigned)(time( NULL )) + (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) ); timeRemaining = MATCH_LENGTH; messageLength = DEFAULT_MESSAGE_SIZE; pMessageBlock = (PMQBYTE)malloc(messageLength); if( pMessageBlock == NULL ) { printf("Unable to allocate storage\n"); } else { if( CompCode == MQCC_OK ) {

Содержание раздела