Sмарт интеграции IoT с Akenza и Python – CodesCode
В этой статье блога я исследую комбинацию Akenza и Python для открытия новых возможностей интеллектуальной, в реальном времени интеграции и мониторинга в IoT.
Платформа Интернета вещей Akenza, по своим способностям, превосходит в сборе и управлении данными с бесчисленного количества устройств Интернета вещей. Однако, это интеграция с другими системами, такими как системы управления предприятием (ERP), платформы управления взаимоотношениями с клиентами (CRM), системы управления рабочим процессом или инструменты мониторинга окружающей среды, позволяют осуществить полное представление всей организационной среды.
Дополняя возможности Akenza, и обеспечивая плавное взаимодействие, гибкость языка программирования Python. Учитывая, насколько гибким является Python, этот язык является естественным выбором при поиске связи между Akenza и уникальными потребностями организации, стремящейся соединить свою интеллектуальную инфраструктуру.
Эта статья рассказывает о сочетании двух компонентов: Akenza и Python. В конце мы получим:
- Двустороннее подключение к Akenza с использованием Python и WebSockets.
- Python-сервис, подписанный и получающий события от устройств Интернета вещей через Akenza.
- Python-сервис, отправляющий данные на устройства Интернета вещей через Akenza.
Так как подключение WebSocket является постоянным, его использование повышает отзывчивость приложений Интернета вещей, что, в свою очередь, помогает в реальном времени обмениваться данными, способствуя созданию динамичной и гибкой интегрированной экосистемы.
Соединение Python и Akenza по WebSocket
Сначала давайте посмотрим на полный код на Python, который будет обсуждаться позже.
# -*- coding: utf-8 -*-# Zatofrom zato.server.service import WSXAdapter# ################################################################################################ ###############################################################################################if 0: from zato.server.generic.api.outconn.wsx.common import OnClosed, \ OnConnected, OnMessageReceived# ################################################################################################ ###############################################################################################class DemoAkenza(WSXAdapter): # Наше имя name = 'demo.akenza' def on_connected(self, ctx:'OnConnected') -> 'None': self.logger.info('Akenza OnConnected -> %s', ctx)# ############################################################################################### def on_message_received(self, ctx:'OnMessageReceived') -> 'None': # Подтверждаем полученные данные self.logger.info('Akenza OnMessageReceived -> %s', ctx.data) # Это указывает, что мы подключены .. if ctx.data['type'] == 'connected': # .. в тестовых целях используйте фиксированный идентификатор актива .. asset_id = 'abc123' # .. создаем сообщение о подписке .. data = {'type': 'subscribe', 'subscriptions': [{'assetId': asset_id, 'topic': '*'}]} ctx.conn.send(data) else: # .. если мы здесь, это означает, что мы получили сообщение, отличное от типа "connected". self.logger.info('Akenza message (other than "connected") -> %s', ctx.data)# ############################################################################################## def on_closed(self, ctx:'OnClosed') -> 'None': self.logger.info('Akenza OnClosed -> %s', ctx)# ############################################################################################### ##############################################################################################
Теперь разверните код в Zato и создайте новое исходящее подключение WebSocket. Замените ключ API на свой собственный и убедитесь, что формат данных установлен на JSON.
Получение сообщений через WebSockets
WebSocket-сервисы на Python, которые вы создаете, имеют три метода, каждый из которых реагирует на определенные события:
- on_connected: Вызывается сразу после открытия соединения WebSocket. Обратите внимание, что это низкоуровневое событие и в случае Akenza это еще не означает, что вы можете отправлять или принимать сообщения.
- on_message_received: Основной метод, с которым вы большую часть времени работаете. Вызывается каждый раз, когда удаленный WebSocket отправляет или передает событие вашему сервису. С Akenza этот метод будет вызываться каждый раз, когда Akenza имеет что-то сообщить вам, например, что вы подписаны на сообщения и т.д.
- on_closed: Вызывается, когда WebSocket был закрыт. После закрытия WebSocket его использование становится невозможным.
Давайте сосредоточимся на методе on_message_received, в котором происходит основная часть действий. Он принимает онргумент ctx типа OnMessageReceived, который описывает контекст полученного сообщения. Сам запрос и обработчик WebSocket-соединения находятся в “ctx”, через который вы можете отвечать на сообщение.
Важными атрибутами объекта контекста являются:
- ctx.data: Словарь данных, которые Akenza отправляет вам.
- ctx.conn: Основное соединение WebSocket, через которое данные были отправлены, и через которое вы можете отправить ответ.
Теперь, логика с 30-й по 40-ю строку ясна:
- Сначала мы проверяем, подтвердил ли Akenza, что мы подключены (type==’connected’). Вам необходимо проверить тип сообщения каждый раз, когда Akenza что-то отправляет вам, и соответствующим образом на него реагировать.
- Затем, поскольку мы знаем, что мы уже подключены (например, наш API-ключ был действителен), мы можем подписаться на события от определенного IoT-актива. В тестовых целях идентификатор актива указывается непосредственно в исходном коде, но на практике эта информация будет считываться из файла конфигурации или базы данных.
- Наконец, для сообщений любого другого типа мы просто регистрируем их детали. Естественно, полная интеграция обрабатывала бы их в соответствии с требованиями в конкретных обстоятельствах, например, преобразуя и отправляя их в другие приложения или системы управления.
Пример сообщения от Akenza будет выглядеть следующим образом.
INFO - WebSocketClient - Сообщение от Akenza (кроме "connected") -> {'type': 'subscribed','replyTo': None, 'timeStamp': '2023-11-20T13:32:50.028Z','subscriptions': [{'assetId': 'abc123', 'topic': '*', 'tagId': None, 'valid': True}],'message': None}
Как отправить сообщения в WebSockets
Не стоит забывать о коммуникации в противоположном направлении, то есть о отправке сообщений в WebSockets. Например, у вас может быть вызваны службы через REST API или, возможно, из планировщика, и их задачей будет трансформировать такие вызовы в команды конфигурации для устройств IoT.
Вот основная часть такой службы, повторно использующая то же соединение WebSocket Akenza:
# -*- coding: utf-8 -*-# Zatofrom zato.server.service import Service# ############################################################################################### ##############################################################################################class DemoAkenzaSend(Service): # Наше имя name = 'demo.akenza.send' def handle(self) -> 'None': # Имя соединения для использования conn_name = 'Akenza' # Получить соединение .. with self.out.wsx[conn_name].conn.client() as client: # .. и отправить данные через него. client.send('Hello')# ############################################################################################### ##############################################################################################
Обратите внимание, что ответы на отправленные сообщения Akenza будут получены с использованием метода on_message_received вашей первой службы. Основанная на WebSockets механика обмена сообщениями по своей природе асинхронная, и каналы независимы.
Теперь у нас есть полное представление о реальном соединении IoT с Akenza и WebSockets. Мы можем устанавливать постоянные, отзывчивые соединения с активами, и мы можем подписываться на и отправлять сообщения устройствам, что позволяет нам создавать интеллектуальные архитектуры автоматизации и интеграции, использующие мощные, вновь появляющиеся технологии.
Leave a Reply