Step parallelism
- If you’re using a serverless platform to host your functions, your code will run with true parallelism, similar to multi-threading (without shared state)
- Each step will be individually retried on failure (see the sketch below)
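For example, here's a minimal sketch of how per-step retries interact with parallel steps. The event name and the fetchSalesData/fetchInventoryData helpers are hypothetical, used only for illustration:
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "signup-flow" });

export const fn = inngest.createFunction(
  { id: "parallel-retry-example" },
  { event: "app/report.requested" },
  async ({ event, step }) => {
    // Start both steps without awaiting so they run in parallel.
    const sales = step.run("fetch-sales", () => fetchSalesData(event.data.reportId));
    const inventory = step.run("fetch-inventory", () => fetchInventoryData(event.data.reportId));

    // If "fetch-inventory" throws, only that step is retried; the result of
    // "fetch-sales" is memoized and is not executed again on the retry.
    const [salesData, inventoryData] = await Promise.all([sales, inventory]);
    return { salesData, inventoryData };
  }
);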
Platform support
Parallelism works across all providers and platforms. True parallelism is supported for serverless functions; if you’re using a single Express server, all parallel jobs share a single-threaded Node server.
Running steps in parallel
You can run steps in parallel via Promise.all():
- Create each step via step.run() without awaiting, which returns an unresolved promise.
- Await all steps via Promise.all(). This triggers all steps to run in parallel via separate executions.
A common use case is to split work into chunks:
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "signup-flow" });

export const fn = inngest.createFunction(
  { id: "post-payment-flow" },
  { event: "stripe/charge.created" },
  async ({ event, step }) => {
    // These steps are not `awaited` and run in parallel when Promise.all
    // is invoked.
    const sendEmailPromise = step.run("confirmation-email", async () => {
      const emailID = await sendEmail(event.data.email);
      return emailID;
    });

    const updateUserPromise = step.run("update-user", async () => {
      return db.updateUserWithCharge(event);
    });

    // Run both steps in parallel. Once complete, Promise.all will return all
    // parallelized state here.
    //
    // This ensures that all steps complete as fast as possible, and we still have
    // access to each step's data once they're complete.
    const [emailID, updates] = await Promise.all([sendEmailPromise, updateUserPromise]);

    return { emailID, updates };
  }
);
When each step is finished, Inngest will aggregate each step's state and re-invoke the function with all state available.
Step parallelism in Python
Inngest supports parallel steps regardless of whether you're using asynchronous or synchronous code. For both approaches, you can use step.parallel:
async - with inngest.Step and await step.parallel()
@client.create_function(
    fn_id="my-fn",
    trigger=inngest.TriggerEvent(event="my-event"),
)
async def fn(
    ctx: inngest.Context,
    step: inngest.Step,
) -> None:
    user_id = ctx.event.data["user_id"]

    (updated_user, sent_email) = await step.parallel(
        (
            lambda: step.run("update-user", update_user, user_id),
            lambda: step.run("send-email", send_email, user_id),
        )
    )
sync - with inngest.StepSync and step.parallel()
@client.create_function(
    fn_id="my-fn",
    trigger=inngest.TriggerEvent(event="my-event"),
)
def fn(
    ctx: inngest.Context,
    step: inngest.StepSync,
) -> None:
    user_id = ctx.event.data["user_id"]

    (updated_user, sent_email) = step.parallel(
        (
            lambda: step.run("update-user", update_user, user_id),
            lambda: step.run("send-email", send_email, user_id),
        )
    )
At this time, Inngest does not have stable support for asyncio.gather or asyncio.wait. If you'd like to try out experimental support, use the _experimental_execution option when creating your function:
import asyncio

@client.create_function(
    fn_id="my-fn",
    trigger=inngest.TriggerEvent(event="my-event"),
    _experimental_execution=True,
)
async def fn(
    ctx: inngest.Context,
    step: inngest.Step,
) -> None:
    user_id = ctx.event.data["user_id"]

    (updated_user, sent_email) = await asyncio.gather(
        step.run("update-user", update_user, user_id),
        step.run("send-email", send_email, user_id),
    )
When using asyncio.wait, asyncio.FIRST_COMPLETED is supported. However, asyncio.FIRST_EXCEPTION is not supported due to the way Inngest interrupts the execution of the function.
Chunking jobs
A common use case is to chunk work. For example, when using OpenAI's APIs you might need to chunk a user's input and run the API on many chunks, then aggregate all data:
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "signup-flow" });

export const fn = inngest.createFunction(
  { id: "summarize-text" },
  { event: "app/text.summarize" },
  async ({ event, step }) => {
    const chunks = splitTextIntoChunks(event.data.text);

    const summaries = await Promise.all(
      chunks.map((chunk) =>
        step.run("summarize-chunk", () => summarizeChunk(chunk))
      )
    );

    await step.run("summarize-summaries", () => summarizeSummaries(summaries));
  }
);
This allows you to run many independent steps, wait until they're all finished, then fetch the results from all steps within a few lines of code. Doing this in a traditional system would require creating many jobs, polling the status of all jobs, and manually combining state.
Limitations
Currently, the total data returned from all steps must be under 4MB (e.g. a single step can return a max of 4MB, or 4 steps can return a max of 1MB each). Functions are also limited to a maximum of 1,000 steps.
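One way to stay within these limits is to cap the number of parallel steps and keep each step's return value small. Here's a minimal sketch along those lines; the event name and the saveSummary helper (assumed to persist the full summary elsewhere and return a small ID) are hypothetical:
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "signup-flow" });

// Hypothetical cap to stay under the 1,000-step limit.
const MAX_PARALLEL_STEPS = 1000;

export const fn = inngest.createFunction(
  { id: "summarize-large-text" },
  { event: "app/text.summarize.large" },
  async ({ event, step }) => {
    const chunks = splitTextIntoChunks(event.data.text).slice(0, MAX_PARALLEL_STEPS);

    const summaryIDs = await Promise.all(
      chunks.map((chunk, i) =>
        step.run(`summarize-chunk-${i}`, async () => {
          const summary = await summarizeChunk(chunk);
          // Persist the full summary elsewhere and return only a small ID so
          // the combined step output stays well under the 4MB limit.
          return saveSummary(summary);
        })
      )
    );

    return { summaryIDs };
  }
);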
Parallelism vs fan-out
Another technique similar to parallelism is fan-out (read the guide here): one function sends events that trigger other functions. Here are the key differences (a sketch of the fan-out pattern follows the list):
- Both patterns run jobs in parallel
- You can access the output of steps run in parallel within your function, whereas with fan-out you cannot
- Parallelism has a limit of 1,000 steps, though you can create as many functions as you'd like using fan-out
- You can replay events via fan-out, e.g. to test functions locally
- You can retry individual functions easily if they permanently fail, whereas if a step permanently fails (after retrying) the function itself will fail and terminate.
- Fan-out splits functionality into different functions, whereas step functions keep all related logic in a single, easy-to-read function
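For reference, here's a minimal sketch of the fan-out pattern: a parent function sends one event per user, and a separate function handles each event independently. The event names and the db.loadUsers and emails.send helpers are hypothetical:
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "signup-flow" });

// Parent function: fans out by sending one event per user. It cannot read
// the results of the functions those events trigger.
export const sendNewsletter = inngest.createFunction(
  { id: "send-newsletter" },
  { event: "app/newsletter.requested" },
  async ({ event, step }) => {
    const users = await step.run("load-users", () => db.loadUsers());

    await step.sendEvent(
      "fan-out-per-user",
      users.map((user) => ({
        name: "app/newsletter.send.user",
        data: { userId: user.id, newsletterId: event.data.newsletterId },
      }))
    );
  }
);

// Child function: triggered once per event, retried and replayable on its own.
export const sendNewsletterToUser = inngest.createFunction(
  { id: "send-newsletter-to-user" },
  { event: "app/newsletter.send.user" },
  async ({ event, step }) => {
    await step.run("send-email", () =>
      emails.send(event.data.userId, event.data.newsletterId)
    );
  }
);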