What is a Change Data Capture event?
A Change Data Capture event, or change event, is a notification that Salesforce sends when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. The notification includes all new and changed fields, and header fields that contain information about the change. For example, header fields indicate the type of change that triggered the event and the origin of the change. Change events support all custom objects and a subset of standard objects. For more information, please visit the Salesforce developer guide: https://developer.salesforce.com/blogs/2018/08/what-is-change-data-capture.html
Steps to follow:
- First, create a Change Data Capture event channel.
- Go to Setup and search for "Change Data Capture".
- Select the object for which you want to create a channel to receive notifications. For now, we are selecting the Account object.
import asyncio
from aiosfstream import SalesforceStreamingClient
from datetime import datetime
#------------------------
import json
import sqlite3
def _quote_identifier(name):
    """Return *name* quoted as an SQLite identifier (embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _to_sql_value(value):
    """Return *value* in a form sqlite3 can bind; dicts and lists become JSON text."""
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return value


def helper(duplicatedata, db_path=r'C:\SqliteDatabase/SalesforceDB.db'):
    """Apply one Salesforce change event to the local ``account`` table.

    Args:
        duplicatedata: The ``data`` part of a change-event message. It must
            contain ``payload``, which holds the changed fields plus a
            ``ChangeEventHeader`` with ``changeType`` and ``recordIds``.
        db_path: Path of the SQLite database file to write to.

    ``CREATE`` events insert one row per record ID and ``UPDATE`` events
    update the row of every record ID. Other change types are ignored.
    ``LastModifiedDate`` is never written. Database errors are printed and
    the transaction is rolled back, so a single bad event does not stop the
    caller.

    Raises:
        KeyError: If the event lacks ``payload``, ``ChangeEventHeader``,
            ``changeType`` or ``recordIds``.
    """
    print('json_data')
    print(duplicatedata)

    payload = duplicatedata['payload']
    header = payload['ChangeEventHeader']
    change_type = header['changeType']
    record_ids = list(header['recordIds'])

    if change_type not in ('CREATE', 'UPDATE'):
        print('ignoring unsupported change type ' + str(change_type))
        return

    # Skip (rather than stop at) the header so fields that follow it in the
    # payload are not lost. The record ID comes from the header, not the
    # payload.
    skipped = ('ChangeEventHeader', 'LastModifiedDate', 'Id')
    columns = [name for name in payload if name not in skipped]
    values = [_to_sql_value(payload[name]) for name in columns]

    # Values are always bound as parameters; only (quoted) column names are
    # interpolated into the statement text.
    if change_type == 'CREATE':
        print('in create')
        column_sql = ', '.join(
            [_quote_identifier(name) for name in columns] + ['Id'])
        placeholders = ', '.join('?' for _ in range(len(columns) + 1))
        query = 'insert into account ({0}) values ({1})'.format(
            column_sql, placeholders)
        action = 'insert'
    else:
        print('in update')
        if not columns:
            # An empty SET clause would be invalid SQL.
            print('no changed fields to update')
            return
        assignments = ', '.join(
            _quote_identifier(name) + ' = ?' for name in columns)
        query = 'update account set {0} where Id = ?'.format(assignments)
        action = 'update'

    # One event may cover several records that received the same change.
    rows = [values + [record_id] for record_id in record_ids]
    print(action + ' query')
    print(query)

    db = sqlite3.connect(db_path)
    try:
        # The connection context manager commits on success and rolls back
        # if the statement fails.
        with db:
            db.executemany(query, rows)
        print(action + ' has started at ' + str(datetime.now()))
    except sqlite3.Error as e:
        print(action + ' exception')
        print(e)
    finally:
        db.close()
#--------------------------
async def stream_events():
    """Subscribe to the Account change-event channel and process each event.

    Opens a ``SalesforceStreamingClient`` session, subscribes to
    ``/data/AccountChangeEvent`` and passes the ``"data"`` entry of every
    incoming message to ``helper``. The data of the most recent message is
    also stored in the module-level ``duplicatedata`` global.
    """
    global duplicatedata

    print('comment 1')
    credentials = {
        "consumer_key": "Connected app consumer key",
        "consumer_secret": " Connected app secret key",
        "username": "salesforce org username",
        "password": "salesforce org password",
        "sandbox": False,
    }
    # Connect to the Streaming API.
    async with SalesforceStreamingClient(**credentials) as streaming_client:
        print('comment 2')
        subscription = await streaming_client.subscribe(
            "/data/AccountChangeEvent")
        print(subscription)
        print("----------------------------")
        # Listen for incoming messages.
        async for event in streaming_client:
            duplicatedata = event["data"]
            helper(duplicatedata)
if __name__ == "__main__":
    print('server started......')
    # asyncio.run() creates the event loop, runs the coroutine and closes the
    # loop afterwards; calling asyncio.get_event_loop() with no running loop
    # is deprecated.
    asyncio.run(stream_events())
Hits: 443