




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第如何使用Python讀取Hive數(shù)據(jù)庫?port=21051,
database=ur_AI_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None
self.host=host
self.port=port
self.database=database
self.auth_mechanism=auth_mechanism
self.user=user
self.password=password
self.logger=logger
self.impala_conn=None
self.conn=None
self.cursor=None
self.engine=None
self.session=None
def create_table_code(self, file_name):
    """Generate table/model source code by running ``sqlacodegen``.

    Args:
        file_name: Path of the file that receives the generated code.

    Returns:
        The current value of ``self.conn`` (``None`` if no connection has
        been opened yet).
    """
    import shlex  # local import: only needed for shell quoting in this method

    # NOTE(review): the extracted source lost its quotes, spaces and the
    # redirection character; the command is reconstructed as
    # "sqlacodegen <connection_str> > <file_name>" -- confirm against the
    # original file.
    # NOTE(review): ``self.connection_str`` is not assigned in the visible
    # part of ``__init__`` -- confirm it is set elsewhere.
    # Both values are quoted so that spaces or shell metacharacters in the
    # URL or path cannot alter the command that is executed.
    command = "sqlacodegen {} > {}".format(
        shlex.quote(self.connection_str), shlex.quote(file_name)
    )
    os.system(command)
    return self.conn
def get_conn(self):
    """Return the cached engine connection, opening it on first use.

    Returns:
        The object returned by ``self.get_engine().connect()``; the same
        object is returned on subsequent calls until it is closed.
    """
    if self.conn is None:
        engine = self.get_engine()
        self.conn = engine.connect()
    return self.conn
def get_impala_conn(self):
    """Return the cached low-level connection, opening it on first use.

    The connection is created from the host, port, database,
    authentication mechanism, user and password stored on the instance.

    Returns:
        The object returned by ``connect(...)``.
    """
    if self.impala_conn is None:
        # NOTE(review): ``connect`` is presumably ``impala.dbapi.connect``
        # imported at file level -- confirm.
        self.impala_conn = connect(
            host=self.host,
            port=self.port,
            database=self.database,
            auth_mechanism=self.auth_mechanism,
            user=self.user,
            password=self.password,
        )
    return self.impala_conn
def get_engine(self):
    """Return the cached SQLAlchemy engine, creating it on first use.

    The engine is given ``self.get_impala_conn`` as its ``creator`` so
    connections are produced by that method instead of from the URL.

    Returns:
        The engine created by ``sqlalchemy.create_engine``.
    """
    if self.engine is None:
        self.engine = sqlalchemy.create_engine(
            "impala://", creator=self.get_impala_conn
        )
    return self.engine
def get_cursor(self):
    """Return the cached cursor, creating it on first use.

    The original called ``self.conn.cursor()``, which fails when no
    connection has been opened (``self.conn`` is ``None``) and, once opened,
    targets the engine connection rather than a DB-API connection.  The
    cursor is now taken from ``self.get_impala_conn()``, which opens the
    low-level connection on demand.

    Returns:
        The cursor object.
    """
    if self.cursor is None:
        self.cursor = self.get_impala_conn().cursor()
    return self.cursor
def get_session(self) -> "sqlalchemy.orm.Session":
    """Return the cached ORM session, creating it on first use.

    The session is produced by a ``sessionmaker`` bound to
    ``self.get_engine()``.  The original annotated the return type as
    ``sessionmaker``, but the value returned is the session instance the
    factory produces, so the annotation is corrected.

    Returns:
        The session instance.
    """
    if self.session is None:
        session_factory = sessionmaker(bind=self.get_engine())
        self.session = session_factory()
    return self.session
def close_conn(self):
    """Close the engine connection, dispose the engine and close the
    low-level connection.

    The cached reference is cleared and the remaining resources are released
    even if closing the connection raises, so a failed close cannot leak the
    engine or the underlying connection.
    """
    # NOTE(review): indentation was lost in extraction; the engine and
    # low-level connection are released unconditionally here (both helpers
    # are no-ops when nothing is open) -- confirm against the original.
    try:
        if self.conn is not None:
            self.conn.close()
    finally:
        self.conn = None
        self.dispose_engine()
        self.close_impala_conn()
