DLI Building RAG Agents with LLMs - Unable to Configure FastAPI

Hopefully someone out there knows how to get the API configured and running with Gradio. No matter what I do, I cannot get the server to load my code base, and I cannot connect to it from Gradio. This is how I am currently trying to load and connect to the server:

%%writefile clean_server.py
import threading
import asyncio
import uvicorn
import nest_asyncio
from fastapi import FastAPI
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from langserve import add_routes
from langchain_community.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain_core.runnables import RunnableLambda
from operator import itemgetter
from pydantic import BaseModel, model_validator
import pprint

# Allow asyncio to run multiple times in Jupyter notebook
nest_asyncio.apply()

# Load models
instruct_llm = ChatNVIDIA(model="mistralai/mixtral-8x22b-instruct-v0.1")
embedder = NVIDIAEmbeddings(model="nvidia/nv-embed-v1", truncate="END")

# Load FAISS vectorstore
vectorstore = FAISS.load_local(
    "docstore_index",
    embeddings=embedder,
    index_name="index",
    allow_dangerous_deserialization=True
)

retriever = vectorstore.as_retriever()

# Test vectorstore
try:
    test_docs = retriever.invoke("test")
    print(f"✅ Vectorstore loaded with {len(test_docs)} test docs")
except Exception as e:
    print("❌ Vectorstore load failed:", str(e))

# Create FastAPI app
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="Final LangServe RAG server for evaluation"
)

from fastapi.middleware.cors import CORSMiddleware

# Add CORS middleware to allow communication with different origins (Gradio frontend)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all domains for testing; modify this for production
    allow_credentials=True,
    allow_methods=["*"],  # Allow all HTTP methods (POST, GET, etc.)
    allow_headers=["*"],  # Allow all headers
)


# Input coercion helper
def clean_input(raw):
    if isinstance(raw, str):
        return raw
    elif isinstance(raw, dict):
        return raw.get("input") or raw.get("question") or str(raw)
    return str(raw)

# Optional: Model for /retriever route
class RetrieverInput(BaseModel):
    input: str

    @model_validator(mode='before')
    @classmethod
    def coerce_input(cls, values):
        if isinstance(values, str):
            return {"input": values}
        if isinstance(values, dict) and "input" in values:
            return values
        raise ValueError("Input must be a string or a dict with an 'input' field.")

# Wrap input for retriever route
def get_query_input(x):
    try:
        return RetrieverInput.model_validate(x).input
    except Exception:
        return clean_input(x)

retriever_chain = RunnableLambda(lambda x: retriever.invoke(get_query_input(x)))

# Define the retriever route
@app.post("/retriever")
async def retriever_route(payload: dict):
    input_query = clean_input(payload)
    result = retriever.invoke(input_query)
    return {"result": result}

# RAG Chain
rag_chain = RetrievalQA.from_chain_type(
    llm=instruct_llm,
    retriever=retriever,
    return_source_documents=True
)

# Wrap RAG with clean input and logging
def debug_rag_input(x):
    print("🔍 Incoming input to /generator route:")
    print("🔎 Type:", type(x))
    pprint.pprint(x)
    question = clean_input(x)
    print("🧪 Cleaned question:", question)
    try:
        result = rag_chain.invoke(question)
        print("✅ RAG chain completed.")
        return result
    except Exception as e:
        print("❌ RAG chain error:", str(e))
        return {"error": str(e)}

generator_chain = RunnableLambda(debug_rag_input)

# Define the generator route
@app.post("/generator")
async def generator_route(payload: dict):
    question = clean_input(payload)
    result = rag_chain.invoke(question)
    return {"result": result}

# Health check route
@app.get("/health")
def health_check():
    return {"status": "ok"}

# Echo route for debugging payloads
add_routes(app, RunnableLambda(lambda x: {"echo": x}), path="/echo")

# Start the server in the background thread
def start_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # Ensure correct event loop is used
    print("🚀 Starting FastAPI server on port 8090...")  # Added log for debugging
    uvicorn.run(app, host="0.0.0.0", port=8090, log_level="info", loop="asyncio")


# Start the server in a separate thread to avoid blocking Jupyter
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

When I simply try to check the server, it does not respond:

!curl http://localhost:8090/health

I have also tried killing the process on the port, confirmed it was killed, and tried to reload. Nothing I do gets the server to load. When I run the code locally in the notebook, I do not have this issue connecting.
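
For completeness, these are the notebook commands I have been using to confirm the port is actually free before restarting (plain lsof/kill; the only assumption is that lsof is available in the container):

!lsof -i :8090                  # list anything still bound to the port
!kill -9 $(lsof -t -i :8090)    # kill it by PID (harmless if nothing is listening)
!lsof -i :8090                  # should print nothing once the port is free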

Any help would be deeply appreciated.

Hey @klingl

That's an awesome attempt, actually! The course doesn't expect you to do all of this: rather, it expects you to use the already-existing server running on :8090 and just launch your endpoints from Notebook 9 using LangServe (i.e. this syntax: langserve/examples/local_llm/server.py at main · langchain-ai/langserve). The frontend already running on 0.0.0.0:8090 (where 0.0.0.0 is just relative to the host URL here) simply expects the routes to be defined on 0.0.0.0:9012. If you wanted to keep your solution format as an exercise, you'd just need to launch on port 9012 instead of port 8090 and see if it works (it shouldn't be too far off, but I have a feeling your endpoint won't support streaming very well…).

In other words, the already-running frontend is making calls like this, which your server needs to fulfill:

from langserve import RemoteRunnable
from langchain_core.output_parsers import StrOutputParser

llm = RemoteRunnable("http://0.0.0.0:9012/basic_chat/") | StrOutputParser()
for token in llm.stream("Hello World! How is it going?"):
    print(token, end='')

You can see the running server's implementation in the frontend directory, but you shouldn't try to override it.
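
For reference, the minimal shape the frontend is expecting looks roughly like this (just a sketch, not the official solution; the model name is a placeholder and any ChatNVIDIA model will do):

# minimal_server.py -- sketch of the LangServe pattern referenced above
from fastapi import FastAPI
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langserve import add_routes
import uvicorn

app = FastAPI(title="LangChain Server")

# The frontend on :8090 calls RemoteRunnable("http://0.0.0.0:9012/basic_chat/"),
# so a runnable has to be served under that path on port 9012.
add_routes(app, ChatNVIDIA(model="meta/llama3-8b-instruct"), path="/basic_chat")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=9012)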

Hello @vkudlay,

I have also tried running Notebook 9 with only code cells 1 and 3 in their original form, and also on a fresh instance with no existing ports in use. Even this does not set up and connect the frontend. I have updated my code base to use 9012 as you suggested and still have issues. I cannot figure out where the problem is coming from.

I appreciate your efforts to help. I just need the server configured.

I just tested it out and it looks like it all works. When you get the chance, can you try these steps from a clean start:

  1. Run cells 1 and 2 in Notebook 9. Cell 1 defines the endpoint server that the frontend relies on; /basic_chat is already implemented and should enable the Basic route in the frontend. Cell 2 (!python server_app.py) kick-starts the server in the background of the notebook.
  2. Run the second-to-last cells in Notebook 3. This tests out the endpoint running from Notebook 9. If you get a stream there, you should get a response in the frontend.

This seemed to work for me, at least, and the solution just involves tweaking the server in NB9. Please let me know if it doesn’t work for you.
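
If the frontend still doesn't respond after that, a quick sanity check is to hit the LangServe route directly from any notebook cell (this assumes the default /invoke sub-route that add_routes creates and that the server from cell 2 is still running):

import requests

resp = requests.post(
    "http://0.0.0.0:9012/basic_chat/invoke",
    json={"input": "Hello World! How is it going?"},
)
print(resp.status_code)   # 200 means the route is up and the payload shape is accepted
print(resp.json())        # LangServe wraps the model's reply under the "output" key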


Hi @vkudlay ,

I have rewritten the code to be simpler, as suggested. Gradio is now running but is throwing a 422 error which I am unable to fix. I do not understand what is going on here. Any help would be appreciated.

%%writefile server_app.py
# https://python.langchain.com/docs/langserve#server
from fastapi import FastAPI
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from langserve import add_routes

## May be useful later
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.runnables import RunnableLambda, RunnableBranch, RunnablePassthrough
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_community.document_transformers import LongContextReorder
from functools import partial
from operator import itemgetter

from langchain_community.vectorstores import FAISS

## TODO: Make sure to pick your LLM and do your prompt engineering as necessary for the final assessment
embedder = NVIDIAEmbeddings(model="nvidia/nv-embed-v1", truncate="END")
instruct_llm = ChatNVIDIA(model="meta/llama3-8b-instruct")

# Load FAISS vectorstore
vectorstore = FAISS.load_local(
    "docstore_index",
    embeddings=embedder,
    index_name="index",
    allow_dangerous_deserialization=True
)
docs = list(vectorstore.docstore._dict.values())


def docs2str(docs, title="Document"):
    """Useful utility for making chunks into context string. Optional, but useful"""
    out_str = ""
    for doc in docs:
        doc_name = getattr(doc, 'metadata', {}).get('Title', title)
        if doc_name: out_str += f"[Quote from {doc_name}] "
        out_str += getattr(doc, 'page_content', str(doc)) + "\n"
    return out_str

chat_prompt = ChatPromptTemplate.from_template(
    "You are a document chatbot. Help the user as they ask questions about documents."
    " User messaged just asked you a question: {input}\n\n"
    " The following information may be useful for your response: "
    " Document Retrieval:\n{context}\n\n"
    " (Answer only from retrieval. Only cite sources that are used. Make your response conversational)"
    "\n\nUser Question: {input}"
)

def output_puller(inputs):
    """"Output generator. Useful if your chain returns a dictionary with key 'output'"""
    if isinstance(inputs, dict):
        inputs = [inputs]
    for token in inputs:
        if token.get('output'):
            yield token.get('output')



long_reorder = RunnableLambda(LongContextReorder().transform_documents)  ## GIVEN
context_getter = itemgetter('input') | vectorstore.as_retriever() | long_reorder | docs2str
retrieval_chain = {'input' : (lambda x: x)} | RunnableAssign({'context' : context_getter})

## Chain Specs: retrieval_chain -> generator_chain
##   -> {"output" : <str>, ...} -> output_puller
# generator_chain = RunnableLambda(lambda x: x)  ## TODO
generator_chain = chat_prompt | instruct_llm | StrOutputParser()
#generator_chain = {"output" : generator_chain } | RunnableLambda(output_puller)  ## GIVEN


## END TODO
#####################################################################

rag_chain = retrieval_chain | generator_chain


app = FastAPI(
  title="LangChain Server",
  version="1.0",
  description="A simple api server using Langchain's Runnable interfaces",
)

## PRE-ASSESSMENT: Run as-is and see the basic chain in action

add_routes(
    app,
    instruct_llm,
    path="/basic_chat",
)

## ASSESSMENT TODO: Implement these components as appropriate

add_routes(
    app,
    generator_chain,
    path="/generator",
)

add_routes(
    app,
    retrieval_chain,
    path="/retriever",
)

## Might be encountered if this were for a standalone python file...
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9012)
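
For what it's worth, this is how I have been poking at the route outside of Gradio to narrow down the 422 (the payload shape here is my guess, which may itself be the problem):

import requests

# Ask the route what input it expects, then try an invoke; FastAPI's 422 body
# lists exactly which fields failed validation.
print(requests.get("http://0.0.0.0:9012/generator/input_schema").json())

resp = requests.post(
    "http://0.0.0.0:9012/generator/invoke",
    json={"input": {"input": "What is this document about?", "context": ""}},
)
print(resp.status_code)
if resp.status_code == 422:
    print(resp.json())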