【安信可Wi-Fi&BLE模组】做一个蓝牙点阵音箱

小伙伴们,小安派AI语音开发板上线一段时间了,安信可社区“疯狂发板”模式后,不少伙伴已经拿到开发板了,来看看大家的实际体验如何~

以下作品由安信可社区用户

zzbinfo制作

开箱

wKgZO2iR1C2Afk2WAACvgARv-OM650.jpg

整个开发板组件装在防静电袋子中,很简洁。

打开防静电袋子,还有一个塑料袋子,袋子里面是个硬壳的盒子,开发组件就在盒子里面,防止运输过程中压坏开发板。

wKgZO2iR1C6AaYMyAACK5fDgTes148.jpg

pcb前后都垫有减震棉,包装确实很用心。我们来仔细看看开发板。

开发板pcb尺寸

wKgZO2iR1DGAZ4YfAADwu9kRyT4097.jpg

wKgZPGiR1DGACF0OAAB392jQzMY647.jpg

开发板35mm*30mm,尺寸小巧紧凑。很容易放置进合适的外壳中,对于我们业余DIY、开发测试非常友好。

主要芯片、模块

wKgZO2iR1DKAeoILAAB0Ad1TuHk435.jpg

主控模组就是安信可的Ai-WB2-12F WiFi模块,该模组搭载BL602芯片作为核心处理器,支持Wi-Fi 802.11b/g/n协议和BLE 5.0协议。

BL602芯片内置低功耗的32位RISC CPU,276KB RAM和丰富的外围接口,包括SDIOSPIUARTI2CIR remotePWMADCDACPIR 和 GPIO等。每个外接端子都有丝端子采用的是SH1.25。

wKgZPGiR1DOAaHtMAACnd5266HE270.jpg

背面丝印LTH7的是pw4054,单节锂电充电芯片,支持3.7V锂电池的充电,输入电压4.5-6.5V,充电电流由旁边的电阻控制,支持10mA-500mA。板载的电阻是5k,换成2k,可以支持到500mA。板子上没有放电控制电路,所以外接的锂电池要带充放电控制板。

离线语音识别采用的是上海华振电子的语音大脑VB6824芯片方案。

内核采用高性能 32 位 RISC 内核,主频 240MHz,支持硬件浮点运算,内置 1MB SPI FLASH。

该方案支持AI语音识别,采用最新的神经网络(TDNN)算法,具有识别精准,误判率低等优势,5米远场可靠识别。支持语音降噪,可以有效过滤掉稳态噪声、对动态噪声也有很好的抑制作用,噪音下也可准确识别。支持 MP3,WAV,WMA,APE,FLAC,AAC,MP4,M4A,AIF,AIFC音频解码。蓝牙支持 SBC,AAC 音频解码音频,支持 mSBC 语音编解码器。符合蓝牙 V5.1+BR+EDR+ BLE 规范,提供+6dbm 发射功率。

上电

wKgZO2iR1DOADJVyAABlecOxCHc094.jpg

接上type-c线后打开开关,随着红灯闪烁,语音提示清晰宏亮,根据语音提示和快速上手指南进行配网后就可以愉快地玩耍了。

小安AiPi-PalChatV1性能测试

配网测试

https://www.bilibili.com/video/BV16gTVzBEDr/?spm_id_from=888.80997.embed_other.whitelist&t=60.737005&bvid=BV16gTVzBEDr&vd_source=54c5db21948db2378659b7e8e42bafbf

从输入密码点击确定开始,到提示配网成功大概耗时8秒到10秒,这个跟网络环境也有很大关系。网络不好,会造成对话反应变慢,声音输出打嗝等现象。具体可以看一下测试视频

wKgZO2iR1DSABqOvAABC1NrXCQA483.jpg

打断测试

https://www.bilibili.com/video/BV1ruTVzuEUm/?spm_id_from=888.80997.embed_other.whitelist&t=70.125983&bvid=BV1ruTVzuEUm&vd_source=54c5db21948db2378659b7e8e42bafbf

由于采用的是VB6824语音识别方案,实现的对话中打断,效果还是不错的。可能是咪头和输出喇叭比较近的缘故,偶尔也会出现打断不了的情况,不过总体使用感觉还是很流畅。打断、识别的效果也和咪头有比较大的关系,我测试了两种手上有的咪头,效果相差很大。

wKgZO2iR1B6ACzK-AAOedQikc2s111.png

识别距离测试

https://www.bilibili.com/video/BV1eWTVzfEYn/?spm_id_from=333.788.player.player_end_recommend_autoplay&bvid=BV1ruTVzuEUm&vd_source=54c5db21948db2378659b7e8e42bafbf