def close_impala_conn(self):
    """Close the low-level connection if one is open and forget it.

    The cached reference is cleared even if ``close()`` raises, so a broken
    connection is never reused.
    """
    if self.impala_conn is None:
        return
    try:
        self.impala_conn.close()
    finally:
        self.impala_conn = None
def close_session(self):
    """Close the ORM session if one is open, then dispose the engine.

    The cached session reference is cleared and the engine is disposed even
    if closing the session raises.
    """
    # NOTE(review): indentation was lost in extraction; the engine is
    # disposed unconditionally here -- confirm against the original.
    try:
        if self.session is not None:
            self.session.close()
    finally:
        self.session = None
        self.dispose_engine()
def dispose_engine(self):
    """Dispose the SQLAlchemy engine if one exists and forget it.

    The cached reference is cleared even if ``dispose()`` raises.
    """
    if self.engine is None:
        return
    try:
        self.engine.dispose()
    finally:
        self.engine = None
def close_cursor(self):
    """Close the cursor if one is open and forget it.

    The cached reference is cleared even if ``close()`` raises.
    """
    if self.cursor is None:
        return
    try:
        self.cursor.close()
    finally:
        self.cursor = None
def get_data(self, sql, auto_close=True, retries=3, retry_interval=60) -> pd.DataFrame:
    """Run a query and return the result as a DataFrame, retrying on failure.

    Args:
        sql: Query text passed to ``pandas.read_sql``.
        auto_close: When true, ``self.close_conn()`` is called before
            returning, whether the query succeeded or not.
        retries: Total number of attempts (the original hard-coded 3).
            Values below 1 are treated as 1.
        retry_interval: Seconds to sleep between attempts (the original
            hard-coded 60).

    Returns:
        The DataFrame produced by the first successful attempt.

    Raises:
        Exception: The error of the last attempt, re-raised with its
            original traceback after being logged.
    """
    attempts = max(1, retries)
    data = None
    try:
        # Opening the connection inside the ``try`` means a failed connect
        # is logged and cleaned up like a failed query.
        conn = self.get_conn()
        for attempt in range(attempts):
            try:
                data = pd.read_sql(sql, conn)
                break
            except Exception:
                if attempt == attempts - 1:
                    raise  # out of attempts: propagate to the caller
                time.sleep(retry_interval)
    except Exception as ex:
        # The logger is optional; without this guard a missing logger would
        # raise AttributeError and hide the real failure.
        if self.logger is not None:
            self.logger.exception(ex)
        raise
    finally:
        if auto_close:
            self.close_conn()
    return data
class VarsHelper():
    """Small key/value store persisted to a single pickle file.

    Existing values are loaded from ``save_dir`` on construction; with
    ``auto_save`` enabled every ``set_value`` rewrites the file.
    """

    def __init__(self, save_dir, auto_save=True):
        """Create the store and load previously saved values.

        Args:
            save_dir: Path of the pickle file (despite the name, a file path).
            auto_save: Persist to disk after every ``set_value`` call.
        """
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        # A bare file name has an empty dirname; ``os.makedirs("")`` raises,
        # so the parent directory is only created when there is one.
        parent_dir = os.path.dirname(self.save_dir)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE(review): pickle.load executes arbitrary code from the
            # file; only use with files this program wrote itself.
            with open(self.save_dir, "rb") as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store ``value`` under ``key`` and persist when auto-save is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if absent."""
        return self.values[key]

    def has_key(self, key):
        """Return whether ``key`` is present in the store."""
        return key in self.values

    def save_file(self):
        """Write all values to the pickle file, replacing its content."""
        with open(self.save_dir, "wb") as f:
            pickle.dump(self.values, f)
class GlobalShareArgs():
    """Process-wide settings shared through a class-level dictionary.

    All accessors operate on the class attribute ``args``; they are declared
    as static methods so they work when called on the class or an instance.
    """

    args = {
        "debug": False,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dictionary itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dictionary with ``args``."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the setting for ``key`` or ``default_value`` if it is absent."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether ``key`` is present in the settings."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared settings."""
        GlobalShareArgs.args.update(args)
