test2
from ftplib import FTP
from datetime import datetime
import os
import time
import shutil
class My_ftp():
    """Upload the current day's CSV files to an FTP server.

    The class keeps a local ``log`` directory holding a copy of every file
    that was uploaded successfully.  That directory doubles as the record of
    what has already been sent, and ``overweek`` prunes it after seven days.

    All date-dependent class attributes are computed once, when the class is
    defined; a long-running process keeps using that date.
    """

    # Root folder of the daily records on the local share.
    local_path = "Z:\\晨報區\\02_每日偵況紀載"
    # Read the clock a single time so that year, month and day always
    # describe the same date (three separate now() calls could straddle
    # midnight or a month boundary).
    _today = datetime.now()
    # Gregorian year minus 1911 -- presumably the ROC (Minguo) calendar year.
    Ch_year = str(_today.year - 1911) + "年偵況紀載"
    month = str(_today.month) + "月"
    day = _today.strftime("%m%d")
    # Directory that holds today's files to upload.
    file_path = os.path.join(local_path, Ch_year, month, day, "KI_file")
    # Directory (relative to the working directory) holding copies of the
    # files that have already been uploaded.
    log_dir = "log"

    def __init__(self):
        """Create the FTP client without connecting to any server."""
        self.ftp = FTP()
        # utf-8 so that Chinese file names are transferred correctly
        self.ftp.encoding = "utf-8"

    def check_local_paht(self):
        """Print today's source directory and create it if it is missing."""
        print(f"loacl:{self.file_path}")
        # exist_ok avoids the race between an existence check and the create.
        os.makedirs(self.file_path, exist_ok=True)

    def connect(self, ip="10.177.39.237", port=35353):
        """Connect to the FTP server and print its welcome banner.

        Connection failures are reported on stdout and not re-raised.

        Args:
            ip: Host name or address of the FTP server.
            port: TCP port of the FTP server.
        """
        try:
            self.ftp.connect(host=ip, port=port)
            print(self.ftp.getwelcome())
        except Exception as e:
            print(f"connect ftp error::{e}")

    def login_fnc(self):
        """Log in with the password stored in the ``user.up`` file.

        Raises:
            OSError: If ``user.up`` cannot be read.
            ftplib.Error: If the server rejects the login.
        """
        with open("user.up", "r") as file:
            # A trailing line break written by an editor is not part of the
            # password and would make the login fail.
            password = file.read().rstrip("\r\n")
        self.ftp.login(user="haiwei_all", passwd=password)

    def up_load(self):
        """Upload every not-yet-sent CSV file from today's directory.

        A file counts as already sent when a file of the same name exists in
        the log directory.  Each file is copied to the log directory only
        after its upload succeeded, so a failed upload is retried on the next
        run.  Errors are reported on stdout and not re-raised; the FTP
        connection is closed in every case.
        """
        try:
            os.makedirs(self.log_dir, exist_ok=True)
            already_sent = set(os.listdir(self.log_dir))
            sent_count = 0
            for name in os.listdir(self.file_path):
                # NOTE: substring match, so any name containing "csv"
                # qualifies, not only names ending in ".csv".
                if name in already_sent or "csv" not in name:
                    continue
                source = os.path.join(self.file_path, name)
                with open(source, "rb") as file:
                    self.ftp.storbinary(f"STOR {name}", file)
                print("file passed")
                # Record the file as sent only now that the upload worked.
                shutil.copy2(source, os.path.join(self.log_dir, name))
                print(f"copy {name} to log_dir")
                sent_count += 1
            if not sent_count:
                print("no new data")
        except Exception as e:
            print(f"file upload error: {e}")
        finally:
            self._close_connection()

    def _close_connection(self):
        """Close the FTP session politely, falling back to a plain close."""
        try:
            self.ftp.quit()
        except Exception:
            # quit() fails when the session was never opened or is already
            # broken; close() just releases whatever is left.
            self.ftp.close()

    def overweek(self):
        """Delete files in the log directory that are seven or more days old.

        Age is measured from the file's modification time.  Errors are
        reported on stdout and not re-raised.
        """
        try:
            now = datetime.now()
            for name in os.listdir(self.log_dir):
                path = os.path.join(self.log_dir, name)
                # getmtime returns a float timestamp; convert it to datetime
                modified = datetime.fromtimestamp(os.path.getmtime(path))
                # Age must be now - modified: the reverse is negative for
                # every existing file and would never exceed the limit.
                age_days = (now - modified).days
                if age_days > 6:
                    os.remove(path)
        except Exception as e:
            print(f"del file error: {e}")

0 個意見:
張貼留言
訂閱 張貼留言 [Atom]
<< 首頁