Question

How to Process Only Truly New Gmail Messages Using Gmail API and Pub/Sub Notifications?

I'm developing an application using the Gmail API and Google Pub/Sub to process incoming emails. My goal is to only process truly new messages and avoid reprocessing messages that have been moved between labels (e.g., moving a message back into the inbox).

Problem: The current implementation still processes messages that are moved back into the inbox, so the same message gets processed more than once.

Considerations:

  • The Gmail API's historyTypes: ['messageAdded'] seems to include messages that have been moved between labels.
  • I am storing the last processed history ID and timestamp to filter messages.
  • Should I track and store message IDs to avoid reprocessing, or is there a more efficient way to handle this with the Gmail API?
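
To make the ID-tracking option concrete, here is a minimal sketch of a bounded "seen IDs" cache that only remembers the most recent message IDs, so it cannot grow without limit. The class name `SeenMessageIds` and the `max_size` default are hypothetical, and the cache lives in memory only; persisting it (for example in MongoDB) would be a separate step not shown here.

```python
from collections import OrderedDict


class SeenMessageIds:
    """Remember the most recent message IDs, evicting the oldest past max_size."""

    def __init__(self, max_size=10_000):
        self.max_size = max_size
        self._ids = OrderedDict()

    def check_and_add(self, message_id):
        """Return True if message_id is new (and record it), False if seen before."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)  # refresh recency
            return False
        self._ids[message_id] = True
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # drop the oldest entry
        return True
```

A handler could call `check_and_add(message_id)` before doing any work and skip the message when it returns False. The trade-off is that a message older than the cache window would be treated as new again.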

Current Setup:

  • Pub/Sub Notification: I have a Pub/Sub topic that triggers my process_notification function whenever there's a new message added to the inbox.
  • History API: I'm using the Gmail History API to fetch message history based on the history ID.
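
For reference, the history fetch can be sketched as a small generator that also follows `nextPageToken`, since `history.list` returns results in pages. The function name `iter_added_messages` is hypothetical, `service` is assumed to be a client built with `build('gmail', 'v1', ...)`, and restricting to the `INBOX` label via `labelId` is an assumption about what the application wants.

```python
def iter_added_messages(service, start_history_id, label_id="INBOX"):
    """Yield each message from messagesAdded records since start_history_id."""
    page_token = None
    while True:
        response = service.users().history().list(
            userId="me",
            startHistoryId=start_history_id,
            historyTypes=["messageAdded"],
            labelId=label_id,
            pageToken=page_token,  # None on the first request
        ).execute()
        # Records without a messagesAdded key are skipped
        for record in response.get("history", []):
            for added in record.get("messagesAdded", []):
                yield added["message"]
        page_token = response.get("nextPageToken")
        if not page_token:
            break
```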

Main Question:

How can I best process only truly new incoming messages and avoid reprocessing messages when they are moved back into the inbox (or any other label)?

Environment:

  • Language: Python
  • Framework: Flask
  • Database: MongoDB
  • Gmail API

Attempted Solution: I tried to filter messages by their internal timestamp, processing only messages with a timestamp greater than that of the last processed message. I also tried storing message IDs, but the stored list would keep growing and bloat the application. Here's a simplified version of my code:

def start_watch():
    credentials = google.oauth2.credentials.Credentials(**session['credentials'])
    service = build('gmail', 'v1', credentials=credentials)
    
    request = {
        'labelIds': ['INBOX'],
        'topicName': f"projects/{create_app().config['GOOGLE_CLOUD_PROJECT_ID']}/topics/{create_app().config['PUBSUB_TOPIC']}",
        'labelFilterBehavior': 'INCLUDE',
        'historyTypes': ['messageAdded']
    }
    response = service.users().watch(userId='me', body=request).execute()

    mongo.db.users.update_one(
        {'google_id': session['google_id']},
        {'$set': {'historyId': response['historyId'], 'watch_state': True}}
    )

def process_notification(data):
    global last_processed_history_id

    payload = base64.urlsafe_b64decode(data['message']['data'])
    message_data = json.loads(payload)
    
    history_id = message_data['historyId']
    email_address = message_data['emailAddress']
    
    current_time = time.time()

    # Debounce logic
    if email_address in last_processed_history_id:
        last_id, last_time = last_processed_history_id[email_address]
        if history_id == last_id and (current_time - last_time) < debounce_interval:
            print(f"Skipping duplicate notification for history ID {history_id}")
            return

    user = mongo.db.users.find_one({'email': email_address})
    if user:
        credentials = google.oauth2.credentials.Credentials(**user['credentials'])

        try:
            # Refresh credentials if necessary
            if credentials.expired and credentials.refresh_token:
                credentials.refresh(Request())
                mongo.db.users.update_one(
                    {'email': email_address},
                    {'$set': {'credentials': credentials_to_dict(credentials)}}
                )
            elif credentials.expired and not credentials.refresh_token:
                print("No refresh token available. User needs to re-authenticate.")
                return redirect(url_for('auth.reauthenticate'))

            service = build('gmail', 'v1', credentials=credentials)

            # Check if the sender is on the block list for the user
            blocked_senders = mongo.db.block_list.find({'receiver_id': user['_id']})
            blocked_emails = [block['email'] for block in blocked_senders]
            if email_address in blocked_emails:
                results = service.users().history().list(userId='me', startHistoryId=history_id, historyTypes=['messageAdded']).execute()
                if 'history' in results:
                    for history in results['history']:
                        if 'messagesAdded' in history:
                            for message in history['messagesAdded']:
                                apply_label(service, message['message']['id'], 'Blocked - PitchSlap')
                                remove_label(service, message['message']['id'], 'INBOX')
                return

            last_history_id = user.get('historyId', '1')
            last_processed_message_id = user.get('lastProcessedMessageId', '0')
            last_processed_timestamp = user.get('lastProcessedTimestamp', 0)

            if int(history_id) <= int(last_history_id):
                print(f"History ID {history_id} has already been processed.")
                return

            results = service.users().history().list(userId='me', startHistoryId=last_history_id, historyTypes=['messageAdded']).execute()

            if 'history' in results:
                for history in results['history']:
                    if 'messagesAdded' in history:
                        for message_added in history['messagesAdded']:
                            message_id = message_added['message']['id']
                            msg = service.users().messages().get(userId='me', id=message_id).execute()
                            
                            if 'INBOX' not in msg.get('labelIds', []):
                                print(f"Message {message_id} not in INBOX.")
                                continue
                                
                            msg_internal_date = int(msg.get('internalDate', 0)) / 1000
                            
                            if msg_internal_date <= last_processed_timestamp:
                                print(f"Message {message_id} with timestamp {msg_internal_date} has already been processed.")
                                continue

                            headers = msg.get('payload', {}).get('headers', [])
                            sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
                            receiver = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown')
                            subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
                            snippet = msg.get('snippet', '')

                            sender_email = extract_email_address(sender)

                            email_content = f"Subject: {subject}\n\n{snippet}"
                            ai_response = check_sales_email_with_openai(email_content)
                            is_sales_email = ai_response["Sales Email"]
                            confidence = ai_response["Confidence"]

                            if is_sales_email and confidence >= 70:
                                apply_label(service, message_id, 'PitchSlap')
                                remove_label(service, message_id, 'INBOX')
                                pitch = save_sales_pitch(
                                    service, message_id, sender_email, receiver, subject, snippet, email_address, user['_id'],
                                    ai_response
                                )
                                send_template_email(service, sender_email, 'form_request', pitch)
                                if not pitch.get('submitted_at'):
                                    apply_label(service, message_id, 'Quarantined - PitchSlap')

                            print(f"History ID: {history_id}")
                            print(f"Email Address: {email_address}")
                            print(f"Sender: {sender_email}")
                            print(f"Receiver: {receiver}")
                            print(f"Subject: {subject}")
                            print(f"Email Content: {snippet}")
                            print(f"Sales email detected: {is_sales_email}")

                            mongo.db.users.update_one(
                                {'email': email_address},
                                {'$set': {'lastProcessedMessageId': message_id, 'lastProcessedTimestamp': msg_internal_date}}
                            )

            # Update the history ID in the database after processing all messages
            mongo.db.users.update_one(
                {'email': email_address},
                {'$set': {'historyId': history_id}}
            )

            # Update the last processed history ID and timestamp
            last_processed_history_id[email_address] = (history_id, current_time)

        except googleapiclient.errors.HttpError as error:
            # A 404 means a message listed in the history no longer exists
            if error.resp.status == 404:
                print(f"Message not found: {error}")
                return
            raise

        except google.auth.exceptions.RefreshError:
            print("Failed to refresh token, please reauthenticate.")
            return redirect(url_for('auth.reauthenticate'))

User DB Entry:

{
  "_id": {"$oid": "6xxxxxa"},
  "google_id": "1xxxxx7",
  "email": "pixxxxp1@gmail.com",
  "credentials": {
    "token": "yxxxxxx69",
    "refresh_token": "1xxxxxxxM",
    "token_uri": "htxxxxen",
    "client_id": "22xxxxxcom",
    "client_secret": "GOxxxxxc",
    "scopes": [
      "https://www.googleapis.com/auth/userinfo.profile",
      "https://www.googleapis.com/auth/userinfo.email",
      "openid",
      "https://www.googleapis.com/auth/gmail.readonly",
      "https://www.googleapis.com/auth/gmail.modify"
    ]
  },
  "historyId": {"$numberInt": "140436"},
  "watch_state": true,
  "last_processed_timestamp": {"$numberDouble": "1721765846.9436474"},
  "lastProcessedMessageId": "190e1507ff1729a4"
}

Console Output:

I restarted the program, and it went through and reprocessed everything. It processed the first email correctly. As a test, I then moved that email back into the inbox, and the program started processing it in a loop.

Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139707
Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139671
Message 190e13de8a820465 not found.
Message 190e13de810426fb not found.
Message 190e13de8a820465 not found.
Starting to process history for user pitchslap1@gmail.com from history ID 139567 to 139726
Message 190e13df993fbc8b not found.
Message 190e13de810426fb not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13df993fbc8b not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
Message 190e13de8a820465 not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13de810426fb not found.
Starting to process history for user pitchslap1@gmail.com from history ID 139707 to 139808
Starting to process history for user pitchslap1@gmail.com from history ID 139707 to 139755
Message 190e13df993fbc8b not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 139726 to 140355
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfe60bbb9b not found.
History ID 139587 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
History ID 139638 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139605 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139650 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140426
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140456
History ID: 140426
Email Address: pitchslap1@gmail.com
Sender: pitchslap2@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 16:37:32 -0400
Email Content: you want to buy this lead list or something
Sales email detected: True
Message 190e1507ff1729a4 not in INBOX.
Starting to process history for user pitchslap1@gmail.com from history ID 140355 to 140436
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Message 190e1507ff1729a4 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140507
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140511
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140436 to 140520
Message 190e150993252c08 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:36] "POST /pubsub HTTP/1.1" 200 -
History ID: 140507
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap2@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:37:53 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap2@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140511 to 140540
127.0.0.1 - - [23/Jul/2024 16:38:37] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user pitchslap1@gmail.com from history ID 140507 to 140563
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140507 to 140575
Message 190e15145fe96b7c not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:41] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
History ID: 140520
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
History ID: 140540
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140520 to 140595
Starting to process history for user pitchslap1@gmail.com from history ID 140520 to 140629
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c has already been processed.
Message 190e15145fe96b7c has already been processed.
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140646
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140658
Message 190e151573f8d86e not in INBOX.
Message 190e151573f8d86e not in INBOX.
History ID: 140575
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
Starting to process history for user pitchslap1@gmail.com from history ID 140540 to 140677
127.0.0.1 - - [23/Jul/2024 16:38:46] "POST /pubsub HTTP/1.1" 200 -
Message 190e151573f8d86e not in INBOX.
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Template path: C:\Users\toril\pitchslapv5\backend\app\..\templates
Starting to process history for user pitchslap1@gmail.com from history ID 140575 to 140733
Starting to process history for user pitchslap1@gmail.com from history ID 140575 to 140715
History ID: 140595
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True
History ID: 140629
Email Address: pitchslap1@gmail.com
Sender: pitchslap1@gmail.com
Receiver: pitchslap1@gmail.com
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/pitch_form?email=pitchslap1@gmail.com
Sales email detected: True

Solution

Okay, after a long process, I found the solution.

  1. I included timestamp filtering, so only messages newer than the last processed timestamp are handled.
  2. I needed to ensure that the sender was not the user. Without that check, moving an email from a label back into the inbox caused an infinite loop.
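
The two checks above could be combined into a single filter, roughly as in the sketch below. The function name `should_process` and its parameters are hypothetical; `msg` is assumed to be the dictionary returned by `messages().get`, with `internalDate` in milliseconds and a `From` header in the payload, as in the code from the question.

```python
from email.utils import parseaddr


def should_process(msg, user_email, last_processed_timestamp):
    """Return True only for messages newer than the last one handled and not sent by the user."""
    # internalDate is milliseconds since the epoch, returned as a string
    internal_date = int(msg.get("internalDate", 0)) / 1000
    if internal_date <= last_processed_timestamp:
        return False

    headers = msg.get("payload", {}).get("headers", [])
    sender = next((h["value"] for h in headers if h["name"].lower() == "from"), "")
    sender_email = parseaddr(sender)[1].lower()
    # Skip mail sent from the user's own address
    return sender_email != user_email.lower()
```

Calling this before the classification step would let the handler `continue` past anything that fails either check.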
2024-07-23
Tori Elstrom