Stream ingest data from Kafka to Amazon Bedrock Knowledge Bases using custom connectors

By Advanced AI Bot | April 18, 2025 | 12 Mins Read

Retrieval Augmented Generation (RAG) enhances AI responses by combining the generative AI model’s capabilities with information from external data sources, rather than relying solely on the model’s built-in knowledge. In this post, we showcase the custom data connector capability in Amazon Bedrock Knowledge Bases that makes it straightforward to build RAG workflows with custom input data. Through this capability, Amazon Bedrock Knowledge Bases supports the ingestion of streaming data, which means developers can add, update, or delete data in their knowledge base through direct API calls.

Consider clickstream data, credit card swipes, Internet of Things (IoT) sensor data, log analysis, and commodity prices, where both current data and historical trends are important for making an informed decision. Previously, to feed such critical data inputs, you had to first stage them in a supported data source and then either initiate or schedule a data sync job. Based on the quality and quantity of the data, the time to complete this process varied. With custom data connectors, you can quickly ingest specific documents from custom data sources without requiring a full sync and ingest streaming data without the need for intermediary storage. By avoiding time-consuming full syncs and storage steps, you gain faster access to data, reduced latency, and improved application performance.

With streaming ingestion through custom connectors, Amazon Bedrock Knowledge Bases processes streaming data without an intermediary data source, making it available almost immediately. This feature chunks and converts input data into embeddings using your chosen Amazon Bedrock model and stores everything in the backend vector database. This automation applies to both newly created and existing databases, streamlining your workflow so you can focus on building AI applications without worrying about orchestrating data chunking, embeddings generation, or vector store provisioning and indexing. Additionally, this feature provides the ability to ingest specific documents from custom data sources, all while reducing latency and operational costs for intermediary storage.

Amazon Bedrock

Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as Anthropic, Cohere, Meta, Stability AI, and Amazon through a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can experiment with and evaluate top FMs for your use case, privately customize them with your data using techniques such as fine-tuning and RAG, and build agents that execute tasks using your enterprise systems and data sources.

Amazon Bedrock Knowledge Bases

Amazon Bedrock Knowledge Bases allows organizations to build fully managed RAG pipelines by augmenting contextual information from private data sources to deliver more relevant, accurate, and customized responses. With Amazon Bedrock Knowledge Bases, you can build applications that are enriched by the context received from querying a knowledge base. It enables a faster time to product release by abstracting away the heavy lifting of building pipelines and providing an out-of-the-box RAG solution, reducing the build time for your application.

Amazon Bedrock Knowledge Bases custom connector

Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, which means you can add, update, or delete data in your knowledge base through direct API calls.

Solution overview: Build a generative AI stock price analyzer with RAG

For this post, we implement a RAG architecture with Amazon Bedrock Knowledge Bases using a custom connector and topics built with Amazon Managed Streaming for Apache Kafka (Amazon MSK), for a user who wants to understand stock price trends. Amazon MSK is a streaming data service that manages Apache Kafka infrastructure and operations, making it straightforward to run Apache Kafka applications on Amazon Web Services (AWS). The solution enables real-time analysis of streaming stock price data through vector embeddings and large language models (LLMs).

The following architecture diagram has two components:

Preprocessing streaming data workflow, noted in letters at the top of the diagram:

A. To mimic streaming input, upload a .csv file with stock price data into the MSK topic.
B. The upload automatically triggers the consumer AWS Lambda function.
C. The function ingests the consumed data into the knowledge base.
D. The knowledge base internally transforms the data into a vector index using the embeddings model.
E. The knowledge base internally stores the vector index in the vector database.

Runtime execution during user queries, noted in numerals at the bottom of the diagram:

1. The user queries stock prices.
2. The foundation model uses the knowledge base to search for an answer.
3. The knowledge base returns the relevant documents.
4. The user receives an answer based on those documents.

[Figure: Solution overview architecture diagram]

Implementation design

The implementation follows these high-level steps:

1. Data source setup – Configure an MSK topic that streams input stock prices.
2. Amazon Bedrock Knowledge Bases setup – Create a knowledge base in Amazon Bedrock using the quick create a new vector store option, which automatically provisions and sets up the vector store.
3. Data consumption and ingestion – When data lands in the MSK topic, trigger a Lambda function that extracts stock indices, prices, and timestamp information and feeds it into the custom connector for Amazon Bedrock Knowledge Bases.
4. Test the knowledge base – Evaluate stock price analysis using the knowledge base.

Solution walkthrough

To build a generative AI stock analysis tool with the Amazon Bedrock Knowledge Bases custom connector, use the instructions in the following sections.

Configure the architecture

To try this architecture, deploy the AWS CloudFormation template from this GitHub repository in your AWS account. This template deploys the following components:

Functional virtual private clouds (VPCs), subnets, security groups and AWS Identity and Access Management (IAM) roles
An MSK cluster hosting Apache Kafka input topic
A Lambda function to consume Apache Kafka topic data
An Amazon SageMaker Studio notebook for granular setup and enablement
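
If you prefer to deploy the stack from code rather than the CloudFormation console, the following is a minimal sketch using boto3. The template file name and stack name here are hypothetical placeholders; use the template path from the GitHub repository.

# Minimal sketch: deploy the CloudFormation template with boto3.
# 'bedrock-stream-ingest.yaml' is a hypothetical placeholder file name.
import boto3

cfn = boto3.client('cloudformation')

with open('bedrock-stream-ingest.yaml') as f:
    template_body = f.read()

cfn.create_stack(
    StackName='bedrock-stream-ingest',  # assumed stack name
    TemplateBody=template_body,
    Capabilities=['CAPABILITY_NAMED_IAM']  # the template creates IAM roles
)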

Create an Apache Kafka topic

In the precreated MSK cluster, the required brokers are deployed and ready for use. The next step is to use a SageMaker Studio terminal instance to connect to the MSK cluster and create the test stream topic. In this step, follow the detailed instructions in Create a topic in the Amazon MSK cluster. The following are the general steps involved:

Download and install the latest Apache Kafka client
Connect to the MSK cluster broker instance
Create the test stream topic on the broker instance
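
As an alternative to the Apache Kafka CLI steps above, the topic can also be created programmatically. The following is a minimal sketch, not part of the original instructions, using the kafka-python AdminClient; the bootstrap broker string and security settings are assumptions that depend on how your MSK cluster is configured.

# Minimal sketch: create the test stream topic with kafka-python's AdminClient.
# The broker string below is a placeholder; adjust the security settings to match
# your MSK cluster (for example, TLS on port 9094 or IAM authentication).
from kafka.admin import KafkaAdminClient, NewTopic

bootstrap_brokers = 'b-1.mycluster.xxxxxx.kafka.us-east-1.amazonaws.com:9094'  # placeholder

admin = KafkaAdminClient(
    bootstrap_servers=bootstrap_brokers,
    security_protocol='SSL'
)
admin.create_topics(
    new_topics=[NewTopic(name='streamtopic', num_partitions=1, replication_factor=2)]
)
admin.close()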

Create a knowledge base in Amazon Bedrock

To create a knowledge base in Amazon Bedrock, follow these steps:

On the Amazon Bedrock console, in the left navigation pane under Builder tools, choose Knowledge Bases.

[Screenshot: Amazon Bedrock Knowledge Bases console]

To initiate knowledge base creation, on the Create dropdown menu, choose Knowledge Base with vector store, as shown in the following screenshot.

[Screenshot: Create knowledge base menu]

In the Provide Knowledge Base details pane, enter BedrockStreamIngestKnowledgeBase as the Knowledge Base name.
Under IAM permissions, choose the default option, Create and use a new service role, and (optional) provide a Service role name, as shown in the following screenshot.

[Screenshot: Knowledge Base details pane]

On the Choose data source pane, select Custom as the data source where your dataset is stored.
Choose Next, as shown in the following screenshot.

[Screenshot: Choose data source pane]

On the Configure data source pane, enter BedrockStreamIngestKBCustomDS as the Data source name.
Under Parsing strategy, select Amazon Bedrock default parser and for Chunking strategy, choose Default chunking. Choose Next, as shown in the following screenshot.

[Screenshot: Configure data source pane with parsing and chunking strategy]

On the Select embeddings model and configure vector store pane, for Embeddings model, choose Titan Text Embeddings v2. For Embeddings type, choose Floating-point vector embeddings. For Vector dimensions, select 1024, as shown in the following screenshot. Make sure you have requested and received access to the chosen FM in Amazon Bedrock. To learn more, refer to Add or remove access to Amazon Bedrock foundation models.

[Screenshot: Select embeddings model and configure vector store pane]

On the Vector database pane, select Quick create a new vector store and choose the new Amazon OpenSearch Serverless option as the vector store.

[Screenshot: Vector database pane]

On the next screen, review your selections. To finalize the setup, choose Create.
Within a few minutes, the console will display your newly created knowledge base.
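
The knowledge base ID and the custom data source ID created here are needed in the next section. If you would rather look them up programmatically than copy them from the console, the following is a minimal sketch using the boto3 bedrock-agent client; the names match those entered in the console steps above.

# Minimal sketch: look up the knowledge base ID and custom data source ID by name.
import boto3

bedrock_agent = boto3.client('bedrock-agent')

kb_id = next(
    kb['knowledgeBaseId']
    for kb in bedrock_agent.list_knowledge_bases()['knowledgeBaseSummaries']
    if kb['name'] == 'BedrockStreamIngestKnowledgeBase'
)
ds_id = next(
    ds['dataSourceId']
    for ds in bedrock_agent.list_data_sources(knowledgeBaseId=kb_id)['dataSourceSummaries']
    if ds['name'] == 'BedrockStreamIngestKBCustomDS'
)
print(kb_id, ds_id)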

Configure AWS Lambda Apache Kafka consumer

Now, using API calls, you configure the consumer Lambda function so it gets triggered as soon as the input Apache Kafka topic receives data.

Configure the manually created Amazon Bedrock Knowledge Base ID and its custom Data Source ID as environment variables within the Lambda function. When you use the sample notebook, the referred function names and IDs will be filled in automatically.

import boto3

lambda_client = boto3.client('lambda')

response = lambda_client.update_function_configuration(
    FunctionName=consumer_function_name,  # filled in automatically by the sample notebook
    Environment={
        'Variables': {
            'KBID': kb_id,   # knowledge base ID
            'DSID': ds_id    # custom data source ID
        }
    }
)

When that's complete, you tie the Lambda consumer function to the source Apache Kafka topic so it listens for events:

response = lambda_client.create_event_source_mapping(
    EventSourceArn=msk_cluster_arn,       # ARN of the MSK cluster (placeholder)
    FunctionName=consumer_function_name,  # filled in automatically by the sample notebook
    StartingPosition='LATEST',
    Enabled=True,
    Topics=['streamtopic']
)

Review AWS Lambda Apache Kafka consumer

The Apache Kafka consumer Lambda function reads data from the Apache Kafka topic, decodes it, extracts stock price information, and ingests it into the Amazon Bedrock knowledge base using the custom connector.

Extract the knowledge base ID and the data source ID:

import os

kb_id = os.environ['KBID']
ds_id = os.environ['DSID']

Define a Python function to decode input events:

import base64
import json

def decode_payload(event_data):
    agg_data_bytes = base64.b64decode(event_data)
    decoded_data = agg_data_bytes.decode(encoding='utf-8')
    event_payload = json.loads(decoded_data)
    return event_payload

Decode and parse the required fields from the input event received from the Apache Kafka topic, and use them to create a payload to be ingested into the knowledge base:

import uuid
from datetime import datetime

records = event['records']['streamtopic-0']
for rec in records:
    # Each record has a separate eventID, offset, and so on.
    event_payload = decode_payload(rec['value'])
    ticker = event_payload['ticker']
    price = event_payload['price']
    timestamp = event_payload['timestamp']
    myuuid = uuid.uuid4()
    payload_ts = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    payload_string = "At " + payload_ts + " the price of " + ticker + " is " + str(price) + "."

Ingest the payload into Amazon Bedrock Knowledge Bases using the custom connector:

response = bedrock_agent_client.ingest_knowledge_base_documents(
    knowledgeBaseId=kb_id,
    dataSourceId=ds_id,
    documents=[
        {
            'content': {
                'custom': {
                    'customDocumentIdentifier': {
                        'id': str(myuuid)
                    },
                    'inlineContent': {
                        'textContent': {
                            'data': payload_string
                        },
                        'type': 'TEXT'
                    },
                    'sourceType': 'IN_LINE'
                },
                'dataSourceType': 'CUSTOM'
            }
        }
    ]
)

Testing

Now that the required setup is done, you trigger the workflow by ingesting test data into your Apache Kafka topic hosted on the MSK cluster. For best results, repeat this section, changing the .csv input file to show stock price increases or decreases.

Prepare the test data. In my case, I had the following data input as a .csv file with a header.

ticker    price
OOOO      $44.50
ZVZZT     $3,413.23
ZNTRX     $22.34
ZNRXX     $208.76
NTEST     $0.45
ZBZX      $36.23
ZEXIT     $942.34
ZIEXT     $870.23
ZTEST     $23.75
ZVV       $2,802.86
ZXIET     $63.00
ZAZZT     $18.86
ZBZZT     $998.26
ZCZZT     $72.34
ZVZZC     $90.32
ZWZZT     $698.24
ZXZZT     $932.32
Define a Python function to put data to the topic. Use the pykafka client to produce the data:

from pykafka import KafkaClient
import json

def put_to_topic(kafka_host, topic_name, ticker, amount, timestamp):
    client = KafkaClient(hosts=kafka_host)
    topic = client.topics[topic_name]
    payload = {
        'ticker': ticker,
        'price': amount,
        'timestamp': timestamp
    }
    ret_status = True
    data = json.dumps(payload)
    encoded_message = data.encode('utf-8')
    print(f'Sending ticker data: {ticker}...')
    with topic.get_sync_producer() as producer:
        result = producer.produce(encoded_message)
    return ret_status

Read the .csv file and push the records to the topic:

import time
from datetime import datetime

import pandas as pd

# BootstrapBrokerString and KafkaTopic are defined earlier in the sample notebook
df = pd.read_csv('TestData.csv')
start_test_time = time.time()
print(datetime.utcfromtimestamp(start_test_time).strftime('%Y-%m-%d %H:%M:%S'))
df = df.reset_index()
for index, row in df.iterrows():
    put_to_topic(BootstrapBrokerString, KafkaTopic, row['ticker'], row['price'], time.time())
end_test_time = time.time()
print(datetime.utcfromtimestamp(end_test_time).strftime('%Y-%m-%d %H:%M:%S'))

Verification

If the data ingestion and subsequent processing are successful, navigate to the Amazon Bedrock Knowledge Bases data source page to check the uploaded information.

[Screenshot: Data source page showing the uploaded documents]
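
You can also verify the ingestion programmatically. The following is a minimal sketch, not part of the original walkthrough, that calls the bedrock-agent list_knowledge_base_documents API with the kb_id and ds_id values used earlier; the exact response fields may vary, so the raw response is printed.

# Minimal sketch: list the documents ingested into the custom data source.
# Assumes kb_id and ds_id are set as in the earlier sections.
import boto3

bedrock_agent = boto3.client('bedrock-agent')

resp = bedrock_agent.list_knowledge_base_documents(
    knowledgeBaseId=kb_id,
    dataSourceId=ds_id
)
print(resp)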

Querying the knowledge base

Within the Amazon Bedrock Knowledge Bases console, you can query the ingested data immediately, as shown in the following screenshot.

[Screenshot: Knowledge base test pane]

To do that, select an Amazon Bedrock FM that you have access to. In my case, I chose Amazon Nova Lite 1.0, as shown in the following screenshot.

[Screenshot: Choosing the foundation model for the query]

When that's done, the question "How is ZVZZT trending?" yields results based on the ingested data. Note how Amazon Bedrock Knowledge Bases shows how it derived the answer, even pointing to the granular data element from its source.

[Screenshot: Query results in the Amazon Bedrock Knowledge Bases console]
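
The same query can be issued programmatically. The following is a minimal sketch, not part of the original walkthrough, using the bedrock-agent-runtime RetrieveAndGenerate API; the model ARN is a placeholder and should point to the FM you chose above (Nova models may require an inference profile ARN instead).

# Minimal sketch: query the knowledge base with RetrieveAndGenerate.
# model_arn is a placeholder; replace it with a model or inference profile you have access to.
import boto3

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

model_arn = 'arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0'  # placeholder

response = bedrock_agent_runtime.retrieve_and_generate(
    input={'text': 'How is ZVZZT trending?'},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': kb_id,
            'modelArn': model_arn
        }
    }
)
print(response['output']['text'])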

Cleanup

To make sure you're not paying for unused resources, delete and clean up the resources that were created.

Delete the Amazon Bedrock knowledge base.
Delete the automatically created Amazon OpenSearch Serverless cluster.
Delete the automatically created Amazon Elastic File System (Amazon EFS) shares backing the SageMaker Studio environment.
Delete the automatically created security groups associated with the Amazon EFS share. You might need to remove the inbound and outbound rules before they can be deleted.
Delete the automatically created elastic network interfaces attached to the Amazon MSK security group for Lambda traffic.
Delete the automatically created Amazon Bedrock Knowledge Bases execution IAM role.
Stop the kernel instances with Amazon SageMaker Studio.
Delete the CloudFormation stack.
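
The knowledge base and its custom data source from the first item can also be removed programmatically. The following is a minimal sketch, assuming the kb_id and ds_id values from earlier; the OpenSearch Serverless collection created by the quick-create flow still needs to be deleted separately.

# Minimal sketch: delete the custom data source, then the knowledge base.
import boto3

bedrock_agent = boto3.client('bedrock-agent')

bedrock_agent.delete_data_source(knowledgeBaseId=kb_id, dataSourceId=ds_id)
bedrock_agent.delete_knowledge_base(knowledgeBaseId=kb_id)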

Conclusion

In this post, we showed you how Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, through which developers can add, update, or delete data in their knowledge base through direct API calls. Amazon Bedrock Knowledge Bases offers fully managed, end-to-end RAG workflows to create highly accurate, low-latency, secure, and custom generative AI applications by incorporating contextual information from your company’s data sources. With this capability, you can quickly ingest specific documents from custom data sources without requiring a full sync, and ingest streaming data without the need for intermediary storage.

Send feedback to AWS re:Post for Amazon Bedrock or through your usual AWS contacts, and engage with the generative AI builder community at community.aws.

About the Author

Prabhakar Chandrasekaran is a Senior Technical Account Manager with AWS Enterprise Support. Prabhakar enjoys helping customers build cutting-edge AI/ML solutions on the cloud. He also works with enterprise customers providing proactive guidance and operational assistance, helping them improve the value of their solutions when using AWS. Prabhakar holds eight AWS and seven other professional certifications. With over 22 years of professional experience, Prabhakar was a data engineer and a program leader in the financial services space prior to joining AWS.


