By making off-the-rack machine learning models accessible for anyone to use, cloud ML services like Amazon Personalize help make ML-driven customer experiences available to teams at any scale.
You no longer need in-house data science and machine learning experts to get the benefit of propensity scoring or product recommendations.
However, while models can be outsourced, your data can't. The effectiveness of machine learning insights will always be limited by the quality and completeness of the data they are based on. Cloud ML platforms (by themselves) leave three key challenges unsolved:
These are infrastructure challenges, and one way they can be overcome is with a Customer Data Platform (CDP). The goal of a CDP is to get customer data from wherever it is, organize it into a single view of the customer, and make that view available to all services that need it. Instead of thinking about machine learning as just another data silo, a CDP can help you build machine learning insights into your core data infrastructure by connecting ML-driven learnings to additional external services for activation.
Let's dig into how a CDP can help you solve each of the three infrastructure challenges.
To train an ML model, you need accurate data about user behavior, and lots of it. Data quality can be broken down into three components:
Just as the data that powers an ML model can come from any platform, the insights that machine learning models generate are most valuable when they can be used to power personalized experiences for your website, apps, brick-and-mortar stores, call centers, etc.
Without modern customer data infrastructure, making ML actionable is a huge challenge. For example: say you've used ML to generate churn risk scores for your customers:
Without the data connections provided by a CDP, making your ML scores available where they’re needed would require dedicated development work and additional cost.
Items4U ("The finest items, which you will particularly enjoy"), operates a retail business across it's website, native iOS and Android apps, and network of brick-and-mortar stores throughout the country. Our challenge is that the sheer number of items we offer can make the shopping experience on our apps feel a little scattershot. I want to use ML to figure out which products I should focus on surfacing for each user.
By the end of this project, I'll have set up a mechanism to deliver personalized product recommendations to each user, which will automatically continue to grow and improve over time. I'll be using Amazon Personalize and mParticle as my Customer Data Platform. At the end of the project I'll be using Amplitude to measure success.
At a high level, the data flow looks like this:
There’s a fair amount of work required to set up the AWS assets we need, but the good news is that most of it can be automated for subsequent iterations. For this reason, I’m using the AWS CLI and other scripting-friendly tools wherever possible. In this post, we’ll walk through how to:
To train an ML model to give product recommendations, I need data about how my customers interact with products. Fortunately, I don't have to start from scratch just for ML. Capturing commerce data is a core function of mParticle, and by the time a retail brand like Items4U is ready to explore ML, the required data is already being captured and used for more basic use cases, like app analytics, segmentation and re-targeting.
When ready to begin integrating ML with a CDP, I've already:
Set up inputs to collect data from the following channels: iOS, Android, Web, Custom Feed (Point of Sale), Custom Feed (Amazon Personalize)
Added mParticle's client-side SDKs to my iOS, Android and Web apps, and configured my point-of-sale platform to forward purchase events to mParticle using the NodeJS server-side SDK.
mParticle uses a single standard schema for capturing commerce events, and this schema is enforced by the SDKs. This means I don't have to rely on individual developers on each platform picking the right event names. To my ML model, a purchase made through the iOS app will look the same as a purchase made on the website, or in-store. For example, here's how I would log a purchase on my web app.
// 1. Create the product
var product = mParticle.eCommerce.createProduct(
'Skateboard', // Name
'prsx-10', // SKU
100.00, // Price
1 // Quantity
);
// 2. Summarize the transaction
var transactionAttributes = {
Id: 'some-transaction-id',
Revenue: 100,
Tax: 9
};
// 3. Log the purchase event;
mParticle.eCommerce.logProductAction(
mParticle.ProductActionType.Purchase,
[product],
null, //optional custom attributes would go here
null, //optional custom flags would go here
transactionAttributes);
What mParticle forwards to downstream services, like my ML model (stripped down to just the fields we care about), will look like this:
{
"mpid" 761556044463767215, // master user identity
"environment": "production",
"user_identities": {
"email": "user99@example.com"
},
"user_attributes": {
"$firstname": "Milo",
"$lastname": "Minderbinder"
},
"events": [{
"data": {
"product_action": {
"action": "view_detail", // Others actions are "add_to_cart", "remove_from_cart", and "purchase"
"products": [{
"id": "prsx-10", // Product SKU
"price": 100
}]
},
"timestamp": 1604695231872
},
"event_type": "commerce_event"
}]
}
Ideally, my product interaction data is linked to a customer ID that works on my website, on my mobile apps and in-store. Here, that's the mParticle ID (MPID). mParticle's identity resolution allows me to gradually build up identities for each channel and resolve those identities to a single MPID.
For example: when a customer visits the website for the first time, I can link a cookie ID to the MPID. If the customer creates an account, I can add an email address, and perhaps a phone number. If they make an online purchase, I can add a secure hash of their credit card number. This means that if the same person then makes a purchase in a physical store with the same credit card, I can attribute that purchase to the same customer profile.
This process lets me train my ML models based on a complete set of customer interactions.
For this use case I need to bring together mParticle and four AWS services:
These services can be configured in the AWS UI, but I'll be using Amazon's CLI tool. This way, I can reuse my work by creating a script to quickly spin up future iterations. I've followed Amazon's documentation to create an IAM user with access to the above four systems and log in to the console.
As I go, I’ll need to save the Amazon Resource Number (ARN) for each asset I create. I’ll need these ARNs to set up interactions between the different resources I create.
Kinesis is a tool for processing streaming data. mParticle will forward commerce event data to Kinesis, where they will be picked up by the Lambda function I'll set up later.
1. Create the stream
aws kinesis create-stream \
--stream-name Items4UCommerceEventStream \
--shard-count 1
Save the
StreamARN
from the response.2. Create a role for mParticle to assume
For mParticle to be able to upload to the Kinesis stream, I need to create an IAM role for mParticle to assume. This role needs a policy allowing PutRecord access to Kinesis (sample), and a trust policy (sample) allowing mParticle to assume the role.
aws iam create-role --role-name mparticle-kinesis-role --assume-role-policy-document file:///path/to/mp-trust-policy.json
aws iam put-role-policy --role-name mparticle-kinesis-role --policy-name mp-kinesis-put --policy-document file:///path/to/mp-kinesis-role.json
3. Connect mParticle to Kinesis.
mParticle offers an "event" output for streaming event data to Kinesis. This can be set up and controlled from the mParticle dashboard. You can read an overview of event outputs in the mParticle docs.
Create Configuration
First, I need to create an overall configuration for Kinesis. This holds all the settings that will remain the same for every input I connect. Each mParticle integration requires different settings. For example, API keys are commonly required. For Kinesis, I've already granted mParticle write access using IAM, so I only need to provide my AWS account number here.
Connect All Sources
Now I need to connect each of my four inputs: iOS, Android, Web and POS, to Kinesis.
Set Filters
mParticle lets me switch each individual event name on or off for a particular output, like Kinesis. These help me ensure that I'm only sending to Kinesis the data that I need to train my ML model. I'm interested in 4 types of commerce events:
In my filter settings, I leave these four events on, and turn everything else off.
Now, I'm streaming events from mParticle to Kinesis, hurrah! But Kinesis is only a staging area. From here, I need to load them into an Amazon Personalize Dataset Group.
A Dataset Group is an overall container for a set of user data that can be used to train an ML model.
aws personalize create-dataset-group --name Items4UCommerceEvents
Save the
datasetGroupArn
from the response.A Dataset Group can include up to three datasets:
Only the Interactions dataset is required, so to keep things simple it's the only one I'll use. I can come back later and improve future iterations of my model by adding other datasets.
Before I can create the dataset, I need a schema. For this example, I use the following elements:
User ID
- this will be the mParticle IDSession ID
- mParticle automatically creates a unique ID for each session, which I can use.Item ID
- this will be the SKU of the productEvent Type
- this will be the type of product interaction: Add to Cart, Add to Wishlist, Purchase, or View Detail.Timestamp
- time of the interaction. mParticle automatically records a timestamp for each interaction.As a Personalize JSON schema, it looks like this:
{
"type": "record",
"name": "Interactions",
"namespace": "com.amazonaws.personalize.schema",
"fields": [
{
"name": "USER_ID",
"type": "string"
},
{
"name": "SESSION_ID",
"type": "string"
},
{
"name": "ITEM_ID",
"type": "string"
},
{
"name": "EVENT_TYPE",
"type": "string"
},
{
"name": "TIMESTAMP",
"type": "long"
}
],
"version": "1.0"
}
1. Create the schema:
aws personalize create-schema \
--name Items4UCommerceEventSchema \
--schema file:///path/to/items4u-commerce-event-schema.json
Save the
schemaArn
from the response.2. Create the dataset:
aws personalize create-dataset \
--name Items4UCommerceEventsDataset \
--schema-arn {{saved schema arn}} \
--dataset-group-arn {{saved dataset group arn}} \
--dataset-type Interactions
Save the
datasetArn
from the response.3. Create the tracker:
A tracker is an ID linked to the dataset that lets me upload events.
aws personalize create-event-tracker \
--name Items4UCommerceEventTracker \
--dataset-group-arn {{saved dataset group arn}}
Save the
trackingID
from the response.In order to train a Machine Learning solution, I need at least 1000 records in my dataset. One way to do this is to upload CSVs of historical events. mParticle integrates with several Data Warehouses, including Amazon Redshift. If I have access, I can easily create a training set from my past data. The CSV would look something like this:
USER_ID,EVENT_TYPE,ITEM_ID,SESSION_ID,TIMESTAMP
761556044463767215,view_detail,prsx-23,Q8bQC4gnO8J7ewB,1595492950
-6907502341961927698,purchase,prsx-14,VA9AUJBhoJXAKr7,1595492945
However, training the model on historical data is not strictly required, and since data warehouse access is often tightly controlled, this step can be a huge bottleneck in attempts to implement ML.
An alternative way to train the model is simply to start forwarding real-time event data as it comes in. To do this I need to set up my Lambda function.
Eventually, the function will perform three tasks every time a new event is received at my Kinesis stream:
However, since I can't create a Personalize Campaign until I can train a Solution, this first version of the Lambda performs only the first task, while I collect the minimum 1000 events.
Lambdas can use several different languages and runtimes. I'll use Node for mine. The first version looks like this:
// Import Dependencies
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({storeAsString: true}); // needed to parse 64-bit integer MPID
// Define the product actions we want to report to Personalize
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
// Initialize Personalize
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
exports.handler = (event, context) => {
for (const record of event.Records) {
// Parse encoded payload
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
// Extract required params
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: process.env.TRACKING_ID
};
// Get interactions from events array
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {
itemId: product.id,
};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
// Upload interactions to tracker
personalizeevents.putEvents(params, function(err) {
if (err) console.log(err, err.stack);
else console.log(`Uploaded ${eventList.length} events`)
});
}
}
};
1. Create the IAM role:
As before I need to create an IAM role to grant my Lambda function the permissions it needs to access Kinesis and Personalize.
The necessary trust policy can be found here.
aws iam create-role \
--role-name items4u-lambda-personalize-role \
--assume-role-policy-document file:///path/to/lambda-trust-policy.json
Save the
Role.Arn
from the response.I can use off-the-rack managed policies to grant access to Kinesis and Personalize:
aws iam attach-role-policy \
--role-name items4u-lambda-personalize-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
aws iam attach-role-policy \
--role-name items4u-lambda-personalize-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess
2. Create the Lambda:
To create the Lambda I need a zip file including the function itself, as well as it's dependencies in the node_modules folder.
I'll also need the mParticle API credentials for the Custom Feed I created for Amazon Personalize, and supply these as environment variables for the Lambda, as well as the Dataset Tracker ID.
aws lambda create-function \
--function-name Items4UPersonalizeLambda \
--runtime nodejs12.x \
--zip-file fileb:///path/to/Items4UPersonalizeLambda.zip \
--role {{role arn}} \
--handler index.handler \
--environment Variables="{MP_KEY=SomeAccessKey,MP_SECRET=SomeAccessSecret,TRACKER_ID=SomeTrackerID}"
3. Create an event-source mapping:
Configure the Lambda to be triggered by new events received at the Kinesis stream.
aws lambda create-event-source-mapping \
--function-name Items4UPersonalizeLambda \
--event-source-arn {{Kinesis stream arn}} \
--starting-position LATEST
By now, every time a commerce event is collected across any of my app platforms, mParticle is forwarding it to Kinesis. From here, the Lambda uploads the event to my Personalize dataset.
Now I need to wait to get at least 1000 records loaded. This can take some time. In the meantime, I can check the logs in AWS Cloudwatch to make sure the Lambda function is being invoked as expected.
A Personalize Campaign requires three components:
1. Create a Solution:
For this example I'll use Amazon's 'User Personalization' recipe.
aws personalize create-solution \
--name Items4URecsSolution \
--dataset-group-arn {{dataset group ARN}} \
--recipe-arn arn:aws:personalize:::recipe/aws-user-personalization
Save the
solutionArn
from the response.2. Create a Solution Version:
aws personalize create-solution-version \
--solution-arn {{solution ARN}}
Save the
solutionVersionArn
from the response.The solution version takes some time to create. I can check in on its progress regularly with
describe-solution-version
until the response shows status
: ACTIVE
.aws personalize describe-solution-version \
--solution-version-arn {{solution version ARN}}
3. Create the Campaign:
aws personalize create-campaign \
--name Items4UProductRecsCampaign \
--solution-version-arn arn:aws:personalize:us-east-1:521255666488:solution/Items4URecsSolution/f58f24b6 \
--min-provisioned-tps 1
The final step is to update my Lambda function to request product recommendations from my new campaign, and send those recommendations back to mParticle. The updated Lambda looks like this:
// Import Dependencies
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({storeAsString: true}); // needed to parse 64-bit integer MPID
const mParticle = require('mparticle');
// Define the product actions we want to report to Personalize
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
// Initialize Personalize and mParticle
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
const personalizeruntime = new AWS.PersonalizeRuntime({apiVersion: '2018-05-22'});
const mp_api = new mParticle.EventsApi(new mParticle.Configuration(process.env.MP_KEY, process.env.MP_SECRET));
exports.handler = (event, context) => {
for (const record of event.Records) {
// Parse encoded payload
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
// Extract required params
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: process.env.TRACKING_ID
};
// Get interactions from events array
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {
itemId: product.id,
};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
// Upload interactions to tracker
personalizeevents.putEvents(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
// Request product recs
var params = {
campaignArn: process.env.CAMPAIGN_ARN,
numResults: '5',
userId: mpid
};
personalizeruntime.getRecommendations(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
console.log(`Uploaded ${eventList.length} events`)
// Upload product recs to mParticle
const batch = new mParticle.Batch(mParticle.Batch.Environment.development);
batch.mpid = mpid;
const itemList = [];
for (const item of data.itemList) {
itemList.push(item.itemId);
}
batch.user_attributes = {};
batch.user_attributes.product_recs = itemList;
const event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Recs Update', {
product_recs: itemList.join()
});
batch.addEvent(event);
console.log(JSON.stringify(batch));
const callback = function(error, data, response) {
if (error) {
console.error(error);
} else {
console.log('Product Recs updated successfully');
}
};
mp_api.uploadEvents(batch, callback);
}
});
}
});
}
}
};
As well as updating the code, I also need to add the
CAMPAIGN_ARN
environment variable.When I request recs from a Personalize campaign, I can specify the number of recommendations I want. Here, I'm going for a top 5 -- enough to populate a carousel or an email widget.
The payload uploaded to mParticle by the Lambda will look like this:
{
"environment": "development",
"mpid": "-6907502341961927698",
"user_attributes": {
"product_recs": [
"prsx-4",
"prsx-2",
"prsx-15",
"prsx-30",
"prsx-28"
]
},
"events": [
{
"data": {
"custom_event_type": "other",
"event_name": "AWS Recs Update",
"custom_attributes": {
"product_recs": "prsx-4,prsx-2,prsx-15,prsx-30,prsx-28"
}
},
"event_type": "custom_event"
}
]
}
This payload records the product recommendations in two ways:
I've now set up a Machine Learning system that can generate a set of product recommendations for every user and update them each time the user interacts with a product.
Unlike a model trained on a one-off CSV upload, mine will continue to get better over time as the results of successful and unsuccessful recommendations feed back into the model in a flywheel pattern.
By sending the recommendations back to my Customer Data Platform, I can use them for more granular analytics and customer experience use cases.
Once we've set up the infrastructure to generate, continuously refine, and activate ML insights, the final piece of the puzzle is to figure out what works and what doesn't.
For that, I need my Data Warehouse and my analytics platforms, such as Google Analytics or Amplitude. The commerce data I'm collecting with mParticle is already enough to help me identify general trends. For example, I can tell if the average lifetime value of users is increasing since I started applying ML insights.
To dig deeper, I need to understand which ML campaigns I'm deploying for each user, so that I can compare how successful they are. For example, I might want to compare results for my initial product recommendations recipe against results for a control group that sees a default set of products. Alternatively, if I go back and enrich my ML model with additional datasets, or try a different recipe altogether, I'll want to test the new campaign against the original to check that I've actually improved my outcomes.
We've already seen that maintaining a single complete customer profile helped me activate on ML insights across all platforms. The same benefits apply to analytics. By storing experiment and variant information on the user profile, mParticle automatically makes that data available to any analytics tools that you are forwarding customer data to.
I can use a service like Optimizely to set up my experiments, or I can set up a quick A/B test, just by tweaking my Lambda code a little. Below is a version of the Lambda I set up in part 2 of this post, modified to do a few extra tasks:
Changes from earlier are marked with comments.
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({ storeAsString: true });
const mParticle = require('mparticle');
const trackingId = "bd973581-6505-46ae-9939-e0642a82b8b4";
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
const personalizeruntime = new AWS.PersonalizeRuntime({apiVersion: '2018-05-22'});
const mp_api = new mParticle.EventsApi(new mParticle.Configuration(process.env.MP_KEY, process.env.MP_SECRET));
exports.handler = function (event, context) {
for (const record of event.Records) {
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: trackingId
};
// Check for variant and assign one if not already assigned
const variant_assigned = Boolean(payload.user_attributes.ml_variant);
const variant = variant_assigned ? payload.user_attributes.ml_variant : Math.random() > 0.5 ? "A" : "B";
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {itemId: product.id,};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
personalizeevents.putEvents(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
var params = {
// Select campaign based on variant
campaignArn: process.env[`CAMPAIGN_ARN_${variant}`],
numResults: '5',
userId: mpid
};
personalizeruntime.getRecommendations(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
const batch = new mParticle.Batch(mParticle.Batch.Environment.development);
batch.mpid = mpid;
const itemList = [];
for (const item of data.itemList) {
itemList.push(item.itemId);
}
batch.user_attributes = {};
batch.user_attributes.product_recs = itemList;
// Record variant on mParticle user profile
if (!variant_assigned) {
batch.user_attributes.ml_variant = variant
}
const event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Recs Update');
event.custom_attributes = {product_recs: itemList.join()};
batch.addEvent(event);
const mp_callback = function(error, data, response) {
if (error) {
console.error(error);
} else {
console.log('API called successfully.');
}
};
mp_api.uploadEvents(batch, mp_callback);
}
});
}
});
}
}
};
With variant information stored on the user profile and automatically passed to Amplitude, I can now group my users by variant in any of my Amplitude charts.
User journeys are not always linear. For example: a customer might look at a recommended product on my website and not buy it immediately, but pick it up later in a brick-and-mortar store, or when they next use the native app. If I'm running my experiments and analytics on a per-device basis, I'll miss that conversion.
Note that because both my analytics in Amplitude and the user bucketing for my A/B test is based on mParticle's master MPID, my A/B test is more complete than if I had bucketed per device. By using the mParticle ID, I can capture the full effect of my campaigns.
In Machine Learning, as in all data-centric tasks, the right infrastructure is key.
When you use a CDP like mParticle to center your data infrastructure around a single, cross-platform customer record, your ML campaigns will be faster to set up, and more effective.
By using a CDP, you can:
To learn more about this use case, you can watch me demo it live with AWS on twitch.tv here.
Previously published here.