class ShareArgs():
    """Task-level settings shared through a class-level dictionary.

    All accessors operate on the class attribute ``args``; they are declared
    as static methods so they work when called on the class or an instance.
    """

    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # directory for exported cluster labels
        "common_datas_dir": "./hjx/data",  # shared data directory (common ur_bi_dw data)
        "only_predict": False,  # predict only, do not train
        "delete_model": True,  # delete the model first; only used when training
        "export_excel": False,  # export to Excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the shared settings dictionary itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared settings dictionary with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the setting for ``key`` or ``default_value`` if it is absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return whether ``key`` is present in the settings."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared settings."""
        ShareArgs.args.update(args)
class UrBiGetDatasBase():
    """Base class for exporting query results to CSV files with caching.

    Provides a per-path lock, a per-path "last refreshed" timestamp used to
    skip recent exports, and an incremental date-windowed export loop.

    NOTE(review): the extracted source had lost indentation, string quotes,
    the ``<``/``>`` characters and the receivers of its info-level log
    calls; those are reconstructed here and should be confirmed against the
    original file.
    NOTE(review): credentials are hard-coded as parameter defaults; they are
    kept for interface compatibility but should come from configuration.
    """

    # Locks shared by all instances; one lock per absolute save path.
    lock_dict: Dict[str, threading.Lock] = {}
    # Time of the last successful export per absolute save path.
    time_dict: Dict[str, datetime.datetime] = {}
    # Result of the most recent timeout check per absolute save path.
    get_data_timeout_dict: Dict[str, bool] = {}

    # Key under which ``time_dict`` is persisted through ``VarsHelper``.
    _TIME_STORE_KEY = "UrBiGetDatasBase.time_list"

    def __init__(
        self,
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        save_dir=None,
        logger: logging.Logger = None,
    ):
        """Create the database helper and the output directory.

        Args:
            host, port, database, auth_mechanism, user, password: Forwarded
                unchanged to ``HiveHelper``.
            save_dir: Directory for exported files; created if missing.
            logger: Optional logger; when ``None`` nothing is logged here.
        """
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger,
        )
        # Create the output directory.
        if self.save_dir is not None:
            os.makedirs(self.save_dir, exist_ok=True)
        # Timestamps are persisted across runs only in debug mode.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value("debug"):
            self.vars_helper = VarsHelper("./hjx/data/vars/UrBiGetDatas")

    def _log_info(self, message, *args):
        """Log at info level when a logger was supplied."""
        if self.logger is not None:
            self.logger.info(message, *args)

    def close(self):
        """Close the database connection."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Return whether the data stored at ``key_name`` must be refreshed.

        A refresh is due when the path has never been exported or its last
        export is older than 12 hours (24 hours in debug mode).  The result
        is also recorded in ``get_data_timeout_dict``.
        """
        # Absolute path so every spelling of the same path shares one entry.
        key_name = os.path.abspath(key_name)
        store_key = UrBiGetDatasBase._TIME_STORE_KEY
        if self.vars_helper is not None and self.vars_helper.has_key(store_key):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value(store_key)
        timeout = 12  # hours
        if GlobalShareArgs.get_args_value("debug"):
            timeout = 24  # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout * 60 * 60
        )
        if get_data_timeout:
            self._log_info("超時%d小時,重新查數據:%s", timeout, key_name)
        else:
            self._log_info("未超時%d小時,跳過查數據:%s", timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the last export time of ``key_name``.

        In debug mode the timestamps are also persisted via ``VarsHelper``.
        """
        key_name = os.path.abspath(key_name)
        # A path that was never checked is treated as refreshed, instead of
        # raising KeyError as a direct dictionary lookup would.
        timed_out = UrBiGetDatasBase.get_data_timeout_dict.get(key_name, True)
        if timed_out or self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            self.vars_helper.set_value(
                UrBiGetDatasBase._TIME_STORE_KEY, UrBiGetDatasBase.time_dict
            )

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock shared by everyone using the same save path."""
        key_name = os.path.abspath(key_name)
        # ``setdefault`` avoids the check-then-insert race in which two
        # threads could each create and use a different lock for one path.
        return UrBiGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns: List[str],
        del_index_list=(-1,),  # indexes (into the sorted file list) to delete first
        start_date=datetime.datetime(2017, 1, 1),  # first window start
        offset=relativedelta(months=3),  # window length
        date_format_fun=lambda d: "%04d%02d01" % (d.year, d.month),  # date -> SQL parameter
        filename_format_fun=lambda d: "%04d%02d.csv" % (d.year, d.month),  # date -> file name
        stop_date="20700101",  # abort when the window start passes this value
        data_format_fun=None,  # optional DataFrame post-processing
    ):
        """Export query results window by window, one CSV file per window.

        ``sql`` must contain two ``%s`` placeholders that receive the
        formatted start and end date of each window.  Files that already
        exist are skipped; the newest existing files (``del_index_list``)
        are deleted first so that partially filled windows are re-queried.
        The loop ends at the first empty window following one that returned
        rows, or when the query helper returns ``None``.

        Raises:
            Exception: If no window has returned rows and the window start
                has passed ``stop_date``.
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the most recent file(s) so they are fetched again.
            file_list = sorted(os.listdir(save_dir))
            count = len(file_list)
            # Out-of-range indexes are skipped and duplicates collapsed, so
            # a directory with fewer files than indexes no longer raises.
            doomed = {
                file_list[index]
                for index in del_index_list
                if -count <= index < count
            }
            for name in sorted(doomed):
                os.remove(os.path.join(save_dir, name))
                print("刪除最后一個文件:", name)
        select_index = -1
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self._log_info("date:%s-%s", start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data: pd.DataFrame = self.db_helper.get_data(
                    sql % (start_date_str, end_date_str)
                )
                if data is None:
                    break
                self._log_info("data:%d", len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    # Empty window after data was seen: reached the end.
                    break
                elif stop_date < start_date_str:
                    raise Exception("讀取數據異常,時間超出最大值!")
            start_date = end_date
class UrBiGetDatas(UrBiGetDatasBase):
    """Exports the shared warehouse tables to CSV files under ``save_dir``.

    Every public ``get_*`` method follows the same steps: take the lock for
    its output path, skip the work if the path was refreshed recently, run
    the query, write the CSV and record the refresh time.  The steps live in
    private helpers; each public method only supplies its query and naming.

    NOTE(review): SQL text, quotes and comparison operators were
    reconstructed from an extraction that had stripped whitespace, quotes
    and ``<``/``>`` -- confirm against the original file.
    """

    def __init__(
        self,
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_dw_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _refresh(self, key_path, fetch):
        """Run ``fetch`` under the lock of ``key_path`` if a refresh is due."""
        with self.get_lock(key_path):
            try:
                if not self.get_last_time(key_path):
                    return
                fetch()
                self.save_last_time(key_path)
            except Exception as ex:
                # The logger is optional; never let it hide the real error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise

    def _export_table(self, file_name, sql, rename, sort_columns=None, index=True):
        """Query a whole table and write it to ``save_dir/file_name``."""
        file_path = os.path.join(self.save_dir, file_name)

        def fetch():
            data: pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns=rename)
            if sort_columns:
                data = data.sort_values(sort_columns)
            data.to_csv(file_path, index=index)

        self._refresh(file_path, fetch)

    def _export_by_date(self, dir_name, sql, sort_columns, **options):
        """Export a date-windowed query into ``save_dir/dir_name``."""
        sub_dir = os.path.join(self.save_dir, dir_name)
        self._refresh(
            sub_dir,
            lambda: self.get_data_of_date(
                save_dir=sub_dir, sql=sql, sort_columns=sort_columns, **options
            ),
        )

    @staticmethod
    def _add_prefix(prefix):
        """Return a column mapper that prepends ``prefix``."""
        return lambda column: prefix + column

    @staticmethod
    def _strip_prefix(prefix):
        """Return a column mapper that removes every occurrence of ``prefix``."""
        return lambda column: column.replace(prefix, "")

    # ------------------------------------------------------------------
    # whole-table exports
    # ------------------------------------------------------------------
    def get_dim_date(self):
        """Export the date dimension."""
        self._export_table(
            "ur_bi_dw.dim_date.csv",
            "SELECT * FROM ur_bi_dw.dim_date",
            self._add_prefix("dim_date."),
            sort_columns=["dim_date.date_key"],
        )

    def get_dim_shop(self):
        """Export the shop dimension."""
        self._export_table(
            "ur_bi_dw.dim_shop.csv",
            "SELECT * FROM ur_bi_dw.dim_shop",
            self._add_prefix("dim_shop."),
            sort_columns=["dim_shop.shop_no"],
        )

    def get_weather_city(self):
        """Export the weather-city dimension."""
        self._export_table(
            "ur_bi_dw.weather_city.csv",
            "SELECT * FROM ur_bi_dw.dim_weather_city as weather_city",
            self._add_prefix("weather_city."),
        )

    def get_dim_goods(self):
        """Export the goods dimension."""
        self._export_table(
            "ur_bi_dw.dim_goods.csv",
            "SELECT * FROM ur_bi_dw.dim_goods",
            self._add_prefix("dim_goods."),
        )

    def get_dim_goods_market_shop_date(self):
        """Export per-shop goods life-cycle data."""
        sql = """
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
        """
        self._export_table(
            "ur_bi_dw.dim_goods_market_shop_date.csv",
            sql,
            # NOTE(review): stripping "lifecycle_end_date." is what the
            # original did; it looks like a copy/paste of the wrong prefix.
            self._strip_prefix("lifecycle_end_date."),
            sort_columns=["shop_market_date"],
            index=False,
        )

    def get_dim_goods_market_date(self):
        """Export nationwide goods life-cycle data."""
        self._export_table(
            "ur_bi_dw.dim_goods_market_date.csv",
            "select * FROM ur_bi_dw.dim_goods_market_date",
            self._add_prefix("dim_goods_market_date."),
            sort_columns=["dim_goods_market_date.sku_no"],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        """Export the developed sizes per goods colour."""
        self._export_table(
            "dim_goods_color_dev_sizes.csv",
            "SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes",
            self._strip_prefix("dim_goods_color_dev_sizes."),
            index=False,
        )

    def get_v_last_nation_sales_status(self):
        """Export the latest nationwide best/slow seller status."""
        self._export_table(
            "v_last_nation_sales_status.csv",
            "SELECT * FROM ur_bi_dw.v_last_nation_sales_status",
            self._strip_prefix("v_last_nation_sales_status."),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        """Export the most recent cost price per SKU and size."""
        sql = """
            select t1.sku_no, t1.`size`, t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no, `size`, max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no, `size`
            ) as t2
            on t2.sku_no=t1.sku_no
            and t2.`size`=t1.`size`
            and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
        """
        self._export_table(
            "dwd_daily_finacial_goods.csv",
            sql,
            self._strip_prefix("t1."),
            index=False,
        )

    def get_dim_size_group(self):
        """Export the size-mapping table."""
        self._export_table(
            "dim_size_group.csv",
            "select * from ur_bi_dw.dim_size_group",
            self._strip_prefix("dim_size_group."),
            index=False,
        )

    # ------------------------------------------------------------------
    # date-windowed exports
    # ------------------------------------------------------------------
    def get_dim_vip(self):
        """Export member data in yearly windows starting at 2017-01-01."""
        sql = """
            SELECT dv.*, dd.date_key, dd.date_name2
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2
            where dd.date_key>=%s
            and dd.date_key<%s
        """
        self._export_by_date(
            "vip_no",
            sql,
            ["dv.vip_no"],
            start_date=datetime.datetime(2017, 1, 1),
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        """Export weather data; the two newest files are always re-queried."""
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
        """

        def data_format_fun(data):
            # Prefix every column with the table alias.
            return data.rename(columns=self._add_prefix("weather."))

        self._export_by_date(
            "weather",
            sql,
            ["weather.date_key", "weather.areaid"],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_dwd_daily_sales_size(self):
        """Export actual sales aggregated per shop, SKU, day and size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no, sku_no, date_key, `size`
        """
        self._export_by_date(
            "dwd_daily_sales_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_dwd_daily_delivery_size(self):
        """Export actual deliveries aggregated per shop, SKU, day and size."""
        sql = """
            select shop_no, sku_no, date_key, `size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no, sku_no, date_key, `size`
        """
        self._export_by_date(
            "dwd_daily_delivery_size_all",
            sql,
            ["date_key", "shop_no", "sku_no"],
            start_date=datetime.datetime(2017, 1, 1),
        )
def get_common_datas(
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        logger: logging.Logger = None):
    """Export the shared tables into the common data directory.

    The target directory is ``<common_datas_dir>/ur_bi_data`` where
    ``common_datas_dir`` is read from ``ShareArgs``.  Date, shop, weather,
    weather-city, goods and daily-sales data are exported in that order and
    the connection is closed afterwards, whether or not an export failed.

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings forwarded to ``UrBiGetDatas``.
        logger: Optional logger; a module logger is used when ``None`` so
            that the error path cannot fail on a missing logger.

    Raises:
        Exception: Any export error, re-raised after being logged.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    # Shared output location.
    common_datas_dir = ShareArgs.get_args_value("common_datas_dir")
    common_ur_bi_dir = os.path.join(common_datas_dir, "ur_bi_data")
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger,
    )
    # NOTE(review): the info-level log calls were stripped by extraction and
    # are reconstructed as ``logger.info`` -- confirm.
    steps = (
        ("日期", ur_bi_get_datas.get_dim_date),
        ("店鋪", ur_bi_get_datas.get_dim_shop),
        ("天氣", ur_bi_get_datas.get_weather),
        ("天氣城市", ur_bi_get_datas.get_weather_city),
        ("貨品", ur_bi_get_datas.get_dim_goods),
        ("實際銷量", ur_bi_get_datas.get_dwd_daily_sales_size),
    )
    try:
        for label, export in steps:
            logger.info("正在查詢%s數據...", label)
            export()
            logger.info("查詢%s數據完成!", label)
    except Exception as ex:
        logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Exports the task-specific aggregates to CSV files under ``save_dir``.

    NOTE(review): SQL text and quoted literals were reconstructed from an
    extraction that had stripped whitespace and quotes -- confirm against
    the original file.
    """

    def __init__(
        self,
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        save_dir="./hjx/data/ur_bi_data",
        logger: logging.Logger = None,
    ):
        """Forward all settings to the base class."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def _export_query(self, file_name, sql, rename, sort_columns):
        """Run ``sql`` and write the renamed, sorted result to a CSV file.

        The work happens under the lock of the output path and is skipped
        when that path was refreshed recently.
        """
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                if not self.get_last_time(file_path):
                    return
                data: pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns=rename)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                # Record the refresh time.
                self.save_last_time(file_path)
            except Exception as ex:
                # The logger is optional; never let it hide the real error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise

    def get_sales_goal_amt(self):
        """Export the sales goal amount per shop, series and month."""
        sql = """
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key=dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
        """
        self._export_query(
            "month_of_year_sales_goal_amt.csv",
            sql,
            {
                "shop_no": "sales_goal.shop_no",
                "serial": "sales_goal.serial",
                "month_of_year": "dates.month_of_year",
            },
            ["sales_goal.shop_no", "sales_goal.serial", "dates.month_of_year"],
        )

    def get_shop_serial_area(self):
        """Export the floor area per shop, series and month."""
        sql = """
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no, if(shop_serial_area.serial='Y','W',shop_serial_area.serial), shop_serial_area.month_of_year
        """
        self._export_query(
            "shop_serial_area.csv",
            sql,
            {
                "shop_no": "shop_serial_area.shop_no",
                "serial": "shop_serial_area.serial",
                "month_of_year": "shop_serial_area.month_of_year",
                "area": "shop_serial_area.area",
            },
            [
                "shop_serial_area.shop_no",
                "shop_serial_area.serial",
                "shop_serial_area.month_of_year",
            ],
        )
def get_datas(
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger: logging.Logger = None):
    """Export the task-specific data (monthly sales goal amounts).

    The connection is closed afterwards, whether or not the export failed.

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings forwarded to ``CustomUrBiGetDatas``.
        save_dir: Directory that receives the CSV file.
        logger: Optional logger; a module logger is used when ``None`` so
            that the error path cannot fail on a missing logger.

    Raises:
        Exception: Any export error, re-raised after being logged.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger,
    )
    try:
        # Shop, series, category, year-month, sales goal amount.
        # NOTE(review): the info-level log calls were stripped by extraction
        # and are reconstructed as ``logger.info`` -- confirm.
        logger.info("正在查詢年月銷售目標金額數據...")
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info("查詢年月銷售目標金額數據完成!")
    except Exception as ex:
        logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
def getdata_ur_bi_dw(
        host="2",  # NOTE(review): default looks truncated by extraction -- confirm
        port=21051,
        database="ur_ai_dw",
        auth_mechanism="LDAP",
        user="urbi",
        password="Ur#730xd",
        save_dir="./data/sales_forecast/ur_bi_dw_data",
        logger=None):
    """Export everything: first the shared tables, then the task data.

    Args:
        host, port, database, auth_mechanism, user, password: Connection
            settings forwarded to both export functions.
        save_dir: Output directory for the task-specific data; the shared
            tables use the directory configured in ``ShareArgs``.
        logger: Optional logger forwarded to both export functions.
    """
    connection = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
    )
    get_common_datas(logger=logger, **connection)
    get_datas(save_dir=save_dir, logger=logger, **connection)
