This is the third post in a series I’m writing about a new Minimum Viable Product we’ve released at Voxgig that turns your podcast into a chatbot. Your visitors can now ask your guests questions directly!
The first post is here: Building a Podcast Chatbot for Voxgig, and you’ll find the full list at the end of this post.
We want to ingest a podcast. The podcast episodes are described in an RSS feed that also contains the metadata about the podcast. We send a message to the microservices system describing what we want to happen:
{
  aim: 'ingest',        // The "aim" of the message is the `ingest` service.
  subscribe: 'podcast', // The instruction to the `ingest` service.
  feed: 'https://feeds.resonaterecordings.com/voxgig-fireside-podcast'
}
The system routes this message to our implementation code (we’ll come back to how that happens later in this series). Since this is a TypeScript code base, our implementation is a TypeScript function:
async function subscribe_podcast(this: any, msg: any, meta: any) {
  let out: any = { ok: false }
  out.why = 'no-code-yet'
  return out
}
As a convention, our microservices accept messages that are JSON documents and also respond with messages that are JSON documents. There may not be a response (if the message is an “event”), but if there is, we use the ok: boolean property to indicate the success or failure of the message, and the why: string property to provide an error reason to the sender.
Why use this convention? You can’t throw exceptions over the network (easily or naturally). But you also don’t want to use exceptions for business logic failures: the system itself hasn’t failed, only a business process has.
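Here is a sketch of what this looks like from the caller’s side (assuming Seneca’s promise-based post; the message is the one from above, and this snippet is illustrative rather than lifted from the codebase):

const res = await seneca.post('aim:ingest,subscribe:podcast', {
  feed: 'https://feeds.resonaterecordings.com/voxgig-fireside-podcast',
})

if (!res.ok) {
  // A business-logic failure arrives as plain data, e.g. why: 'podcast-not-found'.
  console.log('subscribe failed:', res.why)
}

A network or system fault would still surface as a rejected promise; ok and why are reserved for business outcomes.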
Our initial implementation does nothing except fail, but we can still test it by using the REPL:
boqr/pdm-local> aim:ingest,subscribe:podcast
{
  ok: false,
  why: 'no-code-yet'
}
Now we can start to write code to fill out the implementation, which will get hot-reloaded as we go, and we can keep using the REPL to test it. This is what they call Developer Experience, folks.
Let’s pause for a minute before writing any more code. We want our system to handle more than one podcast, and we know that we will need to process each podcast episode separately (download the audio, transcribe it, create a vector “embedding”, etc.). So in this message action, all we should do is download the RSS, and then send another message to start the actual ingestion process. That way we separate obtaining the podcast RSS feed from operating on that feed. This will make development and testing easier because once we have the feed, we can run ingestion multiple times without downloading the RSS each time. And trust me, when you build an AI chatbot, you need to rerun your pipeline a lot.
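That follow-up message will use the same aim:ingest routing, but with an instruction to ingest and a reference to the saved podcast (we’ll see it posted in the implementation below):

{
  aim: 'ingest',     // Still the `ingest` service...
  ingest: 'podcast', // ...but now the instruction is to ingest the episodes.
  podcast_id: '...'  // The database id of the saved podcast entity.
}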
Here is the basic functionality:
import type { RSS } from './ingest-types'

async function subscribe_podcast(this: any, msg: any, meta: any) {
  // The current Seneca instance.
  const seneca = this

  let out: any = { ok: false }

  // RSS URL
  let feed = out.feed = '' + msg.feed

  // Processing controls
  let doUpdate = out.doUpdate = !!msg.doUpdate
  let doIngest = out.doIngest = !!msg.doIngest

  // Load the podcast by feed URL to see if we are already subscribed.
  let podcastEnt = await seneca.entity('pdm/podcast').load$({ feed })

  // Download the RSS feed if new or updating.
  if (null == podcastEnt || doUpdate) {
    let rssRes = await seneca.shared.getRSS(feed)
    let rss = rssRes.rss as RSS

    podcastEnt = podcastEnt || seneca.entity('pdm/podcast').make$()
    podcastEnt = await podcastEnt.save$({
      feed,
      title: rss.title,
      desc: rss.description,
    })
  }

  if (null != podcastEnt) {
    out.ok = true
    out.podcast = podcastEnt.data$(false)

    if (doIngest) {
      await seneca.post('aim:ingest,ingest:podcast', {
        podcast_id: podcastEnt.id,
        doUpdate,
        doIngest,
      })
    }
  }
  else {
    out.why = 'podcast-not-found'
  }

  return out
}
TypeScript types: there are a lot of any types in this code. There will be a future refactoring to improve this situation. For now, remember that network messages are not function calls, and the messages are validated in other ways.
Yoda conditions: (null == foo). Safer, this is, young Padawan.
I’ve removed most of the debugging, tracing, and control code, but what you see above is the real implementation. Let’s unpack what it does.
This message action expects a message that gives us the feed URL in the feed property. But it also looks for the optional doUpdate and doIngest boolean properties. These are used to control how far we progress along the ingestion pipeline.
The doUpdate property must be true to download the RSS feed and “update” the podcast. The doIngest property must be true to send the ingestion message that starts ingesting individual episodes.
You can use these properties in the REPL to switch off parts of the pipeline to concentrate on validating and developing the parts you want to work on.
Note that we also add these properties to the out variable and send them back with the response. This makes debugging easier, especially when looking at logs.
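For example, a REPL session along these lines (the response fields follow the out object built above; the podcast contents are elided):

boqr/pdm-local> aim:ingest,subscribe:podcast,doUpdate:true,feed:'https://feeds.resonaterecordings.com/voxgig-fireside-podcast'
{
  ok: true,
  feed: 'https://feeds.resonaterecordings.com/voxgig-fireside-podcast',
  doUpdate: true,
  doIngest: false,
  podcast: { ... }
}

Leaving doIngest off stops the pipeline after the podcast is saved, which is exactly what you want while debugging this step.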
New or Existing Podcast?
The first real work happens when we try to load the podcast from our database using the feed URL. If it already exists, we use the existing database row. The Seneca framework has a fundamental design principle: everything is a message. That includes database access (or is there a database at all? Who knows…).
As a convenience, database messages are wrapped in a traditional Object Relational Mapper interface. Seneca also supports name-spacing data entities. I’ll explain more about Seneca’s persistence system as we go, so you’ll have to take things a little on faith at the start.
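Under the hood, the ORM methods are sugar over ordinary Seneca messages. Roughly speaking (this is a sketch of the role:entity message pattern that the Seneca entity plugin uses internally, not code from this project), loading an entity amounts to posting a message like:

await seneca.post('role:entity,cmd:load,base:pdm,name:podcast', {
  q: { feed },
})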
Let’s try to load a podcast from the database:
let podcastEnt = await seneca.entity('pdm/podcast').load$({ feed })
This is an asynchronous operation (we have to wait for the database), hence we have to await a promise. The entity method creates a new entity object for us, loading data from whatever table or data source is mapped to the pdm/podcast entity. The Seneca entity ORM provides standard methods that all end in $ to avoid clashing with your own database column names. The load$ method takes a query object and returns the first entity that matches the field values in the query.
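The same pattern covers the other operations we’ll need. For instance (list$ is another standard Seneca entity method; the queries here are illustrative):

// An empty query matches everything.
const allPodcasts = await seneca.entity('pdm/podcast').list$({})

// Queries match on field equality.
const matching = await seneca.entity('pdm/podcast').list$({ title: 'Fireside with Voxgig' })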
In this system, the pdm namespace is used for tables relating to our podcast chatbot business logic. We will also make use of Seneca plugins that provide standard functionality (such as user accounts) that use the sys namespace. By using our own namespace, we ensure our own tables never conflict with any of the Seneca plugins we might want to use later. I did say this was production code. This is the sort of thing you have to worry about in production applications.
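As an illustration (sys/user is the entity used by Seneca’s standard user plugin; the email value here is hypothetical):

// A plugin-provided entity lives in the sys namespace...
const user = await seneca.entity('sys/user').load$({ email: 'alice@example.com' })

// ...while our business entities live in pdm, so the tables can never collide.
const podcast = await seneca.entity('pdm/podcast').load$({ feed })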
If the podcast does not exist in our system, or if doUpdate is true, then we proceed to download the RSS feed, set some fields on the pdm/podcast entity, and save it to the database.
Cool. So now, if we do have a podcast (null != podcastEnt), we can proceed to ingest it if doIngest is true. We send a new message but do not expect a reply. We are emitting the message as an event. Locally, the message gets routed inside our single-process monolith. When deployed to AWS, the message is sent out over an Amazon Simple Queue Service (SQS) queue to whoever is interested in it for further processing. Either way, we do have to wait for the message to be posted.
Notice that the mental model of “it’s all just JSON messages all the way down” means we don’t have to (in this code) think about message routing architectures, microservice design patterns, or what service SDK we need to use. We can concentrate on our own business logic.