再次提醒,识别距离和环境有很大关系,笔者测试的房间外面是狭长的走廊,在走廊里面有比较大的回音,经过芯片的消回音处理,识别的效果就大打折扣了,实际测试时也确实在房间内有很好的识别效果,走廊里面就完全识别不到语音了。

wKgZPGiR1DWAZpHSAABS7XClvAU740.jpg

AiPi-PalChatV1语音开发板通过MCP服务实现自动化控制家电

实现途径

有了AiPi-PalChatv1开发板和小智人工智能后台的加持,不实现点实用功能难免浪费,首先肯定是自动化控制家里的电器,通过查阅资料,有多种途径:

1、可以直接对开发板编程,定制自己的固件。

2、可以通过home assistant mcp server,这个需要有home assistant ,并且还要满足一定的要求。

3、自己写MCP server,小智后台提供了MCP接口和示例。

笔者采用第三种方式,把之前厨房的甲烷传感器,卫生间的热水器,空调,灯等能接入的全部实现了自动控制,效果非常惊喜。

硬件优化

wKgZO2iR1DWAfhCNAABvogf62l0669.jpg

从之前的测试和最近一段时间的摸索,发现语音识别依赖于采集的声音的质量,笔者把开发板上的两个开关电源芯片禁用了(也可以直接拆掉)。

通过外接的LDO实现的5V和3.3V的供电,实际使用效果良好,同样的咪头,唤醒距离增加一倍,对话流畅。(如下视频)

https://www.bilibili.com/video/BV1sxNoz8EKy/?spm_id_from=888.80997.embed_other.whitelist&t=13.901191&bvid=BV1sxNoz8EKy&vd_source=54c5db21948db2378659b7e8e42bafbf

wKgZPGiR1DaAdHYzAAAw8J3-61U753.jpg

MCP服务器代码

# -*- coding: utf-8 -*-

# 以下代码在2025年6月18日 python3.11环境下运行通过

import sys

sys.path.append('/usr/local/lib/python3.7/dist-packages')

import paho.mqtt.client as mqtt

#

import sys

sys.path.append('/usr/lib/python3/dist-packages')

import requests

import json

from mcp.server.fastmcp import FastMCP

import logging

import socket

import threading

import time

def connTCP(topic:str,msg:str):

global tcp_client_socket

# 创建socket

tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# IP 和端口

server_ip = 'bemfa.com'

server_port = 8344

uid = 'UUID'

try:

# 连接服务器

tcp_client_socket.connect((server_ip, server_port))

#发送订阅指令

substr = 'cmd=2&uid=' + uid + '&topic=' + topic + '&msg=' + msg + 'rn'

tcp_client_socket.send(substr.encode("utf-8"))

time.sleep(2)

tcp_client_socket.shutdown(2)

tcp_client_socket.close()

except:

time.sleep(2)

tcp_client_socket.shutdown(2)

tcp_client_socket.close()

#connTCP()

#Kitchen_temperature = ""

#Kitchen_CH4 = ""

def on_connect(client, userdata, flags, rc):

print("Connected with result code "+str(rc))

client.subscribe("****") # 订阅消息

#消息接收

def on_message(client, userdata, msg):

global Ktemperature

global KCH4

#print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))

if msg.topic == topicA: #判断topic是否是topicA

#print(" topicA msg")

msgstr = str(msg.payload.decode('utf-8'))

Ktemperature = msgstr.split("#")[-2]

KCH4 = msgstr.split("#")[-1]

#print(Kitchen_temperature)

#print(" topicA msg")

#print(Kitchen_CH4)

#if str(msg.payload.decode('utf-8')) == "ON": #如果接收字符on,亮灯

# connTCP('topic','ON') #开灯函数

#if str(msg.payload.decode('utf-8')) == "OFF": #如果接收字亮灯

# connTCP('topic','OFF') #关灯函数

#订阅成功

def on_subscribe(client, userdata, mid, granted_qos):

print("On Subscribed: qos = %d" % granted_qos)

# 失去连接

def on_disconnect(client, userdata, rc):

if rc != 0:

print("Unexpected disconnection %s" % rc)

logger = logging.getLogger('WHcontrol')

# Create an MCP server

mcp = FastMCP("WHcontrol")

# Add an addition tool

@mcp.tool()

def get_kitchen_temperature() -> dict:

"""用于查询厨房温度,返回温度值,单位摄氏度。"""

global Ktemperature

if float(Ktemperature) > 45:

Ktemperature = f'{float(Ktemperature)-18:.1f}' #修正温度值,由于电子温度计装的高,不修正,人工智能容易报警

result = Ktemperature

#print(result)

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def get_kitchen_CH4() -> dict:

