OpenHarmony开发者论坛

标题: websocket 连接正常,客户端可以向服务端发送数据,但是接收不了服务端的数据。求助,大佬帮忙解决。 [打印本页]

作者: 方小生    时间: 2024-10-10 01:02
标题: websocket 连接正常,客户端可以向服务端发送数据,但是接收不了服务端的数据。求助,大佬帮忙解决。
[md]### 【问题描述】

1. 介绍问题现象和发生的背景
   websocket 连接正常,客户端可以向服务端发送数据,但是接收不了服务端的数据。
2. 相关的代码(请勿使用截图)

   ```
   // import webSocket from '@ohos.net.webSocket';
   import Logger from '../model/logger';
   import webSocket from '@ohos.net.webSocket';  // 引入 OpenHarmony 的 WebSocket 模块
   // import Logger from '../util/logger';  // 引入日志工具
   const TAG = "WebSocketDemo";  // 定义日志标签

   export class SocketService {
     private socket: webSocket.WebSocket;  // 声明 WebSocket 对象
     private receivedData: Object | null = null;  // 存储接收到的数据
     // private receivedData: Object | null = null;  // 存储接收到的数据

     constructor() {
       this.createWebSocket();  // 创建 WebSocket
       this.onOpen();  // 注册连接成功的事件
       this.onMessage();  // 注册消息接收的事件
       this.onClose();  // 注册连接关闭的事件
       this.onError();  // 注册错误事件
     }

     /**
      * 创建 WebSocket 对象
      */
     private createWebSocket() {
       Logger.info(TAG, "start create websocket");  // 日志记录开始创建 WebSocket
       this.socket = webSocket.createWebSocket();  // 创建 WebSocket 实例
       Logger.info(TAG, "create socket success");  // 日志记录创建成功
     }

     /**
      * 根据指定的 URL 与服务器建立连接
      * @param url - WebSocket 服务器的地址
      */
     public connect(url: string) {
       this.socket.connect(url)  // 连接到 WebSocket 服务器
         .then((isConnect) => {
           if (isConnect) {
             Logger.info(TAG, "connect success");  // 日志记录连接成功
           }
         });
     }

     /**
      * 发送消息到服务器
      * @param data - 要发送的数据
      */
     public sendMessage(data: string) {
       this.socket.send(data)  // 发送数据
         .then((isSuccess) => {
           Logger.info(TAG, "send result: " + isSuccess);  // 日志记录发送结果
           if (isSuccess) {
             Logger.info(TAG, "send message success");  // 日志记录发送成功
           }
         })
         .catch((e) => {
           Logger.info(TAG, "send message fail: " + JSON.stringify(e));  // 日志记录发送失败
         });
     }

     /**
      * 关闭与服务器的连接
      */
     public closeConnect() {
       this.socket.close();  // 关闭 WebSocket 连接
     }

     /**
      * 监听连接打开事件
      */
     private onOpen() {
       this.socket.on("open", (err, data) => {
         Logger.info(TAG, "on open status: " + JSON.stringify(data));  // 日志记录连接打开的状态
       });
     }

     /**
      * 监听服务器发送的消息
      */
     private onMessage() {
       this.socket.on("message", (err, data) => {
         if (err) {
           Logger.error(TAG, "Error receiving message: " + JSON.stringify(err));
           return;
         }

         Logger.info(TAG, "receive from server: message is: " + JSON.stringify(data));  // 日志记录接收到的消息
         console.log('Received raw data:', data);

         this.receivedData = data;  // 存储接收到的数据
         try {
           this.processData(data);  // 处理接收到的数据
         } catch (e) {
           Logger.error(TAG, "Error processing message: " + e.message);
         }
       });
     }

     /**
      * 处理接收到的数据
      * @param data - 接收到的数据
      */
     private processData(data: Object) {
       if (data === undefined || data === null) {
         Logger.error(TAG, "Received undefined or null data.");
         return;
       }

       if (typeof data === 'string') {
         Logger.info(TAG, "Processing received string data: " + data);
         // 处理字符串数据
         console.log('Received string:', data);
       } else if (typeof data === 'object') {
         Logger.info(TAG, "Processing received object data: " + JSON.stringify(data));
         // 处理对象数据
         console.log('Received object:', data);
       } else {
         Logger.error(TAG, "Received unsupported data type.");
         throw new Error("Received unsupported data type.");
       }
     }

     /**
      * 监听连接关闭事件
      */
     private onClose() {
       this.socket.on("close", (err, data) => {
         Logger.info(TAG, "on close: close code is: " + data.code + ", close reason is: " + data.reason);  // 日志记录连接关闭的信息
       });
     }

     /**
      * 监听错误事件
      */
     private onError() {
       this.socket.on("error", (data) => {
         Logger.info(TAG, "on error: err is: " + JSON.stringify(data));  // 日志记录错误信息
       });
     }
   }

   // 创建 SocketService 的实例
   let socketService = new SocketService();

   // 导出 SocketService 实例
   export default socketService;
   ```

   服务端代码:
5. 运行结果、错误截图
6. 我尝试过的解决方法和结果
7. 我想要达到的结果

