So after hours of researching and understanding the seeking concept in GStreamer to an extent, I have finally implemented a Python script that plays a seamless loop. It's a hack solution, but it works, and I thought I'd share it:
1- Concatenated all mkv videos that I want to play into one mkv file using the following src: join - How to concatenate all "mkv" files using ffmpeg? - Stack Overflow
2- In my case I can easily know the duration of each video separately, so I mimicked that by keeping a list of all the video durations
3- Kept track of my current position in the pipeline using Gst.Pad.query_position()
4- Upon program termination (in this case a KeyboardInterrupt exception), I save the current position of the pipeline into a JSON file, which I can later read to play the video from the same position where it stopped (rounded down to the start of the clip)
5- Used SEGMENT_DONE messages and seeking to repeat a seamless loop. (This answer helped a lot: python - Seamless video loop in gstreamer - Stack Overflow)
Finally, here is my messy playground code to get the functionality working, hope it helps someone.
import os, time, json, sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
# The single concatenated input file, and the JSON file used to remember the
# playback position between runs.
VIDEO_PATH = "./video.mkv"
LOG_TIMESTAMP_PATH = "./latest_timestamp.json"

# Length of every clip inside VIDEO_PATH, in seconds and in playback order.
# Despite the name, each entry is a duration: main() adds it to START_TIME to
# obtain the position at which the clip ends.
END_TIME = [
    30.0, 30.0, 30.0,
    15.0, 15.0, 15.0, 15.0, 15.0,
    30.0, 30.0,
    15.0, 15.0, 15.0,
]
# Guard against typos in the hand-maintained table above.
assert len(END_TIME) == 13
def _make_element(factory_name, element_name):
    """Create a named GStreamer element, raising if it cannot be created.

    Args:
        factory_name: Name of the element factory (e.g. "filesrc").
        element_name: Unique name given to the new element.

    Returns:
        The newly created element.

    Raises:
        RuntimeError: If Gst.ElementFactory.make() returns None. An explicit
            raise is used instead of ``assert`` so the check still runs
            under ``python -O``.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            f"could not create GStreamer element '{factory_name}' ({element_name})"
        )
    return element


# Nothing below can work without an initialised GStreamer, so stop here
# instead of carrying on after the message.
if not Gst.init_check(None)[0]:
    print("gstreamer initialization failed")
    sys.exit(1)

source0 = _make_element("filesrc", "source0")
source0.set_property("location", VIDEO_PATH)
demux0 = _make_element("matroskademux", "demux0")

##### AUDIO ######
audio_decoder0 = _make_element("avdec_aac", "audio_decoder0")
audio_converter0 = _make_element("audioconvert", "audio_converter0")
queue_a = _make_element("queue", "queue_a")
audio_sink = _make_element("alsasink", "audio_sink")
# NOTE(review): "hw:0,3" is a machine-specific ALSA device; adjust per target.
audio_sink.set_property("device", "hw:0,3")
##################

##### VIDEO ######
video_parser0 = _make_element("h264parse", "video_parser0")
# NOTE(review): the nv* elements are presumably NVIDIA-platform plugins;
# confirm they are available on the target.
video_decoder0 = _make_element("nvv4l2decoder", "video_decoder0")
video_nvidconv0 = _make_element("nvvidconv", "video_nvidconv0")
video_nvidconv0.set_property("flip-method", 0)
queue_v = _make_element("queue", "queue_v")
video_sink = _make_element("nvoverlaysink", "video_sink")
##################
def demux0_pad_added(demux, pad):
    """Link a pad newly exposed by the demuxer to its processing branch.

    Connected to demux0's "pad-added" signal. A pad named "video_0" is
    linked to the H.264 parser and "audio_0" to the AAC decoder; pads with
    any other name are left unlinked.

    Args:
        demux: The element that emitted the signal (unused).
        pad: The source pad that was just added.
    """
    branch_heads = {"video_0": video_parser0, "audio_0": audio_decoder0}
    target = branch_heads.get(pad.name)
    if target is None:
        return
    result = pad.link(target.get_static_pad("sink"))
    # A refused link would otherwise only show up as a silent or frozen
    # stream, so report it where it happens.
    if result != Gst.PadLinkReturn.OK:
        print(f"[ERROR] Failed to link demuxer pad {pad.name}: {result}")


demux0.connect("pad-added", demux0_pad_added)
# GST COMMAND: 'gst-launch-1.0 filesrc location="./docker/premium_ads/com_1.mkv" ! matroskademux name=demux demux.audio_0 ! queue ! avdec_aac ! audioconvert ! alsasink device="hw:0,3" demux.video_0 ! queue ! h264parse ! nvv4l2decoder ! nvvidconv flip-method=0 ! nvoverlaysink'
# NOTE(review): the command above places each queue right after the demuxer,
# whereas the code below places it right before the sink.
def _link_chain(*elements):
    """Link each element to the one after it, raising if a link is refused.

    Raises:
        RuntimeError: If Element.link() returns False for any adjacent pair.
    """
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            raise RuntimeError(
                f"could not link {upstream.get_name()} -> {downstream.get_name()}"
            )


pipeline = Gst.Pipeline()
for _element in (
    source0,
    demux0,
    audio_decoder0,
    audio_converter0,
    queue_a,
    audio_sink,
    video_parser0,
    video_decoder0,
    video_nvidconv0,
    queue_v,
    video_sink,
):
    pipeline.add(_element)

_link_chain(source0, demux0)
# demux0.video_0 and demux0.audio_0 are linked dynamically from the
# "pad-added" handler.
# audio links
_link_chain(audio_decoder0, audio_converter0, queue_a, audio_sink)
# video links
_link_chain(video_parser0, video_decoder0, video_nvidconv0, queue_v, video_sink)
def get_updated_timestamp():
    """Load the playback position saved by a previous run.

    Returns:
        A ``(latest_timestamp, latest_idx)`` tuple read from
        LOG_TIMESTAMP_PATH. Falls back to ``(0, 0)`` (start of the first
        clip) when the file does not exist yet, is empty or corrupt, or
        lacks the expected keys.
    """
    try:
        with open(LOG_TIMESTAMP_PATH, "r") as f:
            data = json.load(f)
        return data["latest_timestamp"], data["latest_idx"]
    except (FileNotFoundError, json.JSONDecodeError, KeyError, TypeError):
        # The file is only written on shutdown, so it is missing on the very
        # first run; an interrupted write can also leave it unreadable.
        return 0, 0
def set_updated_timestamp(new_timestamp: float, timestamp_idx: int) -> None:
    """Persist the playback position to LOG_TIMESTAMP_PATH as JSON.

    Args:
        new_timestamp: Value stored under the "latest_timestamp" key.
        timestamp_idx: Value stored under the "latest_idx" key.
    """
    state = {
        "latest_timestamp": new_timestamp,
        "latest_idx": timestamp_idx,
    }
    # Mode "w" replaces whatever the previous run stored.
    with open(LOG_TIMESTAMP_PATH, "w") as log_file:
        json.dump(state, log_file)
# Playback bookkeeping, module-level so the shutdown handler can read it:
# START_TIME is the position (seconds) where the current clip begins and
# timestamp_idx is that clip's index into END_TIME.
START_TIME = 0
timestamp_idx = 0


def main():
    """Play the pipeline in an endless loop while tracking the current clip.

    Starts playback, seeks to the position restored by
    get_updated_timestamp(), then polls every 10 ms: the playback position
    is used to advance START_TIME / timestamp_idx at clip boundaries, and
    the bus is checked for SEGMENT_DONE (seek back to 0 and keep playing)
    and ERROR (report and return).
    """
    global timestamp_idx
    global START_TIME

    message_bus = pipeline.get_bus()
    pipeline.set_state(Gst.State.PLAYING)
    # Wait for the state change to finish before issuing the first seek.
    pipeline.get_state(Gst.CLOCK_TIME_NONE)

    START_TIME, timestamp_idx = get_updated_timestamp()
    # Saved state may come from a run with a different clip table; an index
    # outside END_TIME would break the bookkeeping below.
    if not 0 <= timestamp_idx < len(END_TIME):
        START_TIME = 0
        timestamp_idx = 0

    # The SEGMENT flag makes the pipeline post SEGMENT_DONE at the end of the
    # played range, which the loop below uses to restart playback.
    # NOTE(review): this first seek is non-flushing while the loop seek is
    # flushing; kept as found.
    pipeline.seek(
        1.0,
        Gst.Format.TIME,
        Gst.SeekFlags.SEGMENT,
        Gst.SeekType.SET,
        int(START_TIME * Gst.SECOND),  # seek positions are integer nanoseconds
        Gst.SeekType.NONE,
        0,
    )

    # The pad never changes, so look it up once instead of on every tick.
    position_pad = video_parser0.get_static_pad("sink")
    while True:
        ok, position_ns = position_pad.query_position(Gst.Format.TIME)
        if ok:
            position = position_ns * 1e-9
            # Bounds check: once the last clip's end has been passed, the
            # index equals len(END_TIME) until SEGMENT_DONE resets it.
            if (
                timestamp_idx < len(END_TIME)
                and position >= START_TIME + END_TIME[timestamp_idx]
            ):
                START_TIME += END_TIME[timestamp_idx]
                timestamp_idx += 1
                print(position, START_TIME, timestamp_idx, sep="::")
        else:
            # NOTE(review): a failed position query resets the bookkeeping to
            # the first clip; kept from the original, confirm this is wanted.
            START_TIME = 0
            timestamp_idx = 0

        if message_bus.have_pending():
            message = message_bus.pop()
            if message.type == Gst.MessageType.SEGMENT_DONE:
                # End of the file reached: rewind to the start and continue.
                pipeline.set_state(Gst.State.PAUSED)
                START_TIME = 0
                timestamp_idx = 0
                pipeline.seek(
                    1.0,
                    Gst.Format.TIME,
                    Gst.SeekFlags.FLUSH | Gst.SeekFlags.SEGMENT,
                    Gst.SeekType.SET,
                    int(START_TIME * Gst.SECOND),
                    Gst.SeekType.NONE,
                    0,
                )
                pipeline.set_state(Gst.State.PLAYING)
            elif message.type == Gst.MessageType.ERROR:
                # Print the parsed error; the message object's repr carries
                # no diagnostic detail.
                error, debug_info = message.parse_error()
                print(f"[ERROR] Bus error: {error.message} ({debug_info})")
                break
        time.sleep(0.01)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the player. Save the position
        # first so it is on disk even if teardown is slow or interrupted.
        set_updated_timestamp(START_TIME, timestamp_idx)
    finally:
        # Shut the pipeline down on every exit path, including the return
        # from main() after a bus error, not only on Ctrl-C.
        pipeline.set_state(Gst.State.PAUSED)
        pipeline.set_state(Gst.State.READY)
        pipeline.set_state(Gst.State.NULL)