RTSP Push/Pull Streaming Service Setup

I. Basic Service Setup (Windows)

1. Download the RTSP server

Download link: https://github.com/aler9/rtsp-simple-server/releases (the project has since been renamed MediaMTX, which is why the executable below is called mediamtx.exe)

2. Download FFmpeg

Download link: https://github.com/BtbN/FFmpeg-Builds/releases

3. Start the server

Change into the RTSP server directory and run .\mediamtx.exe from the console. By default it listens for RTSP on port 8554.

4. Start the push stream

Change into the FFmpeg directory and run the command below from the console. It loops the video file endlessly and pushes it to the server: -re reads the input at its native frame rate, -stream_loop -1 repeats the input forever, and -c copy remuxes without re-encoding.

ffmpeg -re -stream_loop -1 -i belt.mp4 -c copy -f rtsp rtsp://127.0.0.1:8554/video

5. Play the stream

Launch the VLC player and open the network stream rtsp://127.0.0.1:8554/video to verify playback.
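
Alternatively, the FFmpeg build downloaded above should also ship with ffplay, which can preview the stream straight from the console:

ffplay rtsp://127.0.0.1:8554/video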

II. RTSP Pull/Push Streaming with OpenCV + Python

1. Pulling a stream

import cv2
import time
import queue
import threading


def capturevideo(capture, framequeue):
    # Producer thread: read frames off the capture and buffer them in the queue.
    while True:
        if framequeue.full():
            time.sleep(0.1)
        else:
            ret, frame = capture.read()
            if not ret:
                return
            image_bgr = frame
            framequeue.put(image_bgr)


def execute_rtsp(rtsp_src):
    capture = cv2.VideoCapture(rtsp_src)
    framequeue = queue.Queue(10)

    thread = threading.Thread(
        target=capturevideo, args=(capture, framequeue))
    thread.start()
    time.sleep(1)
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(capture.get(cv2.CAP_PROP_FPS))

    while True:
        if not framequeue.empty():
            frame_bgr = framequeue.get()
            cv2.imshow("image", frame_bgr)  # display the frame
            cv2.waitKey(1)  # 1 ms delay; waitKey(-1) would block on every frame waiting for a key


if __name__ == '__main__':
    rtsp_url = 'rtsp://192.168.9.164:8554/video'
    execute_rtsp(rtsp_url)
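
For comparison, the minimal single-threaded pull loop is just a read/imshow cycle; the producer thread above exists so that a slow consumer cannot stall the RTSP read. A hedged sketch:

import cv2

cap = cv2.VideoCapture('rtsp://192.168.9.164:8554/video')
while True:
    ret, frame = cap.read()
    if not ret:
        break
    cv2.imshow('image', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):  # press q to quit
        break
cap.release()
cv2.destroyAllWindows()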

2. Pulling + pushing a stream

import cv2
import time
import queue
import threading
import subprocess as sp


def capturevideo(capture, framequeue):
    while True:
        if framequeue.full():
            time.sleep(0.1)
        else:
            ret, frame = capture.read()
            if not ret:
                return
            image_bgr = frame
            framequeue.put(image_bgr)


def execute_rtsp(rtsp_src, rtsp_dst):
    capture = cv2.VideoCapture(rtsp_src)
    framequeue = queue.Queue(10)

    thread = threading.Thread(
        target=capturevideo, args=(capture, framequeue))
    thread.start()
    time.sleep(1)
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(capture.get(cv2.CAP_PROP_FPS))
    # ffmpeg reads raw BGR frames from stdin, encodes with x264 and pushes RTSP.
    command = ['ffmpeg',
               '-y',
               '-f', 'rawvideo',
               '-vcodec', 'rawvideo',
               '-pix_fmt', 'bgr24',
               '-s', "{}x{}".format(width, height),
               '-r', str(fps),
               '-i', '-',
               '-c:v', 'libx264',
               '-pix_fmt', 'yuv420p',
               '-preset', 'ultrafast',
               '-f', 'rtsp',
               rtsp_dst]
    pipe = sp.Popen(command, stdin=sp.PIPE)
    while True:
        if not framequeue.empty():
            frame_bgr = framequeue.get()
            pipe.stdin.write(frame_bgr.tobytes())  # tobytes() replaces the deprecated ndarray.tostring()


if __name__ == '__main__':
    rtsp_url = 'rtsp://192.168.9.164:8554/video'
    rtsp_stream = 'rtsp://192.168.9.164:8554/live'
    execute_rtsp(rtsp_url, rtsp_stream)

As you can see, the video stream is now being pushed to the 8554/live path.
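
To verify from the console instead, ffprobe (bundled with FFmpeg) can inspect the republished stream; a quick check, assuming the same host and path as above:

ffprobe -v error -show_streams rtsp://192.168.9.164:8554/live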

III. Video Processing and Streaming

1. Multi-threaded frame processing

Main program:

import cv2
import time
import queue
import threading
import multiprocessing
import subprocess as sp

from ultralytics import YOLO
from visualize.detection import *


def capturevideo(capture, framequeue):
    global frame_id
    while True:
        if framequeue.full():
            time.sleep(0.1)
        else:
            ret, frame = capture.read()
            if not ret:
                continue
            image_bgr = frame
            frame_id += 1
            framequeue.put((image_bgr, frame_id))


def execute_rtsp(rtsp_src, rtsp_dst):
    capture = cv2.VideoCapture(rtsp_src)
    framequeue = queue.Queue(10)
    outputqueue = queue.Queue(10)

    thread = threading.Thread(
        target=capturevideo, args=(capture, framequeue))
    thread.start()
    time.sleep(1)
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(capture.get(cv2.CAP_PROP_FPS))
    command = ['ffmpeg',
               '-y',
               '-f', 'rawvideo',
               '-vcodec', 'rawvideo',
               '-pix_fmt', 'bgr24',
               '-s', "{}x{}".format(width, height),
               '-r', str(fps),
               '-i', '-',
               '-c:v', 'libx264',
               '-pix_fmt', 'yuv420p',
               '-preset', 'ultrafast',
               '-f', 'rtsp',
               rtsp_dst]
    pipe = sp.Popen(command, stdin=sp.PIPE)
    # Five worker threads run inference on queued frames in parallel.
    for _ in range(5):
        t = threading.Thread(target=process, args=(framequeue, outputqueue, pipe))
        t.start()


def process(framequeue, outputqueue, pipe):
    global queue_id
    model = YOLO('runs/detect/helmet_model/weights/best.pt')
    while True:
        if not framequeue.empty():
            frame_bgr, frame_uid = framequeue.get()
            results = model.predict(
                frame_bgr,
                stream=False,
                save=False,
                classes=[0, 2, 4, 7],
                device=[0, 1],
                verbose=False,
            )
            names = results[0].names
            clses = [int(i) for i in results[0].boxes.cls.tolist()]
            boxes = results[0].boxes.xyxy.tolist()
            name_set = [names.get(i) for i in clses]

            frame_bgr = visualize_det(frame_bgr, boxes, name_set)
            # Spin until it is this frame's turn; only then enqueue it, so at
            # most one frame sits in outputqueue at a time and order is kept.
            while True:
                time.sleep(0.0001)
                if queue_id + 1 == frame_uid:
                    outputqueue.put(frame_bgr)
                    break

            frame = outputqueue.get()
            pipe.stdin.write(frame.tobytes())  # tobytes() replaces the deprecated tostring()
            queue_id += 1


if __name__ == '__main__':
    frame_id = 0
    queue_id = 0
    rtsp_url = 'rtsp://192.168.9.164:8554/video'
    rtsp_stream = 'rtsp://192.168.9.164:8554/live'
    execute_rtsp(rtsp_url, rtsp_stream)
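
A detail that is easy to miss in process(): the five workers finish frames in unpredictable order, and the shared queue_id counter forces each finished frame to wait its turn before being written, which keeps the output stream in input-frame order. A self-contained sketch of that re-serialization pattern (toy names, not project code):

import queue
import random
import threading
import time

next_id = {'value': 1}           # whose turn is it to emit
order_lock = threading.Lock()
out = queue.Queue()

def worker(frame_id):
    time.sleep(random.random() * 0.05)   # simulate variable inference time
    while True:                          # spin until it is this frame's turn
        with order_lock:
            if next_id['value'] == frame_id:
                out.put(frame_id)
                next_id['value'] += 1
                return
        time.sleep(0.0001)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print([out.get() for _ in range(5)])     # always [1, 2, 3, 4, 5]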

Visualization:

import os
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFile, ImageFont

from settings import PR

font_file = os.path.join(PR, 'visualize/SourceHanSansCN-Medium.otf')
ImageFile.LOAD_TRUNCATED_IMAGES = True


def visualize_det(im, boxes, names):
    if isinstance(im, str):
        im = Image.open(im)
        im = np.ascontiguousarray(np.copy(im))
        im = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)
    else:
        im = np.ascontiguousarray(np.copy(im))

    im = Image.fromarray(im)
    im = im.convert('RGBA')

    for i, name in enumerate(names):
        box = boxes[i]
        box = [int(j) for j in box]
        text = name

        draw = ImageDraw.Draw(im)
        draw.text(
            (box[0], box[1]),
            text,
            font=ImageFont.truetype(font_file, size=20),
            fill=(0, 0, 0, 255))  # alpha must be 0-255; the original 1000 was out of range
        draw.rectangle(
            ((box[0], box[1]), (box[2], box[3])),
            fill=None,
            outline=(139, 0, 139),
            width=1)

    im = im.convert('RGB')
    im = np.ascontiguousarray(np.copy(im))
    return im
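
A quick usage sketch for reference (the image path and box coordinates are made up for illustration):

frame = cv2.imread('test.jpg')  # hypothetical test image, BGR ndarray
out = visualize_det(frame, [[50, 40, 200, 180]], ['helmet'])
cv2.imwrite('out.jpg', out)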

2. Multi-threaded pipeline encapsulated as a class

import cv2
import time
import queue
import threading
import subprocess as sp
import concurrent.futures

from ultralytics import YOLO
from visualize.detection import *
from settings import *

lock = threading.Lock()


class StreamingDetectionServer:
    def __init__(self, stream_src, stream_dst):
        self.pipe = None
        self.model = None
        self.capture = None

        self.fps = 0
        self.count = 0
        self.width = 0
        self.height = 0
        self.frame_id = 0
        self.solve_id = 1

        self.stream_src = stream_src
        self.stream_dst = stream_dst

        self.framequeue = queue.PriorityQueue(10)
        self.outputqueue = queue.PriorityQueue()

    def run(self):
        self.capture = cv2.VideoCapture(self.stream_src)
        self.fps = int(self.capture.get(cv2.CAP_PROP_FPS))
        self.width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

        command = ['ffmpeg',
                   # '-y',
                   '-re',
                   '-f', 'rawvideo',
                   '-vcodec', 'rawvideo',
                   '-pix_fmt', 'bgr24',
                   '-s', "{}x{}".format(self.width, self.height),
                   '-r', str(self.fps),
                   '-i', '-',
                   '-c:v', 'libx264',
                   '-pix_fmt', 'yuv420p',
                   '-preset', 'ultrafast',
                   '-f', 'rtsp',
                   self.stream_dst]
        pipe = sp.Popen(command, stdin=sp.PIPE)

        capture_thread = threading.Thread(target=self.capture_video)
        capture_thread.start()

        for _ in range(5):
            process_thread = threading.Thread(target=self.process_video)
            process_thread.start()

        output_thread = threading.Thread(target=self.output_video, args=(pipe,))
        output_thread.start()

    def capture_video(self):
        while True:
            if self.framequeue.full():
                time.sleep(0.01)
            else:
                ret, frame = self.capture.read()
                if not ret:
                    continue
                image_bgr = frame
                self.frame_id += 1
                self.framequeue.put((self.frame_id, image_bgr))

    def output_video(self, pipe):
        while True:
            if not self.outputqueue.empty():
                frame_id, frame_bgr = self.outputqueue.get()
                pipe.stdin.write(frame_bgr.tobytes())
            else:
                time.sleep(0.01)

    def process_video(self):
        model = YOLO('runs/detect/helmet_model/weights/best.pt')
        while True:
            if not self.framequeue.empty():
                with lock:
                    frame_id, frame_bgr = self.framequeue.get()
                results = model.track(
                    frame_bgr,
                    stream=False,
                    save=False,
                    classes=[0, 2, 4, 7],
                    device="0",
                    verbose=False,
                    tracker="bytetrack.yaml"
                )
                names = results[0].names
                clses = [int(i) for i in results[0].boxes.cls.tolist()]
                boxes = results[0].boxes.xyxy.tolist()
                name_set = [names.get(i) for i in clses]
                frame_bgr = visualize_det_cv2(frame_bgr, boxes, name_set)
                # Spin until it is this frame's turn before enqueueing for output.
                while True:
                    time.sleep(0.001)
                    if self.solve_id == frame_id:
                        self.outputqueue.put((frame_id, frame_bgr))
                        self.solve_id += 1
                        break


if __name__ == '__main__':
    rtsp_url = input('Stream URL: ')
    rtsp_stream = 'rtsp://192.168.9.164:8554/live2'
    sds = StreamingDetectionServer(rtsp_url, rtsp_stream)
    sds.run()
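
One design note: both queues here are queue.PriorityQueue instances keyed by frame id, so get() always hands back the lowest pending frame id regardless of insertion order. A tiny illustration:

import queue

pq = queue.PriorityQueue()
for fid in (3, 1, 2):
    pq.put((fid, 'frame-%d' % fid))
print([pq.get()[0] for _ in range(3)])   # [1, 2, 3]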

3. Multi-process frame processing (taking the optimization further to squeeze out more inference throughput; a follow-up post will cover this separately)

import cv2
import time
import queue
import torch
import threading
import multiprocessing
import subprocess as sp
import concurrent.futures

from ultralytics import YOLO
from visualize.detection import *
from settings import *
from queue import PriorityQueue
from multiprocessing.managers import BaseManager

lock = multiprocessing.Lock()


class Manager(BaseManager):
    pass


Manager.register('get_priorityQueue', PriorityQueue)


class StreamingDetectionServer:
    def __init__(self, stream_src, stream_dst):
        self.pipe = None
        self.model = None
        self.capture = None

        self.fps = 0
        self.count = 0
        self.width = 0
        self.height = 0
        self.frame_id = 0

        self.stream_src = stream_src
        self.stream_dst = stream_dst

    def run(self):
        # Thread-side priority queue that re-serializes frames coming out of
        # the multiprocessing queue, so they leave in frame order.
        q = queue.PriorityQueue()
        m = Manager()
        m.start()
        # framequeue = multiprocessing.Queue(10)
        framequeue = torch.multiprocessing.Queue(10)
        outputqueue = multiprocessing.Queue()
        # outputqueue = m.get_priorityQueue()  # managed cross-process PriorityQueue: too slow, abandoned
        self.capture = cv2.VideoCapture(self.stream_src)
        self.fps = int(self.capture.get(cv2.CAP_PROP_FPS))
        self.width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

        command = ['ffmpeg',
                   # '-y',
                   '-re',
                   '-f', 'rawvideo',
                   '-vcodec', 'rawvideo',
                   '-pix_fmt', 'bgr24',
                   '-s', "{}x{}".format(self.width, self.height),
                   '-r', str(self.fps),
                   '-i', '-',
                   '-c:v', 'libx264',
                   '-pix_fmt', 'yuv420p',
                   '-preset', 'ultrafast',
                   '-f', 'rtsp',
                   self.stream_dst]
        pipe = sp.Popen(command, stdin=sp.PIPE)

        solve_id = multiprocessing.Value('i', 1)  # shared counter tracking which frame should be emitted next

        capture_thread = threading.Thread(target=self.capture_video, args=(framequeue,))
        capture_thread.start()

        for _ in range(5):
            process_thread = multiprocessing.Process(target=self.process_video, args=(framequeue, outputqueue, solve_id))
            process_thread.start()

        queue_thread = threading.Thread(target=self.make_queue, args=(outputqueue, q))
        queue_thread.start()

        output_thread = threading.Thread(target=self.output_video, args=(pipe, q))
        output_thread.start()

    # pull the source stream
    def capture_video(self, framequeue):
        while True:
            if framequeue.full():
                time.sleep(0.01)
            else:
                ret, frame = self.capture.read()
                if not ret:
                    continue
                image_bgr = frame
                image_bgr = torch.from_numpy(image_bgr)
                image_bgr.share_memory_()  # move the frame into shared memory to avoid per-put copies
                self.frame_id += 1
                framequeue.put((self.frame_id, image_bgr))

    # drain the multiprocessing queue into the thread-side priority queue
    @staticmethod
    def make_queue(outputqueue, q):
        while True:
            if not outputqueue.empty():
                frame_id, frame_bgr = outputqueue.get()
                q.put((frame_id, frame_bgr))
            else:
                time.sleep(0.001)

    # push frames from the priority queue to ffmpeg
    @staticmethod
    def output_video(pipe, q):
        while True:
            if q.qsize() >= 10:  # keep a small buffer so ordering hiccups don't starve the encoder
                frame_id, frame_bgr = q.get()
                pipe.stdin.write(frame_bgr.tobytes())
            else:
                time.sleep(0.001)

    # worker process: inference + drawing
    @staticmethod
    def process_video(framequeue, outputqueue, solve_id):
        model = YOLO('runs/detect/helmet_model/weights/best.pt')
        while True:
            if not framequeue.empty():
                with lock:
                    frame_id, frame_bgr = framequeue.get()
                frame_bgr = frame_bgr.numpy()
                results = model.track(
                    frame_bgr,
                    stream=False,
                    save=False,
                    classes=[0, 2, 4, 7],
                    device="0",
                    batch=16,
                    verbose=False,
                    tracker="bytetrack.yaml"
                )
                names = results[0].names
                clses = [int(i) for i in results[0].boxes.cls.tolist()]
                boxes = results[0].boxes.xyxy.tolist()
                name_set = [names.get(i) for i in clses]
                frame_bgr = visualize_det_cv2(frame_bgr, boxes, name_set)
                # use the shared solve_id counter to enqueue frames in order
                while True:
                    if solve_id.value == frame_id:
                        outputqueue.put((frame_id, frame_bgr))
                        solve_id.value += 1
                        break


if __name__ == '__main__':
    rtsp_url = input('Stream URL: ')
    rtsp_stream = 'rtsp://192.168.9.164:8554/live'
    sds = StreamingDetectionServer(rtsp_url, rtsp_stream)
    sds.run()
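
Why torch.multiprocessing and share_memory_() rather than a plain multiprocessing.Queue? Tensors sent through a torch queue have their storage moved into shared memory instead of being pickled, so full video frames are not copied between processes. A minimal sketch of that behavior (a toy example under the usual torch.multiprocessing caveats, not project code):

import torch
import torch.multiprocessing as mp


def consumer(q):
    t = q.get()
    t += 1  # in-place write lands in the shared pages


if __name__ == '__main__':
    q = mp.Queue()
    t = torch.zeros(2, 2)
    t.share_memory_()  # move storage into shared memory up front
    p = mp.Process(target=consumer, args=(q,))
    p.start()
    q.put(t)
    p.join()
    print(t)  # all ones: the child modified the same memory, no copy was made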
