Best practices for integrating Elasticsearch into Morpheus DFP as a source stage?

Language: Python
What is the best way to set up Elasticsearch as a source stage in a DFP use case?

  1. Use the elasticsearch package to export data to files, then feed them to morpheus_dfp.stages.multi_file_source.
  2. Use the elasticsearch package to export data to a pandas.DataFrame, then process it.
  3. Other

Is there any sample code I can reference?

Both are viable options but #1 leverages more of the existing example code. I have personally seen this done in a production environment where daily logs are pulled from elasticsearch and saved to files similar to our Azure and Duo example data. You could then feed them into our example DFP pipeline using the MultiFileSource stage as you mentioned. Some things to keep in mind:

  • Make sure you have source/preprocess schemas to match your data. Here’s an example.
  • File names have timestamps used to batch the source data by time period (default is day). For example: AZUREAD_2022-08-01T00_03_56.207Z.json
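For option 1, the export step can be sketched as follows. This is a hypothetical helper (not part of Morpheus), showing one way to write a batch of records pulled from Elasticsearch to a JSON-lines file whose name follows the `AZUREAD_2022-08-01T00_03_56.207Z.json` timestamp convention mentioned above; the `export_batch` name and `prefix` parameter are assumptions for illustration:

```python
import json
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical helper: write a batch of log records (e.g. the hits from a
# daily Elasticsearch query) to a timestamped JSON-lines file so that
# MultiFileSource can batch them by time period.
def export_batch(records, out_dir, prefix="AZUREAD"):
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    # The example file names use '_' in place of ':' so they stay
    # filesystem-safe, e.g. AZUREAD_2022-08-01T00_03_56.207Z.json
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H_%M_%S.%f")[:-3] + "Z"
    out_path = out_dir / f"{prefix}_{ts}.json"
    with open(out_path, "w") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")  # one JSON object per line
    return out_path
```

The resulting directory of files can then be passed to the example pipeline's MultiFileSource via a glob such as `AZUREAD_*.json`.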

I am trying to implement option 2 based on http_client_source_stage.py, but using the elasticsearch package instead.
Is there any example code for http_client_source_stage.py?

We don’t have an example of using HttpClientSourceStage in a DFP pipeline. We do have this test that demonstrates its use in a simple pipeline:
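For the option 2 approach, a minimal sketch of the DataFrame conversion step might look like the following. This assumes the standard Elasticsearch search-response shape (`{"hits": {"hits": [{"_source": {...}}, ...]}}`); the `hits_to_dataframe` helper name is an assumption for illustration, not a Morpheus API:

```python
import pandas as pd

# Hypothetical option-2 helper: flatten an Elasticsearch search response
# into a pandas DataFrame that downstream DFP stages can consume.
def hits_to_dataframe(response):
    docs = [hit["_source"] for hit in response["hits"]["hits"]]
    return pd.DataFrame(docs)

# In a real pipeline the response would come from the elasticsearch client,
# e.g. (not executed here):
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch("http://localhost:9200")
#   response = es.search(index="azure-logs", query={"match_all": {}}, size=1000)
```

From there the DataFrame could be fed into the pipeline in place of the HTTP-based source, keeping the rest of the example DFP stages unchanged.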