import pandas as pd


def etl(cursor):
    """Run the extraction query and yield rows one at a time.

    Note: fetchall() still materialises the full result set on the driver
    side; swap it for fetchmany()/fetchone() if you need true streaming.
    """
    cursor.execute("<query>")
    for row in cursor.fetchall():
        yield row


def df_generator(cursor):
    """Build the DataFrame in one go, feeding rows from the etl() generator."""
    print('Creating pandas DF using generator...')
    column_names = ['id', 'merchant', 'status', 'transaction_date', 'amount_usd']
    df = pd.DataFrame(data=etl(cursor), columns=column_names)
    print('DF successfully created!\n')
    return df


def df_create_from_batch(cursor, batch_size):
    """Build the DataFrame incrementally, batch_size rows at a time."""
    print('Creating pandas DF using batches...')
    colnames = ['id', 'merchant', 'status', 'transaction_date', 'amount_usd']
    df = pd.DataFrame(columns=colnames)
    # execute a database query to extract data
    cursor.execute(query)  # `query` holds the extraction SQL, defined elsewhere
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some ETL on a chunk of data of batch_size rows
        batch_df = pd.DataFrame(data=rows, columns=colnames)
        df = pd.concat([df, batch_df], ignore_index=True)
    print('DF successfully created!\n')
    return df
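To compare the two approaches end to end, here is a minimal, self-contained sketch against an in-memory SQLite database. The transactions table, the sample rows, and the query string are illustrative assumptions, not part of the original pipeline.

# Hypothetical fixture: an in-memory SQLite table standing in for the
# real transactions source.
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('''CREATE TABLE transactions
               (id INTEGER, merchant TEXT, status TEXT,
                transaction_date TEXT, amount_usd REAL)''')
cur.executemany('INSERT INTO transactions VALUES (?, ?, ?, ?, ?)',
                [(i, f'merchant_{i % 3}', 'settled', '2023-12-11', i * 1.5)
                 for i in range(1000)])
conn.commit()

query = 'SELECT * FROM transactions'  # consumed by df_create_from_batch

# Generator approach (mirrors etl()): pandas consumes the rows lazily
# while building the frame.
cur.execute(query)
df1 = pd.DataFrame(data=(row for row in cur.fetchall()),
                   columns=['id', 'merchant', 'status',
                            'transaction_date', 'amount_usd'])

# Batch approach: fetchmany() keeps at most 250 rows in flight per step.
df2 = df_create_from_batch(cur, 250)

assert len(df1) == len(df2) == 1000

Both frames end up identical; the difference is peak memory while the rows are being consumed.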
from datetime import datetime

import boto3
import pandas as pd
import pytz

s3 = boto3.client('s3')


def upload_chunks(chunk_gen, s3_bucket, s3_file_prefix):
    """Perform a multi-part upload to the AWS S3 data lake."""
    try:
        cnt = 0
        logs = []
        for chunk in chunk_gen:
            # serialise the DataFrame chunk to JSON records;
            # json.dumps() cannot encode a DataFrame directly
            part = chunk.to_json(orient='records').encode('utf8')
            key = s3_file_prefix + file_key()
            s3.put_object(Body=part, Bucket=s3_bucket, Key=key)
            logs.append(f'aws s3 cp s3://{s3_bucket}/{key} ./')
            cnt += 1
        print(f'upload_chunks: Uploaded {cnt} chunks.')
        print('\n'.join(str(i) for i in logs))
    except Exception as e:
        print(e)


def file_key():
    """Get a time-based file suffix, e.g. 2023/12/11/09/5234023930,
    which the caller prepends with the pipe prefix, i.e. /data_pipe_1/...
    """
    suffix = datetime.now(pytz.utc).strftime('%Y/%m/%d/%H/%M%S%f')
    return f'{suffix}'


def df_create_from_batch(cursor, batch_size):
    """Yield one DataFrame per batch of batch_size rows instead of
    returning a single concatenated frame.
    """
    print('Creating pandas DF using generator...')
    colnames = ['id', 'merchant', 'status', 'transaction_date', 'amount_usd']
    # execute a database query to extract data
    cursor.execute(query)  # `query` holds the extraction SQL, defined elsewhere
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some ETL on a chunk of data of batch_size rows
        batch_df = pd.DataFrame(data=rows, columns=colnames)
        yield batch_df
    print('DF successfully created!\n')


# `cursor`, `config` and `pipe` come from the surrounding pipeline setup
s3_upload_scope = df_create_from_batch(cursor, 10000)
upload_chunks(s3_upload_scope, config.get('s3_bucket'), pipe['name'])
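The upload path can be exercised offline with botocore's Stubber, so no real bucket or credentials are needed. A minimal sketch, assuming the module-level s3 client above; the bucket name and the two tiny DataFrame chunks are hypothetical test fixtures.

import pandas as pd
from botocore.stub import ANY, Stubber

# Hypothetical fixtures: two tiny DataFrame chunks to "upload".
test_chunks = [
    pd.DataFrame({'id': [1], 'amount_usd': [9.99]}),
    pd.DataFrame({'id': [2], 'amount_usd': [4.50]}),
]

stubber = Stubber(s3)
for _ in test_chunks:
    # queue one canned PutObject response per expected upload
    stubber.add_response(
        'put_object', {},
        expected_params={'Body': ANY, 'Bucket': 'my-test-bucket', 'Key': ANY})

with stubber:
    upload_chunks(iter(test_chunks), 'my-test-bucket', 'data_pipe_1/')
stubber.assert_no_pending_responses()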
import logging


def chunk_gen(itemList, chunk_size):
    """Read data in chunks and yield each chunk."""
    for i in range(0, len(itemList), chunk_size):
        yield itemList[i:i + chunk_size]


def sb_batch_extract(idList_gen):
    """Read a generator of id batches and use each batch to extract
    data from the database.
    """
    try:
        step = 1
        while True:
            ids = next(idList_gen)
            logging.debug(f'> Step {step} processing ids: {ids}')
            ids_str = ','.join(str(i) for i in ids)
            # `get_sb_data` and `sql` are the database helper and query
            # template defined elsewhere in the pipeline
            out = get_sb_data(sql, ids_str)
            logging.debug(f'> Step {step} ids used to produce: {out}')
            step += 1
            yield out
    # StopIteration is a subclass of Exception, so it must be caught
    # first or the broader handler below would swallow it
    except StopIteration:
        pass
    except Exception as e:
        print(e)
    finally:
        del idList_gen


# Step 1: Generate the user id list as a generator object.
idList_gen = chunk_gen([col[0] for col in get_ids()], 250)
# Step 2: Extract in chunks from the database:
extract = sb_batch_extract(idList_gen)
actual = [i for i in extract]
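To make the id-batching flow runnable end to end, the two helpers it leans on can be wired to an in-memory SQLite table. The users schema and the get_ids() and get_sb_data() bodies below are hypothetical stand-ins for the real database helpers.

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER, name TEXT)')
conn.executemany('INSERT INTO users VALUES (?, ?)',
                 [(i, f'user_{i}') for i in range(1000)])

# query template with an IN clause for each id batch
sql = 'SELECT id, name FROM users WHERE id IN ({ids})'

def get_ids():
    # rows of (id,) tuples, matching the [col[0] for col in ...] consumer
    return conn.execute('SELECT id FROM users').fetchall()

def get_sb_data(query_template, ids_str):
    # string-formatting the ids is fine for a sketch; use parameter
    # binding against untrusted input in production
    return conn.execute(query_template.format(ids=ids_str)).fetchall()

idList_gen = chunk_gen([col[0] for col in get_ids()], 250)
batches = list(sb_batch_extract(idList_gen))
assert sum(len(b) for b in batches) == 1000  # four batches of 250 ids each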