Deploying GraspNet with an Astra2 Camera

Environment

  • Ubuntu 20.04
  • Astra2 camera
  • CUDA 11.0.1
  • cuDNN v8.9.7
  • Python 3.8.19
  • PyTorch 1.7.0
  • NumPy 1.23.5

1. Reproducing GraspNet

The detailed steps are covered in this article: Ubuntu20.04下GraspNet复现流程 (a GraspNet reproduction walkthrough for Ubuntu 20.04), so they are not repeated here.

2. The Astra2 Python API

Everything below is based on the official documentation: Orbbec SDK for Python 使用手册 (the Orbbec SDK for Python user manual).

First, note what the network takes as input: a point cloud, plus an RGB image to color it. graspnetAPI already provides create_point_cloud_from_depth_image to convert a depth image into a point cloud, so all we have to write is the part that grabs a depth image and an RGB image from the camera's streams. One thing to watch out for: on most cameras the RGB FOV is larger than the depth FOV, so the two images must be aligned. The alignment essentially pads the depth image with zeros so that it matches the size of the RGB image. Most camera vendors already ship samples demonstrating how to do this, so I will not repeat it here.
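For reference, here is a minimal sketch of the depth-to-point-cloud step, assuming the CameraInfo and create_point_cloud_from_depth_image helpers shipped with graspnetAPI (graspnet-baseline bundles equivalent ones in utils/data_utils). The intrinsics, the depth file, and the factor_depth value are placeholders, not values from this project:

```python
import numpy as np
from graspnetAPI.utils.utils import CameraInfo, create_point_cloud_from_depth_image

# Placeholder intrinsics; in practice read them from the camera
# (see get_intrinsic() in astra2.py below).
fx, fy, cx, cy = 745.0, 745.0, 640.0, 360.0
factor_depth = 1000.0  # assumed: depth stored in millimeters

depth = np.load("depth.npy")  # HxW depth image, already aligned to the color image
camera = CameraInfo(1280.0, 720.0, fx, fy, cx, cy, factor_depth)
# organized=True keeps the HxWx3 layout, so the cloud can be indexed like the image
cloud = create_point_cloud_from_depth_image(depth, camera, organized=True)
```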

Here is the astra2.py I wrote for grabbing the camera's depth and RGB images; feel free to borrow the approach.

astra2.py

```python
from pyorbbecsdk import *
import numpy as np
import cv2
import os


class Camera:
    def __init__(self, width=1280, height=720, fps=15):
        self.im_width = width
        self.im_height = height
        self.fps = fps
        self.intrinsic = None
        self.scale = None

        # connect here for a persistent connection
        # (disabled: get_data() connects and disconnects per shot)
        # self.connect()

    def connect(self):
        """Connect to the camera and start the color/depth streams."""
        self.pipeline = Pipeline()
        config = Config()

        # color config
        profile_list = self.pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
        color_profile = profile_list.get_default_video_stream_profile()
        config.enable_stream(color_profile)
        # depth config
        profile_list = self.pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
        assert profile_list is not None
        depth_profile = profile_list.get_default_video_stream_profile()
        assert depth_profile is not None
        print("color profile : {}x{}@{}_{}".format(color_profile.get_width(),
                                                   color_profile.get_height(),
                                                   color_profile.get_fps(),
                                                   color_profile.get_format()))
        print("depth profile : {}x{}@{}_{}".format(depth_profile.get_width(),
                                                   depth_profile.get_height(),
                                                   depth_profile.get_fps(),
                                                   depth_profile.get_format()))
        config.enable_stream(depth_profile)
        # align the depth image to the color image and synchronize the two streams
        config.set_align_mode(OBAlignMode.SW_MODE)
        self.pipeline.enable_frame_sync()
        # start streaming
        self.pipeline.start(config)

        # cache the intrinsics
        self.intrinsic = self.get_intrinsic()

    def disconnect(self):
        """Disconnect from the camera."""
        self.pipeline.stop()

    def get_frame(self):
        """Fetch one color frame and one depth frame from the streams."""
        while True:
            frames: FrameSet = self.pipeline.wait_for_frames(200)
            if frames is None:
                continue
            color_frame = frames.get_color_frame()
            if color_frame is None:
                continue
            depth_frame = frames.get_depth_frame()
            if depth_frame is None:
                continue
            break

        return color_frame, depth_frame

    def frame2data(self, color_frame, depth_frame):
        """Convert raw frames to numpy arrays (currently unused)."""
        width = depth_frame.get_width()
        height = depth_frame.get_height()
        depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
        depth_data = depth_data.reshape((height, width))

        color_data = np.asanyarray(color_frame.get_data(), dtype=np.uint8)
        # color_data = color_data.reshape((height, width, 3))
        return color_data.astype(np.float32), depth_data.astype(np.float32)

    def get_data(self):
        """Grab one shot and return the color and depth images as numpy arrays."""
        # connect to the camera
        self.connect()
        color_frame, depth_frame = self.get_frame()

        # the color stream is compressed, so decode it with OpenCV
        data = np.asanyarray(color_frame.get_data())
        color_data = cv2.imdecode(data, cv2.IMREAD_COLOR)
        print('color_image.shape: ', color_data.shape)

        width = depth_frame.get_width()
        height = depth_frame.get_height()
        scale = depth_frame.get_depth_scale()
        print("===width: {}===".format(width))
        print("===height: {}===".format(height))
        print("===scale: {}===".format(scale))
        # save the depth scale for later use
        save_dir = os.path.join(os.getcwd(), "real/intrinsic")
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        filename = save_dir + "/camera_depth_scale.txt"
        np.savetxt(filename, np.array([scale]), delimiter=' ')

        depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
        depth_data = depth_data.reshape((height, width))
        depth_data = depth_data.astype(np.float32) * scale
        print('depth_image.shape: ', depth_data.shape)

        # rescale the depth to 0-255 (note: this drops the metric scale)
        depth_data = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_16U)

        # disconnect from the camera
        self.disconnect()

        return color_data, depth_data

    def get_data_saved(self):
        """Grab one shot and also save both images to disk."""
        color_data, depth_data = self.get_data()

        save_image_dir = os.path.join(os.getcwd(), "real/images")
        if not os.path.exists(save_image_dir):
            os.makedirs(save_image_dir)
        depth_filename = save_image_dir + "/depth_{}x{}.png".format(depth_data.shape[0], depth_data.shape[1])
        color_filename = save_image_dir + "/color_{}x{}.png".format(color_data.shape[0], color_data.shape[1])
        cv2.imwrite(color_filename, color_data)
        cv2.imwrite(depth_filename, depth_data)

        return color_data, depth_data

    def get_intrinsic(self):
        """Read the depth intrinsics and save them as a 3x3 matrix."""
        itsc = self.pipeline.get_camera_param()
        raw_intrinsic = itsc.depth_intrinsic

        intrinsic = np.array([raw_intrinsic.fx, 0, raw_intrinsic.cx,
                              0, raw_intrinsic.fy, raw_intrinsic.cy,
                              0, 0, 1]).reshape(3, 3)
        print("intrinsic: ", itsc)
        print('depth intrinsic: ', raw_intrinsic)
        print("intrinsic matrix", intrinsic)
        save_dir = os.path.join(os.getcwd(), "real/intrinsic")
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        filename = save_dir + "/camera_itcs.txt"
        np.savetxt(filename, intrinsic, delimiter=' ')

        return intrinsic

    # for test
    def visualize(self):
        """Display the RGB-D overlay to check the alignment."""
        color_data, depth_data = self.get_data()
        depth_image = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
        depth_image = cv2.applyColorMap(depth_image, cv2.COLORMAP_JET)
        # overlay the color image on the depth image
        depth_image = cv2.addWeighted(color_data, 0.5, depth_image, 0.5, 0)
        cv2.imshow("Depth with Color", depth_image)
        cv2.waitKey(500)


if __name__ == '__main__':
    camera = Camera()
    camera.visualize()
    color, depth = camera.get_data()
    print("depth.shape: ", depth.shape)
```

Testing whether the camera alignment works, the result is as follows:

img

This confirms that the alignment is indeed in effect.

3. Modifying demo.py

A single call to get_data() makes the camera take one shot and returns color_data and depth_data for further processing. We can then modify demo.py to read data directly from the camera instead of from files.

demo.py

```python
...
from astra2 import Camera
astra2 = Camera()

...

def get_and_process_data():

    # capture one shot from the camera
    color, depth = astra2.get_data()
    color = color / 255.0
    ...
```
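For context, here is a rough sketch of how the rest of get_and_process_data could consume the live data, using the intrinsics file written by astra2.py instead of the demo's meta.mat. The path, num_point, and factor_depth values are assumptions from my setup, not fixed API; also note that OpenCV decodes color as BGR, while the original demo loads RGB:

```python
import numpy as np
from graspnetAPI.utils.utils import CameraInfo, create_point_cloud_from_depth_image
from astra2 import Camera

def get_and_process_data(num_point=20000, factor_depth=1000.0):
    # capture one shot (factor_depth is an assumption; check it against the
    # depth scale that astra2.py saves to real/intrinsic/camera_depth_scale.txt)
    color, depth = Camera().get_data()
    color = color[..., ::-1] / 255.0  # BGR -> RGB, normalize to [0, 1]

    # 3x3 intrinsics matrix saved by astra2.py's get_intrinsic()
    intrinsic = np.loadtxt("real/intrinsic/camera_itcs.txt")
    camera = CameraInfo(depth.shape[1], depth.shape[0],
                        intrinsic[0, 0], intrinsic[1, 1],
                        intrinsic[0, 2], intrinsic[1, 2], factor_depth)
    cloud = create_point_cloud_from_depth_image(depth, camera, organized=True)

    # keep only pixels with valid depth, then sample a fixed number of points
    mask = depth > 0
    cloud_masked = cloud[mask]
    color_masked = color[mask]
    idxs = np.random.choice(len(cloud_masked), num_point,
                            replace=len(cloud_masked) < num_point)
    return cloud_masked[idxs], color_masked[idxs]
```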

The remaining parts can stay unchanged for now and be revisited later. One caveat: be sure to use the official checkpoint-rs.tar here; with checkpoint-kn.tar an error occurs, and I have not yet found the cause.

The final result looks like this:

img

As you can see, many unwanted grasp candidates show up; they can be filtered out by adjusting workspace_mask, as sketched below.
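In the original demo.py, workspace_mask is a boolean image that gets ANDed with the valid-depth mask before points are sampled. A minimal sketch of a hand-made rectangular mask (the pixel bounds here are hypothetical; tune them so the mask covers only your actual workspace):

```python
import numpy as np

# Hypothetical workspace bounds in pixel coordinates for a 1280x720 image;
# shrink them until only the table region in front of the robot remains.
H, W = 720, 1280
TOP, BOTTOM, LEFT, RIGHT = 150, 650, 300, 1000

workspace_mask = np.zeros((H, W), dtype=bool)
workspace_mask[TOP:BOTTOM, LEFT:RIGHT] = True

# combined with the valid-depth mask as in demo.py:
# mask = workspace_mask & (depth > 0)
```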