RuntimeError: Invalid rolling window for user testuser1

Hi

Does anyone know how to fix this error when setting up digital fingerprinting?

  dtype='object')E20250702 22:36:12.590867 140170564585024 context.cpp:124] /linear_segment_0/dfp-rolling-window-2; rank: 0; size: 1; tid: 140170564585024: set_exception issued; issuing kill to current runnable. Exception msg: RuntimeError: Invalid rolling window for user testuser1

My rolling window settings

# Next, have a stage that will create rolling windows
pipe.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300,
        min_increment=300,
        max_history="10d",
        cache_dir="/workspace/cache"))

I’ve set the fallback username in the config.

config.ae.fallback_username = "generic_username"

I thought users who don’t meet the min_history threshold were excluded?

Thanks!

This error usually means the rolling window size you’re using is bigger than your data or not properly set. For example, if you’re trying to apply a rolling average or sum and your window size is 5, but your data has only 3 points, it will throw this error.

Make sure the window value is:

  • A positive integer
  • Smaller than or equal to the number of data points
  • Not None or of an invalid type

Double-check the code where you’re setting the rolling window and try adjusting the size to fit your data.

Let me know if you want help checking the exact code!

Hi,
Could you please provide the following information to help us better investigate the issue you reported:

  • Are you running the Digital Fingerprinting example from the Morpheus source code or are you using the Digital Fingerprinting container?
  • What version of Morpheus are you using?
  • The size of your dataset

Thank you

I’m using the Morpheus container, version 25.02, with a 30GB gzip file of Windows security event logs.

I’m running this on a brev VM with

NVIDIA L40S (48GiB)
1 GPUs x 8 CPUs | 147GiB
1TiB128GiB

I was trying to create a new custom SingleOutputSource input stage that reads JSON Lines data from the 30GB gzip file and outputs logs in batches of cuDF DataFrames with 10,000 rows each. However, does Digital Fingerprinting need to read data from smaller 1-4GB local files or AWS S3, meaning custom sources and the Kafka stage aren’t supported as input?

Is this because the logs must be processed in DFPFileToDataFrameStage to group logs by user per day to avoid overlapping timestamps?

DFPFileBatcherStage OR MultiFileSource → DFPFileToDataFrameStage → DFPSplitUsersStage → DFPRollingWindowStage → DFPPreprocessingStage → Training

I’m doing

CustomSourceStage → DFPSplitUsersStage → DFPRollingWindowStage → DFPPreprocessingStage → Training

Or are custom input stages supported, and I need to add a custom stage after DFPSplitUsersStage and before DFPRollingWindowStage? This stage would sort timestamps to avoid the overlapping-timestamp errors, and maybe drop users that don’t have enough data to create a rolling window.

Thanks for your help!

I’ve added a custom stage to sort the data frame by the timestamp after DFPSplitUsersStage and before DFPRollingWindowStage.

This has stopped these errors:

  dtype='object')E20250702 22:36:12.590867 140170564585024 context.cpp:124] /linear_segment_0/dfp-rolling-window-2; rank: 0; size: 1; tid: 140170564585024: set_exception issued; issuing kill to current runnable. Exception msg: RuntimeError: Invalid rolling window for user testuser1

and

RuntimeError: Overlapping rolling history detected. Rolling history can only be used with non-overlapping batches

Here’s the custom sort timestamp stage

import logging
import typing

from morpheus.messages import ControlMessage, MessageMeta
from morpheus.pipeline.single_port_stage import SinglePortStage

logger = logging.getLogger(__name__)


class DFPSortTimestampsStage(SinglePortStage):
    """Sorts the rows of each incoming batch by the timestamp column."""

    def __init__(self, config, timestamp_column: str = "@timestamp"):
        super().__init__(config)
        self.timestamp_column = timestamp_column

    @property
    def name(self) -> str:
        return "dfp-sort-timestamps"

    def supports_cpp_node(self):
        return False

    def accepted_types(self) -> typing.Tuple:
        return (ControlMessage, )

    def compute_schema(self, schema):
        schema.output_schema.set_type(ControlMessage)

    def _build_single(self, builder, input_node):

        def node_fn(msg: ControlMessage):
            meta = msg.payload()
            df = meta.df
            if self.timestamp_column not in df.columns:
                logger.warning("Timestamp column '%s' not found. Skipping sort.", self.timestamp_column)
                return msg

            sorted_df = df.sort_values(by=self.timestamp_column).reset_index(drop=True)

            # Replace the payload on the incoming message instead of building a new
            # ControlMessage, so metadata set by upstream stages stays attached.
            msg.payload(MessageMeta(sorted_df))
            return msg

        node = builder.make_node(self.unique_name, node_fn)
        builder.make_edge(input_node, node)  # Connect the upstream node to this stage's node
        return node
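The other idea raised earlier, dropping users that don’t have enough rows before the rolling window stage, could be sketched in plain pandas as below. This is a minimal sketch, not Morpheus code: the function name, the "username" column, and reading min_history as a row count are all assumptions made for the example.

```python
import pandas as pd


def sort_and_filter_users(df, user_column="username", timestamp_column="@timestamp", min_rows=300):
    """Drop users with fewer than `min_rows` rows, then sort each user's rows by time.

    Hypothetical helper; column names and the row-count threshold are assumptions.
    """
    # Number of rows per user, broadcast back onto every row of that user.
    rows_per_user = df.groupby(user_column)[user_column].transform("size")
    kept = df[rows_per_user >= min_rows]
    # A stable sort keeps the original order of rows that share a timestamp.
    return kept.sort_values([user_column, timestamp_column], kind="stable").reset_index(drop=True)


example = pd.DataFrame({
    "username": ["a", "b", "a", "a", "b"],
    "@timestamp": pd.to_datetime([
        "2025-07-02 10:00", "2025-07-02 09:00", "2025-07-02 08:00",
        "2025-07-02 09:00", "2025-07-02 08:30",
    ]),
})
filtered = sort_and_filter_users(example, min_rows=3)
```

One caveat with filtering per batch: a user who falls below the threshold in every individual batch would never accumulate any history, so a filter like this only makes sense if that data loss is acceptable.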

The only warnings I’m seeing look like they are related to Morpheus code.

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/utils/cached_user_window.py:64: SettingWithCopyWarning:

A value is trying to be set on a copy of a slice from a DataFrame.

Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

filtered_df["_row_hash"] = pd.util.hash_pandas_object(filtered_df, index=False)

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/utils/cached_user_window.py:67: SettingWithCopyWarning:

A value is trying to be set on a copy of a slice from a DataFrame.

Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

filtered_df["_batch_id"] = self.batch_count

Am I on the right track or down a rabbit hole?

Thanks!

It still crashed; it just took a bit longer.

Isn’t the rolling window stage meant to wait until the user has enough data?

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/utils/cached_user_window.py:64: SettingWithCopyWarning:

A value is trying to be set on a copy of a slice from a DataFrame.

Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

filtered_df["_row_hash"] = pd.util.hash_pandas_object(filtered_df, index=False)

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/utils/cached_user_window.py:67: SettingWithCopyWarning:

A value is trying to be set on a copy of a slice from a DataFrame.

Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

filtered_df["_batch_id"] = self.batch_count

E20250704 14:55:24.008636 140390564210240 context.cpp:124] /linear_segment_0/dfp-rolling-window-3; rank: 0; size: 1; tid: 140390564210240: set_exception issued; issuing kill to current runnable. Exception msg: RuntimeError: Invalid rolling window for user testuser1

At:

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/stages/dfp_rolling_window_stage.py(152): _build_window

/workspace/morpheus-25.02/Morpheus/python/morpheus_dfp/morpheus_dfp/stages/dfp_rolling_window_stage.py(182): on_data

E20250704 14:55:24.008949 140397254391616 runner.cpp:189] Runner::await_join - an exception was caught while awaiting on one or more contexts/instances - rethrowing

I increased the batch size from 20MB to 4GB, and so far that has stopped the “Rolling history can only be used with non-overlapping batches” errors. It increased the number of logs in each data frame from 15,000 to 500,000.
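One way to see whether batches overlap before they reach the rolling window stage is to compare their time ranges directly. The sketch below assumes "non-overlapping" means that each batch starts no earlier than the latest timestamp already seen. That reading, the function name, and the "@timestamp" column are assumptions for illustration, not something confirmed in this thread or taken from the Morpheus source. Since the rolling window works per user, the check would be run on one user's batches at a time.

```python
import pandas as pd


def find_overlapping_batches(batches, timestamp_column="@timestamp"):
    """Return the indices of batches that start before the latest timestamp seen so far.

    Hypothetical diagnostic helper; it does not reproduce Morpheus's own check.
    """
    overlapping = []
    latest_seen = None
    for i, df in enumerate(batches):
        if df.empty:
            continue  # An empty batch has no time range to compare.
        start = df[timestamp_column].min()
        end = df[timestamp_column].max()
        if latest_seen is not None and start < latest_seen:
            overlapping.append(i)
        latest_seen = end if latest_seen is None else max(latest_seen, end)
    return overlapping


def _batch(times):
    """Build a one-column DataFrame of timestamps for the example."""
    return pd.DataFrame({"@timestamp": pd.to_datetime(times)})


example_batches = [
    _batch(["2025-07-02 10:00", "2025-07-02 10:05"]),
    _batch(["2025-07-02 10:03", "2025-07-02 10:08"]),  # starts before 10:05
    _batch(["2025-07-02 10:09", "2025-07-02 10:10"]),
]
overlap_indices = find_overlapping_batches(example_batches)
```

Running a check like this on the 20MB and the 4GB batch sizes would show whether the larger batches actually removed the overlaps or only made them less frequent.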