import shebang

#! /usr/bin/env python3

import necessary libraries

import os
import rospy
from sensor_msgs.msg import Image
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
from std_msgs.msg import Int32
from std_msgs.msg import Float32
from geometry_msgs.msg import Vector3
from std_msgs.msg import String
from cv_bridge import CvBridge
import matplotlib.pyplot as plt
import cv2 
from ultralytics import YOLO
import cv2
import time
from ultralytics.utils.plotting import Annotator
import numpy as np
from ultralytics.nn.autobackend import AutoBackend
from ultralytics.utils.ops import non_max_suppression, scale_boxes
# from ultralytics.yolo.data.dataloaders.stream_loaders import LoadImages, LoadStreams
from ultralytics.utils.plotting import Annotator, colors
import torch
from ultralytics.utils.ops import scale_image

To run on GPU

torch.cuda.set_device(0)

Load the model

# Load a model
# model = YOLO("yolov8n.yaml")  # YOLO provided model
model = YOLO("weights/yolov8s-seg.pt")  # load a pretrained model (recommended for training)

Function to return masked inference

def mask_results_yolo(results,imr):
    if len(results)>0:
        for result in results:
            # get array results
            masks = result.masks.masks
            boxes = result.boxes.boxes
            # extract classes
            clss = boxes[:, 5]
            # get indices of results where class is 0 (people in COCO)
            people_indices = torch.where(clss == 2)
            # use these indices to extract the relevant masks
            people_masks = masks[people_indices]
            # scale for visualizing results
            people_mask = torch.any(people_masks, dim=0).int() * 255
            mask = np.uint8(people_mask.cpu().numpy())

            final_mask = scale_image(np.uint8(mask), imr.shape) 

            mask = cv2.threshold(final_mask, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
            binary_mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            alpha = 0.5  # Adjust the alpha value to control the transparency of the mask
            final_img = cv2.addWeighted(imr, 1 - alpha, binary_mask, alpha, 0)
				
				###Uncomment for visualization
        # cv2.namedWindow("asd", cv2.WINDOW_NORMAL)  
        # cv2.imshow('asd',overlay)
        # cv2.waitKey(0)
        # cv2.destroyAllWindows()

        return final_img

Initiate a class, and define the subscriber and publisher

class counter():

    def __init__(self):
        self.model = YOLO("/home/achyut/Achyut/Projects/ResearchWorks/trunk_detection/apple_orchard/code/runs/detect/train2/weights/best.pt")
        self.scale_percent = 100
        self.queue=1

        self.bridge = CvBridge()
        self.scale_percent = 100 # percent of original size
        self.sub_image = rospy.Subscriber("/camera/color/image_raw", Image, self.callback_image, queue_size=1, buff_size=2**24)  #/zed2/zed_node/left/image_rect_color
        self.pub_img =  rospy.Publisher('trunk',Image,queue_size = self.queue)

    def callback_image(self,data):
        self.imagecv = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough')
        self.imagecv = cv2.cvtColor(self.imagecv , cv2.COLOR_RGB2BGR)
        img = self.imagecv
        half = False
        img_sz = [640, 640]
        width = 640 #int(img.shape[0] * self.scale_percent / 100)  #384
        height = 360# int(img.shape[1] * self.scale_percent / 100) #640
        dim = (width, height)
  
        # resize image
        resized = cv2.resize(img, dim, interpolation = cv2.INTER_AREA)
        imr = resized

        results = self.model.predict(imr,device='0')
        classes = results[0].boxes.cls.cpu().numpy()
        names = self.model.names

        k = 0
        if len(results[0].boxes.xyxy) > 0:

            for i in range(len(results[0].boxes.xyxy)):

                annotator = Annotator(imr)
                b = results[0].boxes[i].xyxy.cpu().numpy().flatten() # get box coordinates in (top, left, bottom, right) format
                c = int(results[0].boxes[i].cls)
                cls = [c]
                annotator.box_label(b,f'{names[c]}', color=colors(c))
                final_img = annotator.result()
                
								##TO VISUALIZE
                # cv2.namedWindow("asd", cv2.WINDOW_NORMAL)  
                # cv2.imshow('asd',imr)
                # cv2.waitKey(0)
                # cv2.destroyAllWindows()
        else:
            final_img = imr

        image_message = self.bridge.cv2_to_imgmsg(final_img, "passthrough")
        self.pub_img.publish(image_message)

def main():
    '''Initializes and cleanup ros node'''
    rospy.init_node('trunk_detection_node')
    c = counter()
    # c.run()

    try:
        # c = counter()
        # c.run()
        rospy.spin()
    except KeyboardInterrupt:
        print ("Shutting down ROS Image feature detector module")

if __name__ == "__main__":
    main()

Details of code above:

Topic subscribed /camera/color/image_raw

Topic published /trunk

define a subscriber with message type Image and callback function self.callback_image with queue size of 1 and large buffer size to make sure the whole image can stay.

self.sub_image = rospy.Subscriber("/camera/color/image_raw", Image, self.callback_image, queue_size=1, buff_size=2**24)

The main processing is in callback function.

Run inference on the subscribed image results = self.model.predict(imr,device='0')