import shebang
#! /usr/bin/env python3
import necessary libraries
import os
import rospy
from sensor_msgs.msg import Image
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
from std_msgs.msg import Int32
from std_msgs.msg import Float32
from geometry_msgs.msg import Vector3
from std_msgs.msg import String
from cv_bridge import CvBridge
import matplotlib.pyplot as plt
import cv2
from ultralytics import YOLO
import cv2
import time
from ultralytics.utils.plotting import Annotator
import numpy as np
from ultralytics.nn.autobackend import AutoBackend
from ultralytics.utils.ops import non_max_suppression, scale_boxes
# from ultralytics.yolo.data.dataloaders.stream_loaders import LoadImages, LoadStreams
from ultralytics.utils.plotting import Annotator, colors
import torch
from ultralytics.utils.ops import scale_image
To run on GPU
torch.cuda.set_device(0)
Load the model
# Load a model
# model = YOLO("yolov8n.yaml") # YOLO provided model
model = YOLO("weights/yolov8s-seg.pt") # load a pretrained model (recommended for training)
Function to return masked inference
def mask_results_yolo(results,imr):
if len(results)>0:
for result in results:
# get array results
masks = result.masks.masks
boxes = result.boxes.boxes
# extract classes
clss = boxes[:, 5]
# get indices of results where class is 0 (people in COCO)
people_indices = torch.where(clss == 2)
# use these indices to extract the relevant masks
people_masks = masks[people_indices]
# scale for visualizing results
people_mask = torch.any(people_masks, dim=0).int() * 255
mask = np.uint8(people_mask.cpu().numpy())
final_mask = scale_image(np.uint8(mask), imr.shape)
mask = cv2.threshold(final_mask, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
binary_mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
alpha = 0.5 # Adjust the alpha value to control the transparency of the mask
final_img = cv2.addWeighted(imr, 1 - alpha, binary_mask, alpha, 0)
###Uncomment for visualization
# cv2.namedWindow("asd", cv2.WINDOW_NORMAL)
# cv2.imshow('asd',overlay)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
return final_img
Initiate a class, and define the subscriber and publisher
class counter():
def __init__(self):
self.model = YOLO("/home/achyut/Achyut/Projects/ResearchWorks/trunk_detection/apple_orchard/code/runs/detect/train2/weights/best.pt")
self.scale_percent = 100
self.queue=1
self.bridge = CvBridge()
self.scale_percent = 100 # percent of original size
self.sub_image = rospy.Subscriber("/camera/color/image_raw", Image, self.callback_image, queue_size=1, buff_size=2**24) #/zed2/zed_node/left/image_rect_color
self.pub_img = rospy.Publisher('trunk',Image,queue_size = self.queue)
def callback_image(self,data):
self.imagecv = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough')
self.imagecv = cv2.cvtColor(self.imagecv , cv2.COLOR_RGB2BGR)
img = self.imagecv
half = False
img_sz = [640, 640]
width = 640 #int(img.shape[0] * self.scale_percent / 100) #384
height = 360# int(img.shape[1] * self.scale_percent / 100) #640
dim = (width, height)
# resize image
resized = cv2.resize(img, dim, interpolation = cv2.INTER_AREA)
imr = resized
results = self.model.predict(imr,device='0')
classes = results[0].boxes.cls.cpu().numpy()
names = self.model.names
k = 0
if len(results[0].boxes.xyxy) > 0:
for i in range(len(results[0].boxes.xyxy)):
annotator = Annotator(imr)
b = results[0].boxes[i].xyxy.cpu().numpy().flatten() # get box coordinates in (top, left, bottom, right) format
c = int(results[0].boxes[i].cls)
cls = [c]
annotator.box_label(b,f'{names[c]}', color=colors(c))
final_img = annotator.result()
##TO VISUALIZE
# cv2.namedWindow("asd", cv2.WINDOW_NORMAL)
# cv2.imshow('asd',imr)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
else:
final_img = imr
image_message = self.bridge.cv2_to_imgmsg(final_img, "passthrough")
self.pub_img.publish(image_message)
def main():
'''Initializes and cleanup ros node'''
rospy.init_node('trunk_detection_node')
c = counter()
# c.run()
try:
# c = counter()
# c.run()
rospy.spin()
except KeyboardInterrupt:
print ("Shutting down ROS Image feature detector module")
if __name__ == "__main__":
main()
Details of code above:
Topic subscribed /camera/color/image_raw
Topic published /trunk
define a subscriber with message type Image and callback function self.callback_image with queue size of 1 and large buffer size to make sure the whole image can stay.
self.sub_image = rospy.Subscriber("/camera/color/image_raw", Image, self.callback_image, queue_size=1, buff_size=2**24)
The main processing is in callback function.
Run inference on the subscribed image results = self.model.predict(imr,device='0')