We had approximately 130 customer accounts whose data had to be extracted through an Azure Data Factory (ADF) pipeline and loaded into the data warehouse.
We needed to track whether the load of each customer account into the storage account in ADLS succeeded or failed.
We used a ForEach activity to iterate over all the customer accounts. Whenever an iteration failed or succeeded, we needed the particular account details and the corresponding failure error code or success status to be stored in a CSV file, which in turn would be the input for the data lake.
- We used a Set Variable activity to store the error message of each failed activity (see the sketch after this list).
- We used a Copy activity to read the value produced by the previous Set Variable activity and write it to the required path in the data lake.
- Finally, we got the error report file into the data lake, but the client required the report in a tabular format that also included the scheduled time of the pipeline run.
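For illustration, here is a minimal Python sketch of the kind of values being passed around at this point; the account IDs and error text are hypothetical. On the success path the value is a JSON array of account objects, and on the failure path it is the error message captured by the Set Variable activity:

import json

# Hypothetical contents of the pipeline values (illustrative only).
# On success, a JSON array of customer account objects:
CustomerAccount = json.dumps([
    {"CustomerAccount": "ACC-0001"},
    {"CustomerAccount": "ACC-0002"},
])

# On failure, the Set Variable activity can capture the failed activity's
# error with an ADF expression along the lines of:
#   @activity('Copy to ADLS').error.message
ErrorMessage = "ErrorCode=2200, Failure happened on 'Source' side."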
Solution:
We couldn't add the scheduled time using the Copy activity, which prompted us to use a Notebook activity instead, passing the scheduled time, pipeline run status, customer accounts, and error report variable in as notebook parameters, as shown below.
Code snippet:

# PipelineRunStatus, CustomerAccount, ErrorMessage, scheduledTime, and path
# arrive as notebook parameters from the ADF Notebook activity
import json
import pandas as pd

# Create an empty dataframe
df = pd.DataFrame()

if PipelineRunStatus == "Success":
    # On success, parse the JSON array of accounts and record each account
    accounts = json.loads(CustomerAccount)
    account_list = [item["CustomerAccount"] for item in accounts]
    df["CustomerAccount"] = account_list
else:
    # On failure, record the error message captured in the pipeline
    df["ErrorMessage"] = [ErrorMessage]

df["scheduledTime"] = scheduledTime
df["PipelineRunStatus"] = PipelineRunStatus

# Save the dataframe to a CSV file at the target path in the data lake
df.to_csv(path, header=True, mode="w", index=False)
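For completeness, this is roughly how those parameters could be wired up inside the notebook. A minimal sketch, assuming a Databricks notebook invoked by the ADF Notebook activity (in a Synapse notebook the values would arrive through a parameters cell instead); the parameter names are illustrative:

# Hypothetical parameter wiring for a Databricks notebook; dbutils is
# available implicitly in the Databricks runtime.
PipelineRunStatus = dbutils.widgets.get("PipelineRunStatus")
CustomerAccount = dbutils.widgets.get("CustomerAccount")
ErrorMessage = dbutils.widgets.get("ErrorMessage")
scheduledTime = dbutils.widgets.get("scheduledTime")
path = dbutils.widgets.get("path")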
Finally, in accordance with the requirement, we got the file with the CustomerAccount, the scheduled time, and the status of the pipeline run.
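For a successful run, the resulting CSV would then look something like this (account IDs and timestamp are hypothetical):

CustomerAccount,scheduledTime,PipelineRunStatus
ACC-0001,2024-01-15T06:00:00Z,Success
ACC-0002,2024-01-15T06:00:00Z,Success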