import asyncio
import websockets
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Pose2D
from kobuki_msgs.msg import BatteryInfo
import json
import subprocess
import os
import signal
import threading

class WebSocketServer:
def __init__(self, host, port):
rospy.init_node("websocket_server")
self.json_data = ""
self.battery_data = ""
self.host = host
self.port = port
try:
port_pid = subprocess.check_output(["lsof", "-i", f":{self.port}"]).decode().split()[10]
print(f"端口{self.port}已被占用,PID为{port_pid}")
os.kill(int(port_pid), signal.SIGKILL)
except subprocess.CalledProcessError:
pass
rospy.loginfo("WebSocket服务器初始化成功!")
rospy.Subscriber("/pose_data", Pose2D, self.pose_callback)
rospy.Subscriber("/mobile_base/debug/battery_info", BatteryInfo, self.battery_callback)

```
    self.websocket = None  # 存储当前连接的WebSocket

def battery_callback(self, msg):
    self.battery_data = msg.battery_voltage

def pose_callback(self, msg):
    self.json_data = "get_pose"+","+str(msg.x)+","+str(msg.y)+","+str(msg.theta)


async def send_message(self):
    while self.websocket is not None:
        await asyncio.sleep(5)
        # message = str({"event" : "test","x": 0,"y": 0,"z": 0,"battery": self.battery_data})
        message = 'x:1.0,y:2.0,z:3.0,battery'
        print(f"发送消息: {message}")
        try:
            await self.websocket.send(message)
            print(f"发送消息: {message}")
        except websockets.ConnectionClosed:
            print("客户端连接已关闭,无法发送消息。")
            break

async def echo(self, websocket, path):
    self.websocket = websocket
    client_address = websocket.remote_address
    print(f"客户端连接: {client_address}")
    await websocket.send("ok")

    # 启动定时发送消息的协程
    # asyncio.create_task(self.send_message())

    try:
        async for message in websocket:
            print(f"收到消息: {message} 从客户端: {client_address}")
        

            data = b"ok"  # 字节串
            text_data = data.decode('utf-8')  # 解码成字符串
            await websocket.send(json.dumps(text_data))
            print(f"发送消息: ok")
            

    except websockets.ConnectionClosed:
        print(f"客户端 {client_address} 已关闭连接。")
    finally:
        self.websocket = None  # 清空WebSocket引用

async def main(self):
    async with websockets.serve(self.echo, self.host, self.port):
        print("服务器启动成功!")
        await asyncio.Future()

def run(self):
    try:
        asyncio.run(self.main())
    except KeyboardInterrupt:
        print("服务器已关闭!")
        asyncio.get_event_loop().close()

```


### 【运行环境】

硬件:鲁班猫
ROM版本:
DevEvoStudio版本:DevEco Studio 3.1.1 Release
SDK版本:API9
[/md]
作者: fengyunrenwu    时间: 6 天前
可以参考一下哈:
import { webSocket } from '@kit.NetworkKit';
import { BusinessError } from '@kit.BasicServicesKit';
import util from '@ohos.util';

async function TestWebSocketStringWs() {
const web = webSocket.createWebSocket();
web.on('message', (err: BusinessError, data: string | ArrayBuffer) => {
if (err) {
console.error('TestWebSocket ' + JSON.stringify(err));
}
if (typeof data === 'string') {
console.info('TestWebSocket get string: ' + data);
} else {
const view = new Uint8Array(data);
const res = util.TextDecoder.create('utf-8').decodeWithStream(view);
console.info('TestWebSocket get ArrayBuffer: ' + res);
}
});
web.on('open', () => {
const sendData = 'Hello World';
web.send(sendData);
});
await web.connect('ws://192.168.0.167:8888/string');
}

async function TestWebSocketStringWss() {
const web = webSocket.createWebSocket();
web.on('message', (err: BusinessError, data: string | ArrayBuffer) => {
if (err) {
console.error('TestWebSocket ' + JSON.stringify(err));
}
if (typeof data === 'string') {
console.info('TestWebSocket get string: ' + data);
} else {
const view = new Uint8Array(data);
const res = util.TextDecoder.create('utf-8').decodeWithStream(view);
console.info('TestWebSocket get ArrayBuffer: ' + res);
}
});
web.on('open', () => {
const sendData = 'Hello World';
const buffer = new ArrayBuffer(sendData.length);
util.TextEncoder.create('utf-8').encodeIntoUint8Array(sendData, new Uint8Array(buffer));
web.send(buffer);
});
await web.connect('ws://192.168.0.167:8888/binary');
}
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
  build() {
     Row() {
       Column() {
           Text('TestWebSocketStringWs')
     .fontSize(50)
     .fontWeight(FontWeight.Bold)
     .onClick(TestWebSocketStringWs)
  Text('TestWebSocketStringWs')
    .fontSize(50)
    .fontWeight(FontWeight.Bold)
    .onClick(TestWebSocketStringWss)
}
    .width('100%')
}
     .height('100%')
}
}
作者: 方小生    时间: 6 天前
解决了,是我代码问题




欢迎光临 OpenHarmony开发者论坛 (https://forums.openharmony.cn/) Powered by Discuz! X3.5