#代碼入口
#getdata_ur_bi_dw(
#host=ur_bi_dw_host,
#port=ur_bi_dw_port,
#database=ur_bi_dw_database,
#auth_mechanism=ur_bi_dw_auth_mechanism,
#user=ur_bi_dw_user,
#password=ur_bi_dw_password,
#save_dir=ur_bi_dw_save_dir,
#logger=logger
#)
代碼說明和領悟
每個類的具體作用說明,代碼
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 郵政快遞設施建設工程
- 企業國防意識培訓課件
- 金屬結構廠房設計與施工一體化合同
- 個性化定制辦公用品買賣合同
- 美國進口商定制出口銷售合同范本
- 項目績效目標修訂方案
- 車輛抵押貸款還清后借用合同
- 金融科技創新財務代理與風險評估合同范本
- 建筑書架改造方案
- 污水行業面試題及答案
- GA 1283-2015住宅物業消防安全管理
- midas分析設計原理
- 初一英語時態專題復習(附答案)
- 2022年上高縣教師進城考試筆試題庫及答案解析
- 質量管理手冊(隧道)(中交路橋建設有限公司)
- 黃大年式教學團隊申報材料
- 出香港貨物發票樣板樣本空白
- 醫院免疫室標準化操作程序免疫室內質量控制操作指南(ELISA)人民醫院檢驗科免疫SOP人民醫院質量管理體系課件
- 柳州市柳東新區南慶安置區項目工程基坑支護方案
- 卵巢腫瘤ppt課件
- 發電可靠性考試真題及答案
評論
0/150
提交評論