How to record video with gst-launch-1.0 for a fixed duration?

Hello, I have a project on Jetson Nano and I have a problem with video recording (I use a MIPI camera, IMX219). The video recording should last exactly 10 minutes. Is it possible to do this using gst-launch-1.0? For example, in ffmpeg I can control the recording duration, but with gst or OpenCV a recording can last 10 minutes and 2 seconds, or 9 minutes and 50 seconds, depending on the fps.

Hi,
You can set the num-buffers property on the nvarguscamerasrc plugin. For example, if the source runs at 30fps, 10 minutes corresponds to 18000 frames:

$ gst-launch-1.0 -e nvarguscamerasrc num-buffers=18000 ! nvv4l2h264enc ! h264parse ! qtmux ! filesink location=test.mp4
1 Like

Okay, I will try it and report back. Thanks.

And I have another problem: Error generated. /dvs/git/dirty/git-master_linux/multimedia/nvgstreamer/gst-nvarguscamera/gstnvarguscamerasrc.cpp, execute:736 Failed to create CaptureSession. So we can't record video and analyze video at the same time? Or do I need to modify the list of GStreamer flags?

import numpy as np
import os
import datetime
import cv2
import socket
import time
import signal 
import subprocess



class VideoRecording:
    """Record camera video into timestamped ``.mkv`` files with ``gst-launch-1.0``.

    The class is a singleton: every ``VideoRecording(...)`` call returns the
    same object (and re-runs ``__init__`` on it). Recording is driven by
    repeatedly calling :meth:`record`; a ``gst-launch-1.0`` child process is
    started while a person was seen within the last ``person_timeout`` seconds
    and stopped otherwise. Each child records ``fps * max_file_length + fps``
    frames and then exits on its own.
    """

    # Seconds to wait for the child to exit after SIGINT before killing it.
    _STOP_TIMEOUT = 5

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use (singleton)."""
        if not hasattr(cls, 'instance'):
            cls.instance = super(VideoRecording, cls).__new__(cls)
        return cls.instance

    def __init__(self, outdir: str = os.getcwd() + '/output_videos/', person_timeout: int = 30, max_file_length: int = 10, fps: int = 2):
        """Configure the recorder and install a SIGINT handler.

        Args:
            outdir: Directory for the recorded files; created (mode 0o777)
                when it does not exist.
            person_timeout: Seconds to keep recording after the last call to
                ``record(True)``.
            max_file_length: Length of one file in seconds; together with
                ``fps`` it determines ``num-buffers``.
            fps: Frame rate requested from ``nvarguscamerasrc``.
        """
        # __init__ runs on every VideoRecording() call because of the
        # singleton __new__; stop a recording that is still running so the
        # child process is not orphaned when the state below is reset.
        if getattr(self, "out_stream", None) is not None:
            self.release()

        if not os.path.exists(outdir):
            os.makedirs(outdir, exist_ok=True)
            os.chmod(outdir, 0o777)

        # gst_process and out_stream both refer to the running Popen object
        # (or None); both names are kept for backward compatibility.
        self.gst_process = None

        self.outdir = outdir
        self.timeout = person_timeout
        self.timer = 0  # time.time() of the last positive detection
        self.out_stream = None
        self.stream_time_created = 0  # time.time() when the current file was started
        self.max_file_length = max_file_length

        # One extra second worth of frames on top of the nominal length.
        num_buffer = fps * max_file_length + fps
        # Pipeline without the filesink "location=" property; that is added
        # per file in __create_writer.
        self.cmd = [
            "gst-launch-1.0", "-e",
            "nvarguscamerasrc", f"num-buffers={num_buffer}",
            "!", f"video/x-raw(memory:NVMM),width=1920,height=1080,format=NV12,framerate={fps}/1",
            "!", "nvvidconv",
            "!", "video/x-raw,format=I420",
            "!", "videoconvert",
            "!", "x264enc",
            "!", "matroskamux",
            "!", "filesink",
            "sync=false",
            "async=true",
        ]

        try:
            signal.signal(signal.SIGINT, self.__signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread; when
            # constructed elsewhere the recorder works without the handler.
            pass

    def __signal_handler(self, *args):
        """Handle SIGINT: stop the current recording cleanly, then exit."""
        self.release()
        # Same effect as the site-provided exit(), without depending on it.
        raise SystemExit

    def __create_writer(self):
        """Start a ``gst-launch-1.0`` process writing to a new timestamped file.

        Returns:
            The ``subprocess.Popen`` object, or ``None`` when the process
            could not be started.
        """
        cur_time = datetime.datetime.now().strftime("%Y-%m-%d-%Hh-%Mm-%Ss")
        out_path = os.path.join(self.outdir,
                                socket.gethostname() + "-" + cur_time + ".mkv")
        self.stream_time_created = time.time()

        # Build a per-file command instead of mutating self.cmd. No trailing
        # space after the path: each list item is passed as one argument, so
        # a trailing space would become part of the file name.
        cmd = self.cmd + [f"location={out_path}"]

        try:
            # The output is never read, so it must not go to a PIPE: a full
            # pipe buffer would block the child.
            self.gst_process = subprocess.Popen(cmd,
                                                stdout=subprocess.DEVNULL,
                                                stderr=subprocess.DEVNULL)
        except (OSError, ValueError) as e:
            print(f"gstreamer error: {e}")
            self.gst_process = None

        return self.gst_process

    def record(self, person_detected: bool) -> None:
        """Advance the recorder state; call this once per processed frame.

        Args:
            person_detected: ``True`` restarts the ``person_timeout`` countdown.
        """
        if person_detected:
            self.timer = time.time()

        if time.time() - self.timer >= self.timeout:
            # Nobody seen for `timeout` seconds: stop recording.
            self.release()
            return

        if self.out_stream is None:
            self.out_stream = self.__create_writer()
            if self.out_stream is None:
                # Start failed (already reported); retry on the next call.
                return

        file_is_due = time.time() - self.stream_time_created > self.max_file_length
        if file_is_due and self.out_stream.poll() is not None:
            # The child recorded its num-buffers frames and exited; drop the
            # handle so the next call starts a new file.
            self.release()

    def release(self) -> None:
        """Stop the current recording process, if any, and forget it."""
        proc = self.out_stream
        if proc is None:
            return

        print("release process...")
        try:
            if proc.poll() is None:
                # gst-launch-1.0 is started with -e (EOS on shutdown), which
                # acts on an interrupt; SIGINT lets it finalize the file.
                proc.send_signal(signal.SIGINT)
                try:
                    proc.wait(timeout=self._STOP_TIMEOUT)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
        except OSError as e:
            print(f"error while terminate process {e}")
        self.out_stream = None
        self.gst_process = None

# _________________EXAMPLE_________________

if __name__ == "__main__":
    # Example: grab frames with OpenCV and keep the recorder running.
    # NOTE(review): this capture and the recorder's gst-launch pipeline both
    # open nvarguscamerasrc; the "Failed to create CaptureSession" error
    # reported for this script may come from that double open - confirm.
    cap = cv2.VideoCapture("nvarguscamerasrc ! video/x-raw(memory:NVMM), width=(int)1920, height=(int)1080, format=(string)NV12, framerate=(fraction)21/1 ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! video/x-raw, format=(string)BGR ! appsink")
    recorder = VideoRecording()
    try:
        if not cap.isOpened():
            print("could not open camera capture")
        while cap.isOpened():
            ret, img = cap.read()
            if not ret:
                # Stop instead of spinning forever on a dead capture.
                break
            # Detection is not wired in here; always report a person.
            recorder.record(True)
    finally:
        # Runs on normal exit and on the SystemExit raised by the recorder's
        # SIGINT handler, so the camera and the child process are freed.
        recorder.release()
        cap.release()
1 Like

Hi,
Please try

$ gst-launch-1.0 nvarguscamerasrc ! fakesink

If it hits the same failure, there may be an issue in the sensor driver. We would suggest checking the device tree and the sensor driver.