Question
How to Process Only Truly New Gmail Messages Using Gmail API and Pub/Sub Notifications?
I'm developing an application using the Gmail API and Google Pub/Sub to process incoming emails. My goal is to only process truly new messages and avoid reprocessing messages that have been moved between labels (e.g., moving a message back into the inbox).
Problem: The current implementation still processes messages that are moved back into the inbox, resulting in duplicate processing. How can I ensure that only truly new messages are processed and avoid reprocessing messages that are moved back into the inbox?
Considerations:
- The Gmail API's historyTypes: ['messageAdded'] seems to include messages that have been moved between labels.
- I am storing the last processed history ID and timestamp to filter messages.
- Should I track and store message IDs to avoid reprocessing, or is there a more efficient way to handle this with the Gmail API?
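To make the first consideration concrete, here is a minimal sketch of reading only the messagesAdded entries from a history.list response and ignoring records that carry only labelsAdded or labelsRemoved. It assumes the response is the plain dict returned by the client library. The function name new_message_ids and the skip_labels default are hypothetical and chosen for illustration, and labelIds is treated as optional.

```python
from typing import Any, Dict, List, Sequence


def new_message_ids(history_response: Dict[str, Any],
                    skip_labels: Sequence[str] = ("SENT", "DRAFT")) -> List[str]:
    """Collect IDs found under 'messagesAdded', in order, without repeats.

    Records that only contain 'labelsAdded' or 'labelsRemoved' are ignored,
    so a message that was merely moved between labels is not returned.
    """
    seen = set()
    ids: List[str] = []
    for record in history_response.get("history", []):
        for added in record.get("messagesAdded", []):
            message = added.get("message", {})
            message_id = message.get("id")
            labels = message.get("labelIds", [])
            if message_id is None or message_id in seen:
                continue
            # Assumption: messages carrying any of these labels are not incoming mail.
            if any(label in labels for label in skip_labels):
                continue
            seen.add(message_id)
            ids.append(message_id)
    return ids
```

Whether this alone removes the duplicates depends on what the API actually reports for a moved message, so it is a starting point for debugging rather than a confirmed fix.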
Current Setup:
- Pub/Sub Notification: I have a Pub/Sub topic that triggers my process_notification function whenever there's a new message added to the inbox.
- History API: I'm using the Gmail History API to fetch message history based on the history ID.
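For context, the notification itself carries only an email address and a history ID, not the message. The sketch below shows the decoding step in isolation, assuming the push envelope has the same shape the code in this question reads (data['message']['data']). The helper name decode_gmail_notification is hypothetical.

```python
import base64
import json
from typing import Tuple


def decode_gmail_notification(envelope: dict) -> Tuple[str, int]:
    """Return (email_address, history_id) from a Pub/Sub push envelope."""
    raw = envelope["message"]["data"]
    # Restore base64 padding in case it was stripped in transit.
    padded = raw + "=" * (-len(raw) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    # Convert to int so history IDs compare numerically, not as strings.
    return payload["emailAddress"], int(payload["historyId"])
```

Because the payload says nothing about which message changed or why, every decision about "new" versus "moved" has to be made from the history records fetched afterwards.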
Main Question:
How can I best process only truly new incoming messages and avoid reprocessing messages if they are moved back into the inbox (or any other label)?
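As a point of comparison for the "store message IDs" option, here is a minimal sketch of a size-bounded record of recently seen IDs. It is an in-memory illustration only: the class name RecentIds and the max_size default are hypothetical, and a multi-process Flask deployment would need a persistent equivalent (for example a MongoDB collection with an expiry on old entries).

```python
from collections import OrderedDict


class RecentIds:
    """Remember the most recently seen message IDs, up to a fixed limit."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._ids: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on repeats."""
        if message_id in self._ids:
            # Refresh the entry so frequently repeated IDs are evicted last.
            self._ids.move_to_end(message_id)
            return False
        self._ids[message_id] = None
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # evict the oldest entry
        return True
```

A guard such as `if not recent.first_time(message_id): continue` at the top of the per-message loop would skip repeats, at the cost of forgetting IDs older than the chosen limit.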
Environment:
- Language: Python
- Framework: Flask
- Database: MongoDB
- Gmail API
Attempted Solution: I tried to filter messages by their internal timestamp, processing only messages with a timestamp greater than that of the last processed message. I also tried storing message IDs, but that would bloat the application. Here's a simplified version of my code:
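import base64
import json
import time

import google.auth.exceptions
import google.oauth2.credentials
import googleapiclient.errors
from flask import redirect, session, url_for
from google.auth.transport.requests import Request
from googleapiclient.discovery import build

# mongo, create_app and the helper functions used below are defined elsewhere in the app.


def start_watch():
    credentials = google.oauth2.credentials.Credentials(**session['credentials'])
    service = build('gmail', 'v1', credentials=credentials)
    request = {
        'labelIds': ['INBOX'],
        'topicName': f"projects/{create_app().config['GOOGLE_CLOUD_PROJECT_ID']}/topics/{create_app().config['PUBSUB_TOPIC']}",
        'labelFilterBehavior': 'INCLUDE',
        'historyTypes': ['messageAdded']
    }
    response = service.users().watch(userId='me', body=request).execute()
    # Remember the starting history ID so later notifications can be diffed against it
    mongo.db.users.update_one(
        {'google_id': session['google_id']},
        {'$set': {'historyId': response['historyId'], 'watch_state': True}}
    )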
def process_notification(data):
    global last_processed_history_id
    payload = base64.urlsafe_b64decode(data['message']['data'])
    message_data = json.loads(payload)
    history_id = message_data['historyId']
    email_address = message_data['emailAddress']
    current_time = time.time()
    # Debounce logic
    if email_address in last_processed_history_id:
        last_id, last_time = last_processed_history_id[email_address]
        if history_id == last_id and (current_time - last_time) < debounce_interval:
            print(f"Skipping duplicate notification for history ID {history_id}")
            return
    user = mongo.db.users.find_one({'email': email_address})
    if user:
        credentials = google.oauth2.credentials.Credentials(**user['credentials'])
        try:
            # Refresh credentials if necessary
            if credentials.expired and credentials.refresh_token:
                credentials.refresh(Request())
                mongo.db.users.update_one(
                    {'email': email_address},
                    {'$set': {'credentials': credentials_to_dict(credentials)}}
                )
            elif credentials.expired and not credentials.refresh_token:
                print("No refresh token available. User needs to re-authenticate.")
                return redirect(url_for('auth.reauthenticate'))
            service = build('gmail', 'v1', credentials=credentials)
            # Check if the sender is on the block list for the user
            blocked_senders = mongo.db.block_list.find({'receiver_id': user['_id']})
            blocked_emails = [block['email'] for block in blocked_senders]
            if email_address in blocked_emails:
                results = service.users().history().list(userId='me', startHistoryId=history_id, historyTypes=['messageAdded']).execute()
                if 'history' in results:
                    for history in results['history']:
                        if 'messagesAdded' in history:
                            for message in history['messagesAdded']:
                                apply_label(service, message['message']['id'], 'Blocked - PitchSlap')
                                remove_label(service, message['message']['id'], 'INBOX')
                return
            last_history_id = user.get('historyId', '1')
            last_processed_message_id = user.get('lastProcessedMessageId', '0')
            last_processed_timestamp = user.get('lastProcessedTimestamp', 0)
            if int(history_id) <= int(last_history_id):
                print(f"History ID {history_id} has already been processed.")
                return
            results = service.users().history().list(userId='me', startHistoryId=last_history_id, historyTypes=['messageAdded']).execute()
            if 'history' in results:
                for history in results['history']:
                    if 'messagesAdded' in history:
                        for message_added in history['messagesAdded']:
                            message_id = message_added['message']['id']
                            # Fetch and process one message; a 404 means it no longer exists
                            try:
                                msg = service.users().messages().get(userId='me', id=message_id).execute()
                                if 'INBOX' not in msg.get('labelIds', []):
                                    print(f"Message {message_id} not in INBOX.")
                                    continue
                                # internalDate is in milliseconds; convert to seconds
                                msg_internal_date = int(msg.get('internalDate', 0)) / 1000
                                if msg_internal_date <= last_processed_timestamp:
                                    print(f"Message {message_id} with timestamp {msg_internal_date} has already been processed.")
                                    continue
                                headers = msg.get('payload', {}).get('headers', [])
                                sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
                                receiver = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown')
                                subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
                                snippet = msg.get('snippet', '')
                                sender_email = extract_email_address(sender)
                                email_content = f"Subject: {subject}\n\n{snippet}"
                                ai_response = check_sales_email_with_openai(email_content)
                                is_sales_email = ai_response["Sales Email"]
                                confidence = ai_response["Confidence"]
                                if is_sales_email and confidence >= 70:
                                    apply_label(service, message_id, 'PitchSlap')
                                    remove_label(service, message_id, 'INBOX')
                                    pitch = save_sales_pitch(
                                        service, message_id, sender_email, receiver, subject, snippet, email_address, user['_id'],
                                        ai_response
                                    )
                                    send_template_email(service, sender_email, 'form_request', pitch)
                                    if not pitch.get('submitted_at'):
                                        apply_label(service, message_id, 'Quarantined - PitchSlap')
                                print(f"History ID: {history_id}")
                                print(f"Email Address: {email_address}")
                                print(f"Sender: {sender_email}")
                                print(f"Receiver: {receiver}")
                                print(f"Subject: {subject}")
                                print(f"Email Content: {snippet}")
                                print(f"Sales email detected: {is_sales_email}")
                                mongo.db.users.update_one(
                                    {'email': email_address},
                                    {'$set': {'lastProcessedMessageId': message_id, 'lastProcessedTimestamp': msg_internal_date}}
                                )
                            except googleapiclient.errors.HttpError as error:
                                if error.resp.status == 404:
                                    print(f"Message {message_id} not found.")
                                    continue
                                else:
                                    raise
            # Update the history ID in the database after processing all messages
            mongo.db.users.update_one(
                {'email': email_address},
                {'$set': {'historyId': history_id}}
            )
            # Update the last processed history ID and timestamp
            last_processed_history_id[email_address] = (history_id, current_time)
        except google.auth.exceptions.RefreshError:
            print("Failed to refresh token, please reauthenticate.")
            return redirect(url_for('auth.reauthenticate'))
**User DB Entry:**
{
  "_id": {"$oid": "6xxxxxa"},
  "google_id": "1xxxxx7",
  "email": "pixxxxp1@gmail.com",
  "credentials": {
    "token": "yxxxxxx69",
    "refresh_token": "1xxxxxxxM",
    "token_uri": "htxxxxen",
    "client_id": "22xxxxxcom",
    "client_secret": "GOxxxxxc",
    "scopes": [
      "https://www.googleapis.com/auth/userinfo.profile",
      "https://www.googleapis.com/auth/userinfo.email",
      "openid",
      "https://www.googleapis.com/auth/gmail.readonly",
      "https://www.googleapis.com/auth/gmail.modify"
    ]
  },
  "historyId": {"$numberInt": "140436"},
  "watch_state": true,
  "last_processed_timestamp": {"$numberDouble": "1721765846.9436474"},
  "lastProcessedMessageId": "190e1507ff1729a4"
}
Console output:
I restarted the program, and it went through and reprocessed everything. It processed the first email correctly. As a test, I then moved that email back into the inbox, and the program started processing it in a loop.
Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139707
Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139671
Message 190e13de8a820465 not found.
Message 190e13de810426fb not found.
Message 190e13de8a820465 not found.
Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139726
Message 190e13df993fbc8b not found.
Message 190e13de810426fb not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13df993fbc8b not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
Message 190e13de8a820465 not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13de810426fb not found.
Starting to process history for user pitchslap1@gmail.com from history ID 139707 to 139808
Starting to process history for user pitchslap1@gmail.com from history ID 139707 to 139755
Message 190e13df993fbc8b not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 139726 to 140355
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfe60bbb9b not found.
History ID 139587 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
History ID 139638 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139605 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139650 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140426
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140456
History ID: 140426
Email Address: pitchslap1@gmail.com
Sender: pitchslap2@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 16:37:32 -0400
Email Content: you want to buy this lead list or something
Sales email detected: True
Message 190e1507ff1729a4 not in INBOX.
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140436
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Message 190e1507ff1729a4 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140507
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140511
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140520
Message 190e150993252c08 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:36] "POST /pubsub HTTP/1.1" 200 -
History ID: 140507
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap2@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:37:53 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap2@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140511 to 140540
127.0.0.1 - - [23/Jul/2024 16:38:37] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140507 to 140563
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140507 to 140575
Message 190e15145fe96b7c not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:41] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
History ID: 140520
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
History ID: 140540
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140520 to 140595
Starting to process history for user pitchslap1@gmail.com from history ID 140520 to 140629
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c has already been processed.
Message 190e15145fe96b7c has already been processed.
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140646
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140658
Message 190e151573f8d86e not in INBOX.
Message 190e151573f8d86e not in INBOX.
History ID: 140575
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140677
127.0.0.1 - - [23/Jul/2024 16:38:46] "POST /pubsub HTTP/1.1" 200 -
Message 190e151573f8d86e not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140575 to 140733
Starting to process history for user pitchslap1@gmail.com from history ID 140575 to 140715
History ID: 140595
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
History ID: 140629
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True