Change Data Capture
In this post we will learn how to sync Salesforce object data with a third-party database such as SQLite. We will use Salesforce Change Data Capture to notify a third-party app about record changes in Salesforce, and a Python script to perform CRUD operations on the SQLite database. Before diving into the code, let's take a quick look at Change Data Capture.

What is a Change Data Capture event?

A Change Data Capture event, or change event, is a notification that Salesforce sends when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. The notification includes all new and changed fields, and header fields that contain information about the change. For example, header fields indicate the type of change that triggered the event and the origin of the change. Change events support all custom objects and a subset of standard objects. For more details, see this post on the Salesforce developer blog: https://developer.salesforce.com/blogs/2018/08/what-is-change-data-capture.html

Steps to follow:
  1. Create a Change Data Capture event channel:
    1. Go to Setup and search for "Change Data Capture".
    2. Select the object whose changes you want to be notified about. Here we select the Account object.
  2. Create a database on your third-party application server (a SQLite database here) and create a table named "account" in it. You can choose any table name; we use "account" because we are capturing changes to the Account object. The script below expects an Id column plus one column for each Account field it receives.
  3. Run the Python script below. Make sure aiosfstream is installed before you run it (for example with pip install aiosfstream).
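Step 2 can also be done from Python with the standard sqlite3 module. The sketch below is only an illustration: the function name create_account_table and the columns Name, Phone and Website are assumptions, so adjust the column list to the Account fields you expect to receive. The Id column holds the Salesforce record ID, which the script uses as the primary key.

```python
import sqlite3


def create_account_table(db):
    """Create the "account" table on an open SQLite connection if it is missing.

    Every column except Id is an assumed example; the names should match
    the Account field names that arrive in the change events.
    """
    db.execute(
        "create table if not exists account ("
        "Id text primary key, "  # Salesforce record ID
        "Name text, "
        "Phone text, "
        "Website text)"
    )
    db.commit()
```

For example, calling create_account_table(sqlite3.connect("SalesforceDB.db")) would create the table in a file named SalesforceDB.db (a hypothetical file name; use the same path that the streaming script opens). The script for step 3 follows.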
import asyncio
import sqlite3
from datetime import datetime

from aiosfstream import SalesforceStreamingClient

def helper(duplicatedata):
    """Apply one Account change event to the "account" table in SQLite."""
    json_data = duplicatedata
    print('json_data')
    print(json_data)

    # Split the event into the header (metadata about the change) and the fields.
    payload = json_data['payload']
    change_event_header = payload['ChangeEventHeader']
    change_type = change_event_header['changeType']

    # Collect the changed columns and their values. The header and
    # LastModifiedDate are skipped with "continue" rather than "break",
    # so fields that come after the header in the payload are not lost.
    table_columns = []
    table_values = []
    for column in payload:
        if column in ('ChangeEventHeader', 'LastModifiedDate'):
            continue
        table_columns.append(column)
        table_values.append(payload[column])

    # The Salesforce record ID is used as the primary key in the database table.
    primary_key = change_event_header['recordIds'][0]

    # Raw string, so the backslash in the Windows path is not read as an escape.
    db = sqlite3.connect(r'C:\SqliteDatabase/SalesforceDB.db')  # path to database
    c = db.cursor()
    try:
        if change_type == "CREATE":
            print('in create')
            # Values are bound with "?" placeholders instead of being quoted by
            # hand, which also works for numbers, None and quotes inside text.
            insert_query = "insert into account ({0}) values ({1})".format(
                ",".join(table_columns + ['Id']),
                ",".join(["?"] * (len(table_columns) + 1)))
            print('insert query')
            print(insert_query)
            c.execute(insert_query, table_values + [primary_key])
            print("insert finished at " + str(datetime.now()))

        elif change_type == "UPDATE" and table_columns:
            print('in update')
            updatevalues = ",".join(col + "=?" for col in table_columns)
            update_query = "update account set {0} where Id = ?".format(updatevalues)
            print('update_query')
            print(update_query)
            c.execute(update_query, table_values + [primary_key])
            print("update finished at " + str(datetime.now()))

        # Other change types (such as DELETE and UNDELETE) are ignored here.
        db.commit()
    except Exception as e:
        print('database exception')
        print(e)
    finally:
        c.close()
        db.close()
#--------------------------

async def stream_events():
    # connect to the Streaming API
    print('connecting to Salesforce...')
    async with SalesforceStreamingClient(
            consumer_key="Connected app consumer key",
            consumer_secret="Connected app consumer secret",
            username="salesforce org username",
            password="salesforce org password",
            sandbox=False) as client:
        print('connected')

        # subscribe to the change event channel of the Account object
        await client.subscribe("/data/AccountChangeEvent")
        print('subscribed to /data/AccountChangeEvent')
        print("----------------------------")
        # listen for incoming messages and apply each one to the database
        async for message in client:
            data = message["data"]
            helper(data)

if __name__ == "__main__":
    print('server started......')
    # asyncio.run creates the event loop, runs the coroutine and closes the loop
    asyncio.run(stream_events())
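
The script above acts only on CREATE and UPDATE events. To cover the delete part of CRUD as well, the same pattern can be extended. The following is a minimal sketch and not part of the original script: the names apply_change_event and sample_event, the record ID and the field values are all hypothetical, and it assumes events have the shape the helper reads (payload, ChangeEventHeader, changeType, recordIds). It runs against an in-memory database, so it can be tried without a Salesforce connection.

```python
import sqlite3


def apply_change_event(db, event):
    """Apply one Account change event to the "account" table in db."""
    payload = event["payload"]
    header = payload["ChangeEventHeader"]
    record_id = header["recordIds"][0]

    # Every payload key except the header and LastModifiedDate is a field.
    fields = {k: v for k, v in payload.items()
              if k not in ("ChangeEventHeader", "LastModifiedDate")}
    columns = list(fields)
    values = list(fields.values())

    change_type = header["changeType"]
    if change_type == "CREATE":
        sql = "insert into account ({0}) values ({1})".format(
            ",".join(columns + ["Id"]),
            ",".join(["?"] * (len(columns) + 1)))
        db.execute(sql, values + [record_id])
    elif change_type == "UPDATE" and columns:
        sql = "update account set {0} where Id = ?".format(
            ",".join(col + "=?" for col in columns))
        db.execute(sql, values + [record_id])
    elif change_type == "DELETE":
        db.execute("delete from account where Id = ?", (record_id,))
    db.commit()


def sample_event(change_type, **fields):
    """Build a hand-written event for local testing (not real Salesforce output)."""
    header = {"changeType": change_type, "recordIds": ["001000000000001AAA"]}
    return {"payload": dict(fields, ChangeEventHeader=header)}


db = sqlite3.connect(":memory:")
db.execute("create table account (Id text primary key, Name text, Phone text)")

apply_change_event(db, sample_event("CREATE", Name="Acme", Phone="123"))
apply_change_event(db, sample_event("UPDATE", Phone="456"))
rows_after_update = db.execute("select Id, Name, Phone from account").fetchall()

apply_change_event(db, sample_event("DELETE"))
rows_after_delete = db.execute("select Id, Name, Phone from account").fetchall()

print(rows_after_update)  # [('001000000000001AAA', 'Acme', '456')]
print(rows_after_delete)  # []
```

Taking the connection as an argument keeps the database logic separate from the streaming loop, which makes it easy to test with hand-written events like the ones above.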

By Himanshu Rana

My name is Himanshu Rana, 23 years young, born and raised in Ghaziabad, India. A high-spirited Salesforce admin, developer and blogger. I currently work at Wakencode Technologies,
