【问题描述】
-
介绍问题现象和发生的背景
websocket 连接正常,客户端可以向服务端发送数据,但是接收不了服务端的数据。
-
相关的代码(请勿使用截图)
// import webSocket from '@ohos.net.webSocket';
import Logger from '../model/logger';
import webSocket from '@ohos.net.webSocket'; // 引入 OpenHarmony 的 WebSocket 模块
// import Logger from '../util/logger'; // 引入日志工具
const TAG = "WebSocketDemo"; // 定义日志标签
export class SocketService {
private socket: webSocket.WebSocket; // 声明 WebSocket 对象
private receivedData: Object | null = null; // 存储接收到的数据
// private receivedData: Object | null = null; // 存储接收到的数据
constructor() {
this.createWebSocket(); // 创建 WebSocket
this.onOpen(); // 注册连接成功的事件
this.onMessage(); // 注册消息接收的事件
this.onClose(); // 注册连接关闭的事件
this.onError(); // 注册错误事件
}
/**
* 创建 WebSocket 对象
*/
private createWebSocket() {
Logger.info(TAG, "start create websocket"); // 日志记录开始创建 WebSocket
this.socket = webSocket.createWebSocket(); // 创建 WebSocket 实例
Logger.info(TAG, "create socket success"); // 日志记录创建成功
}
/**
* 根据指定的 URL 与服务器建立连接
* @param url - WebSocket 服务器的地址
*/
public connect(url: string) {
this.socket.connect(url) // 连接到 WebSocket 服务器
.then((isConnect) => {
if (isConnect) {
Logger.info(TAG, "connect success"); // 日志记录连接成功
}
});
}
/**
* 发送消息到服务器
* @param data - 要发送的数据
*/
public sendMessage(data: string) {
this.socket.send(data) // 发送数据
.then((isSuccess) => {
Logger.info(TAG, "send result: " + isSuccess); // 日志记录发送结果
if (isSuccess) {
Logger.info(TAG, "send message success"); // 日志记录发送成功
}
})
.catch((e) => {
Logger.info(TAG, "send message fail: " + JSON.stringify(e)); // 日志记录发送失败
});
}
/**
* 关闭与服务器的连接
*/
public closeConnect() {
this.socket.close(); // 关闭 WebSocket 连接
}
/**
* 监听连接打开事件
*/
private onOpen() {
this.socket.on("open", (err, data) => {
Logger.info(TAG, "on open status: " + JSON.stringify(data)); // 日志记录连接打开的状态
});
}
/**
* 监听服务器发送的消息
*/
private onMessage() {
this.socket.on("message", (err, data) => {
if (err) {
Logger.error(TAG, "Error receiving message: " + JSON.stringify(err));
return;
}
Logger.info(TAG, "receive from server: message is: " + JSON.stringify(data)); // 日志记录接收到的消息
console.log('Received raw data:', data);
this.receivedData = data; // 存储接收到的数据
try {
this.processData(data); // 处理接收到的数据
} catch (e) {
Logger.error(TAG, "Error processing message: " + e.message);
}
});
}
/**
* 处理接收到的数据
* @param data - 接收到的数据
*/
private processData(data: Object) {
if (data === undefined || data === null) {
Logger.error(TAG, "Received undefined or null data.");
return;
}
if (typeof data === 'string') {
Logger.info(TAG, "Processing received string data: " + data);
// 处理字符串数据
console.log('Received string:', data);
} else if (typeof data === 'object') {
Logger.info(TAG, "Processing received object data: " + JSON.stringify(data));
// 处理对象数据
console.log('Received object:', data);
} else {
Logger.error(TAG, "Received unsupported data type.");
throw new Error("Received unsupported data type.");
}
}
/**
* 监听连接关闭事件
*/
private onClose() {
this.socket.on("close", (err, data) => {
Logger.info(TAG, "on close: close code is: " + data.code + ", close reason is: " + data.reason); // 日志记录连接关闭的信息
});
}
/**
* 监听错误事件
*/
private onError() {
this.socket.on("error", (data) => {
Logger.info(TAG, "on error: err is: " + JSON.stringify(data)); // 日志记录错误信息
});
}
}
// 创建 SocketService 的实例
let socketService = new SocketService();
// 导出 SocketService 实例
export default socketService;
服务端代码:
-
运行结果、错误截图
-
我尝试过的解决方法和结果
-
我想要达到的结果
import asyncio
import websockets
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Pose2D
from kobuki_msgs.msg import BatteryInfo
import json
import subprocess
import os
import signal
import threading
class WebSocketServer:
def init(self, host, port):
rospy.init_node("websocket_server")
self.json_data = ""
self.battery_data = ""
self.host = host
self.port = port
try:
port_pid = subprocess.check_output(["lsof", "-i", f":{self.port}"]).decode().split()[10]
print(f"端口{self.port}已被占用,PID为{port_pid}")
os.kill(int(port_pid), signal.SIGKILL)
except subprocess.CalledProcessError:
pass
rospy.loginfo("WebSocket服务器初始化成功!")
rospy.Subscriber("/pose_data", Pose2D, self.pose_callback)
rospy.Subscriber("/mobile_base/debug/battery_info", BatteryInfo, self.battery_callback)
self.websocket = None # 存储当前连接的WebSocket
def battery_callback(self, msg):
self.battery_data = msg.battery_voltage
def pose_callback(self, msg):
self.json_data = "get_pose"+","+str(msg.x)+","+str(msg.y)+","+str(msg.theta)
async def send_message(self):
while self.websocket is not None:
await asyncio.sleep(5)
# message = str({"event" : "test","x": 0,"y": 0,"z": 0,"battery": self.battery_data})
message = 'x:1.0,y:2.0,z:3.0,battery'
print(f"发送消息: {message}")
try:
await self.websocket.send(message)
print(f"发送消息: {message}")
except websockets.ConnectionClosed:
print("客户端连接已关闭,无法发送消息。")
break
async def echo(self, websocket, path):
self.websocket = websocket
client_address = websocket.remote_address
print(f"客户端连接: {client_address}")
await websocket.send("ok")
# 启动定时发送消息的协程
# asyncio.create_task(self.send_message())
try:
async for message in websocket:
print(f"收到消息: {message} 从客户端: {client_address}")
data = b"ok" # 字节串
text_data = data.decode('utf-8') # 解码成字符串
await websocket.send(json.dumps(text_data))
print(f"发送消息: ok")
except websockets.ConnectionClosed:
print(f"客户端 {client_address} 已关闭连接。")
finally:
self.websocket = None # 清空WebSocket引用
async def main(self):
async with websockets.serve(self.echo, self.host, self.port):
print("服务器启动成功!")
await asyncio.Future()
def run(self):
try:
asyncio.run(self.main())
except KeyboardInterrupt:
print("服务器已关闭!")
asyncio.get_event_loop().close()
【运行环境】
硬件:鲁班猫
ROM版本:
DevEvoStudio版本:DevEco Studio 3.1.1 Release
SDK版本:API9 |