"""用于查询厨房天然气或是甲烷含量,返回天然气或是甲烷PPM值。"""

global KCH4

if int(KCH4) > 440:

KCH4 = f'{int(KCH4)-440}'

result = KCH4

# print(result)

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def get_wh_timer_switch() -> dict:

"""用于查询热水器定时器开关的状态,on是打开,off是关闭。"""

headers={ "content-type": "application/x-www-form-urlencoded"}

urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'

response = requests.get(url=urlmsg,headers=headers,timeout=5)

if response.status_code != 200:

raise ConnectionError(f'{url} status code is {response.status_code}.')

response = json.loads(response.content)

if 'data' not in response.keys():

raise ValueError(f'{url} miss key msg.')

response = json.dumps(response['data'][0])

response = json.loads(response)

if 'msg' not in response.keys():

raise ValueError(f'{url} miss key msg.')

result = response['msg'].split("#")[-1]

print(result)

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def set_wh_timer_switch(sw:str) -> dict:

"""设置热水器定时加热开关状态,on是打开,off是关闭。打开后在每天晚上20点自动开始加热,一个小时后关闭加热开关。"""

if sw == "on":

connTCP("topic","G2on")

if sw == "off":

connTCP("topic","G2off")

result = sw

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def get_wh_switch() -> dict:

"""用于查询热水器加热开关的状态,on是打开,off是关闭。"""

headers={ "content-type": "application/x-www-form-urlencoded"}

urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'

response = requests.get(url=urlmsg,headers=headers,timeout=5)

if response.status_code != 200:

raise ConnectionError(f'{url} status code is {response.status_code}.')

response = json.loads(response.content)

if 'data' not in response.keys():

raise ValueError(f'{url} miss key msg.')

response = json.dumps(response['data'][0])

response = json.loads(response)

if 'msg' not in response.keys():

raise ValueError(f'{url} miss key msg.')

result = response['msg'].split("#")[-2]

print(result)

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def set_wh_switch(sw:str) -> dict:

"""设置热水器加热开关状态,on是打开热水器加热开关,off是关闭热水器加热开关。"""

if sw == "on":

connTCP("topic","G1on")

if sw == "off":

connTCP("topic","G1off")

result = sw

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def set_light_switch(sw:str) -> dict:

"""设置客厅照明灯的开关状态,on是打开照明灯,off是关闭照明灯。"""

if sw == "on":

connTCP("topic","A857A4AC6C") #我是通过433网关来发的控制指令,这个是我的灯的指令,修改成你们自己的

if sw == "off":

connTCP("topic","A857A4AC69")

result = sw

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def set_light_brightness(sw:str) -> dict:

"""设置客厅照明灯的亮度,add是增加亮度,fall是减小亮度。"""

if sw == "fall":

connTCP("topic","A857A4AC64")

if sw == "add":

connTCP("topic","A857A4AC68")

result = sw

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

@mcp.tool()

def set_air_switch(sw:str) -> dict:

"""设置卧室空调的开关状态,on是打开空调,off是关闭空调。"""

if sw == "on":

connTCP("topic","H823CB2602002403053D000000007F") #我是通过红外网关来发的控制指令,这个是我的空调的指令,修改成你们自己的

if sw == "off":

connTCP("topic","H823CB26020020030F3A0000000082")

result = sw

http://logger.info(f"result: {result}")

return {"success": True, "result": result}

# Start the server

if __name__ == "__main__":

MQTTHOST = "http://bemfa.com"

MQTTPORT = 9501

client_id = "UUID"

topicA = "tipoc"

client = mqtt.Client(client_id)

client.username_pw_set("userName", "passwd")

client.on_connect = on_connect

client.on_message = on_message

client.on_subscribe = on_subscribe

client.on_disconnect = on_disconnect

client.connect(MQTTHOST, MQTTPORT, 60)

client.loop_start()

time.sleep(3)

mcp.run(transport="stdio")

笔者的设备都是通过bemfa云控制的,具体接入文档可以参照巴法云的说明。

MCP server运行在一个树莓派上,先按照小智MCP示例安装运行环境,然后直接把小智MCP接入点的字符串写到mcp_pipe.py文件中,省的设置环境变量。通过命令启动MCP server :python mcp_pipe.py switch-py.py。

在小智后台 --》配置角色 最下面的MCP接入点中就应该能看到已经接入的MCP服务了。

wKgZO2iR1DaAOCRwAAB1MkDuEbI637.jpg

最后代码

里面的一些变量需要自己修改,代码写的比较乱,大家凑合看。

附件:mcp代码.rar​bbs.aithinker.com/forum.php?mod=attachment&aid=34182

审核编辑 黄宇