@pmackinnon
I re-ran the pass_thru.py example after updating my shell scripts to use the 22.11 runtime.
For context, we are students learning to use Morpheus for a senior design project. Since we are on the school’s HPC cluster, we do not have Docker-level access and are using Singularity instead. We have been translating the Docker commands from GitHub into Singularity equivalents, which I figured was worth noting. Admittedly, I am not well versed in the differences between the two.
A quick overview of my workflow:
I created a shell script, passthru_runner.sh, that exports MORPHEUS_ROOT and runs the run_passthru.py file shown further below:
#!/bin/bash
#SBATCH --gpus=1
export MORPHEUS_ROOT="/data/sdp/cybersecurity_ai/sif/morpheus-22.11-runtime.sif"
. /opt/conda/etc/profile.d/conda.sh
python /data/sdp/cybersecurity_ai/files/pass_thru/run_passthru.py
Below is the run_passthru.py file.
import logging
import os

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
    #print("DEBUG: " + MonitorStage.__path__)
    # Enable the Morpheus logger
    configure_logging(log_level=logging.DEBUG)

    root_dir = os.environ['MORPHEUS_ROOT']
    # input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines')
    input_file = "/data/sdp/cybersecurity_ai/files/pass_thru/email_with_addresses.jsonlines"

    config = Config()

    # Create a linear pipeline object
    pipeline = LinearPipeline(config)

    # Set source stage
    pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

    # Add our own stage
    pipeline.add_stage(PassThruStage(config))

    # Add monitor to record the performance of our new stage
    pipeline.add_stage(MonitorStage(config))

    # Run the pipeline (This is where it fails based on the traceback)
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
Lastly, my pass_thru.py file:
import typing

import srf

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
    """
    A Simple Pass Through Stage
    """

    @property
    def name(self) -> str:
        return "pass-thru"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    def on_data(self, message: typing.Any):
        # Return the message for the next stage
        return message

    def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, self.on_data)
        builder.make_edge(input_stream[0], node)

        return node, input_stream[1]
I make sure a GPU is allocated for me to run on, and then run conda activate morpheus to ensure we are using the container. From there, I run the passthru_runner.sh script. When I run this script, I get this output:
====Registering Pipeline====
Error occurred during Pipeline.build(). Exiting.
Traceback (most recent call last):
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 277, in build_and_start
self.build()
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 175, in build
self._srf_executor = srf.Executor(self._exec_options)
RuntimeError: intersection between user_cpuset and topo_cpuset is null
Traceback (most recent call last):
File "/data/sdp/cybersecurity_ai/files/pass_thru/run_passthru.py", line 40, in <module>
Exception occurred in pipeline. Rethrowing
Traceback (most recent call last):
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 251, in join
await self._srf_executor.join_async()
AttributeError: 'NoneType' object has no attribute 'join_async'
====Pipeline Complete====
run_pipeline()
File "/data/sdp/cybersecurity_ai/files/pass_thru/run_passthru.py", line 37, in run_pipeline
pipeline.run()
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 517, in run
asyncio.run(self._do_run())
File "/opt/conda/envs/morpheus/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/opt/conda/envs/morpheus/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 495, in _do_run
await self.join()
File "/opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py", line 251, in join
await self._srf_executor.join_async()
AttributeError: 'NoneType' object has no attribute 'join_async'
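Since the RuntimeError in the traceback above mentions cpusets, here is a small diagnostic sketch that could be run in the same job and environment to show which CPUs the process is actually allowed to use. This is only my guess at a useful check, not something taken from the Morpheus or SRF documentation: I am assuming the error is related to the CPU affinity the scheduler gives the job. The function name describe_cpu_affinity is made up for illustration, the SLURM_* variables are only set when running under Slurm, and I am assuming SINGULARITY_CONTAINER is set inside the container.

```python
import os


def describe_cpu_affinity():
    """Summarise which CPUs this process may run on (Linux only)."""
    # CPUs the kernel scheduler allows this process to use
    allowed = sorted(os.sched_getaffinity(0))
    return {
        # Logical CPUs the OS reports for the whole node
        "cpu_count": os.cpu_count(),
        "allowed_cpus": allowed,
        # Set by Slurm inside a job; None when not running under Slurm
        "slurm_cpus_on_node": os.environ.get("SLURM_CPUS_ON_NODE"),
        "slurm_job_cpus_per_node": os.environ.get("SLURM_JOB_CPUS_PER_NODE"),
        # Assumption: Singularity exports this variable inside the container
        "in_singularity": "SINGULARITY_CONTAINER" in os.environ,
    }


if __name__ == "__main__":
    for key, value in describe_cpu_affinity().items():
        print(f"{key}: {value}")
```

If the allowed CPU list turns out to be much smaller than the node's CPU count, that might be relevant to the cpuset message, but I have not confirmed this.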
I am also attaching an image of this traceback, since my terminal color-codes it and that does not carry over to text here.
Any help is appreciated, as getting this basic Python stage working would be pivotal in allowing us to start creating our own custom modules. If you need any more information, let me know and I’ll get it to you ASAP. Thank you for your help.