一、get请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from urllib import request


def sendGet(url):
try:
time.sleep(8)
req = request.Request(url)
#req.add_header("Referer", "https://linux.linuxidc.com/index.php?folder=")
# req.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4620.400 QQBrowser/9.7.13014.400")
# req.add_header("Cookie", "Hm_lvt_05ff9e4b77acc9197fb1fe12f47d29fb=1523884723,1524314474,1524571744; Hm_lpvt_05ff9e4b77acc9197fb1fe12f47d29fb=1524571744; PHPSESSID=octc5sspfkip6d0mqi0aab4e44")
req.add_header("connection", "Keep-Alive")
# req.add_header("Host", "linux.linuxidc.com")
#req.add_header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8")
#req.add_header("Accept-Encoding", "gzip, deflate, sdch, br")
res = request.urlopen(req,timeout=5)
out = str(res.read().decode(encoding='utf8'))
return out
except UnicodeDecodeError as ierr:
print("\n\n########################################################################\n" + url)
print("########################################################################\n\n\n")
print("OS error: {0}".format(ierr))
return ''
except OSError as err:
print("\n\n########################################################################\n"+url)
print("########################################################################\n\n\n")
print("OS error: {0}".format(err))
return ''

二、post请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from urllib import request

def sendPost(url, data):
try:
time.sleep(8)
req = request.Request(url)
req.add_header("Referer", "https://www.zhaosemei6.com/web/video-duanpian-131.html")
req.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0")
req.add_header("Cookie", "ASP.NET_SessionId=0g2id2ojzs2k2xj5dm1khp2s; Hm_lvt_9f18188cf6ca76964151d7b599951471=1613460453; Hm_lpvt_9f18188cf6ca76964151d7b599951471=1613461925; UM_distinctid=177a9bcb51c22-03b4f8c9352da8-4c3f217f-1fa400-177a9bcb51d3cc; CNZZDATA1278139285=1372739094-1613459242-https%253A%252F%252Fwww.zhaosemei6.com%252F%7C1613459242")
req.add_header("connection", "keep-alive")
req.add_header("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
req.add_header("X-Requested-With", "XMLHttpRequest")
req.add_header("Host", "www.zhaosemei6.com")
req.add_header("Origin", "https://www.zhaosemei6.com")
req.add_header("Accept", "application/json, text/javascript, */*; q=0.01")
#req.add_header("Accept-Encoding", "gzip, deflate, br")
#req.add_header("Accept-Language", "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2")
res = request.urlopen(req,bytes(data, encoding="utf8"),timeout=5)
out = str(res.read().decode(encoding='utf8'))
return out
except UnicodeDecodeError as ierr:
print("\n\n########################################################################\n" + url)
print("########################################################################\n\n\n")
print("OS error: {0}".format(ierr))
return ''
except OSError as err:
print("\n\n########################################################################\n"+url)
print("########################################################################\n\n\n")
print("OS error: {0}".format(err))
return ''

三、传统下载

1
2
3
4
5
6
7
8
9
from urllib import request
import os

def downFile(url, dirPath ,fileName):
with request.urlopen(url) as web:
if not os.path.exists(dirPath):
os.makedirs(dirPath)
with open(dir + fileName, 'wb') as outfile:
outfile.write(web.read())

四、续传下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import sys
import requests
import os
import hashlib

def download(url, headers, dirPath ,fileName):
if fileName is None:
fileName = hashlib.md5(url.encode(encoding='UTF-8')).hexdigest() + url[url.rindex("."):]

if dirPath is None:
dirPath = os.path.dirname(__file__) + "/downloads/"

if not os.path.exists(dirPath):
os.makedirs(dirPath)

if not dirPath.endswith("/"):
dirPath = dirPath+"/"

filePath = os.path.join(dirPath, fileName)

# if os.path.exists(filePath):
# print("-" * 5 + filePath + " exists" + "-" * 5)
# return

requests.packages.urllib3.disable_warnings()
# 第一次请求是为了得到文件总大小
response = requests.get(url, headers=headers,stream=True, verify=False)

total_size = int(response.headers['Content-Length'])

# 这重要了,先看看本地文件下载了多少
if os.path.exists(filePath):
temp_size = os.path.getsize(filePath) # 本地已经下载的文件大小
else:
temp_size = 0

# 显示一下下载了多少
#print("(temp_size:%s,total_size:%s)"%(str(temp_size),str(total_size)))

# 核心部分,这个是请求下载时,从本地文件已经下载过的后面下载
# headers = {'Range': 'bytes=%d-' % temp_size}
headers['Range'] = 'bytes=%d-' % temp_size

# 重新请求网址,加入新的请求头的
r = requests.get(url, stream=True, verify=False, headers=headers)

# 下面写入文件也要注意,看到"ab"了吗?
# "ab"表示追加形式写入文件
with open(filePath, "ab") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
temp_size += len(chunk)
f.write(chunk)
f.flush()
# 强制将文件描述符为fd的文件写入硬盘
os.fsync(f)
###这是下载实现进度显示####
done = int(50 * temp_size / total_size)
sys.stdout.write("\r[%s%s] %d%%\t(%d/%d : %dk/%dk)\t>>>>\t【%s】" %
('█'*done, ' '*(50-done), 100 * temp_size/total_size, temp_size, total_size, temp_size/1024, total_size/1024, fileName))
sys.stdout.flush()

print() # 避免上面\r 回车符

五、检索指定路径下后缀所有文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
import os.path

path= "H:/资料/达内java课程pdf文档/"
ls = []
sufx = "txt"

def getAppointFile(path,ls,sufx):
fileList = os.listdir(path)
try:
for tmp in fileList:
pathTmp = os.path.join(path,tmp)
if True==os.path.isdir(pathTmp):
getAppointFile(pathTmp,ls,sufx)
elif pathTmp[pathTmp.rfind('.')+1:].lower()==sufx.lower():
ls.append(pathTmp)
except PermissionError:
pass

六、文本内容替换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def Replace(file_name, rep_word, new_word):
with open(file_name,encoding='utf8') as f:
content = []
count = 0

for eachline in f:
if rep_word in eachline:
count += eachline.count(rep_word)
eachline = eachline.replace(rep_word, new_word)
content.append(eachline)

decide = input('文件 {0} 中共有{1}个【{2}】\n您确定要把所有的【{3}】替换为【{4}】吗?\n【YES/NO】:'.format\
(file_name, count, rep_word, rep_word, new_word))

if decide in ['YES', 'Yes', 'yes']:
with open(file_name, 'w') as f:
f.writelines(content)
print('Succeed!')
else:
print('Exit!')

七、excel读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import xlrd
import datetime
from datetime import date

def read_excel():
#打开文件
wb = xlrd.open_workbook(r'data.xls')
#获取所有sheet的名字
print(wb.sheet_names())
#获取第二个sheet的表明
sheet2 = wb.sheet_names()[1]
#sheet1索引从0开始,得到sheet1表的句柄
sheet1 = wb.sheet_by_index(0)
rowNum = sheet1.nrows
colNum = sheet1.ncols
#s = sheet1.cell(1,0).value.encode('utf-8')
s = sheet1.cell(1,1).value
#获取某一个位置的数据
# 1 ctype : 0 empty,1 string, 2 number, 3 date, 4 boolean, 5 error
print(sheet1.cell(1,2).ctype)
print(s)
#print(s.decode('utf-8'))
#获取整行和整列的数据
#第二行数据
row2 = sheet1.row_values(1)
#第二列数据
cols2 = sheet1.col_values(2)
#python读取excel中单元格内容为日期的方式
#返回类型有5种
for i in range(rowNum):
if sheet1.cell(i,2).ctype == 3:
d = xlrd.xldate_as_tuple(sheet1.cell_value(i,2),wb.datemode)
print(date(*d[:3]),end='')
print('\n')

八、excel写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import xlwt
import datetime
from datetime import date

def set_stlye(name,height,bold=False):
#初始化样式
style = xlwt.XFStyle()
#创建字体
font = xlwt.Font()
font.bold = bold
font.colour_index = 4
font.height = height
font.name =name
style.font = font
return style

#写入数据
def write_excel():
f = xlwt.Workbook()
#创建sheet1
sheet1 = f.add_sheet(u'sheet1',cell_overwrite_ok=True)
row0 = [u'业务',u'状态',u'北京',u'上海',u'广州',u'深圳',u'状态小计',u'合计']
column0 = [u'机票',u'船票',u'火车票',u'汽车票',u'其他']
status = [u'预定',u'出票',u'退票',u'业务小计']
for i in range(0,len(row0)):
sheet1.write(0,i,row0[i],set_stlye("Time New Roman",220,True))
i,j = 1,0
while i <4*len(column0): #控制循环:每次加4
#第一列
sheet1.write_merge(i,i+3,0,0,column0[j],set_stlye('Arial',220,True))
#最后一列
sheet1.write_merge(i,i+3,7,7)
i += 4

sheet1.write_merge(21,21,0,1,u'合计',set_stlye("Time New Roman",220,True))

i=0
while i<4*len(column0): #控制外层循环:每次加4
for j in range(0,len(status)): #控制内层循环:设置每一行内容
sheet1.write(i+j+1,1,status[j])
i += 4

#创建sheet2
sheet2 = f.add_sheet(u'sheet2',cell_overwrite_ok=True)
row0 = [u'姓名',u'年龄',u'出生日期',u'爱好',u'关系']
column0 = [u'UZI',u'Faker',u'大司马',u'PDD',u'冯提莫']

#生成第一行
for i in range(0,len(row0)):
sheet2.write(0,i,row0[i],set_stlye('Times New Roman',220,True))

#生成第一列
for i in range(0,len(column0)):
sheet2.write(i+1,0,column0[i],set_stlye('Times New Roman',220,True))
f.save('data.xls')

九、批量ping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import time
import struct
import socket
import select
import os


def chesksum(data):
n=len(data)
m=n % 2
sum=0
for i in range(0, n - m ,2):
sum += (data[i]) + ((data[i+1]) << 8)#传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
if m:
sum += (data[-1])
#将高于16位与低16位相加
sum = (sum >> 16) + (sum & 0xffff)
sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
answer = ~sum & 0xffff
# 主机字节序转网络字节序列(参考小端序转大端序)
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer

def request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body):
# 把字节打包成二进制数据
imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
icmp_chesksum = chesksum(imcp_packet) #获取校验和
# 把校验和传入,再次打包
imcp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
return imcp_packet


def raw_socket(dst_addr,imcp_packet):
'''
连接套接字,并将数据发送到套接字
'''
#实例化一个socket对象,ipv4,原套接字,分配协议端口
rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
#记录当前请求时间
send_request_ping_time = time.time()
#发送数据到网络
rawsocket.sendto(imcp_packet,(dst_addr,80))
#返回数据
return send_request_ping_time,rawsocket,dst_addr


def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
while True:
#开始时间
started_select = time.time()
#实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
what_ready = select.select([rawsocket], [], [], timeout)
#等待时间
wait_for_time = (time.time() - started_select)
#没有返回可读的内容,判断超时
if what_ready[0] == []: # Timeout
return -1
#记录接收时间
time_received = time.time()
#设置接收的包的字节为1024
received_packet, addr = rawsocket.recvfrom(1024)
#获取接收包的icmp头
#print(icmpHeader)
icmpHeader = received_packet[20:28]
#反转编码
type, code, checksum, packet_id, sequence = struct.unpack(
">BBHHH", icmpHeader
)

if type == 0 and sequence == data_Sequence:
return time_received - send_request_ping_time

#数据包的超时时间判断
timeout = timeout - wait_for_time
if timeout <= 0:
return -1
def dealtime(dst_addr,sumtime,shorttime,longtime,accept,i,time):
sumtime+=time
print(sumtime)
if i==4:
print("{0}的Ping统计信息:".format(dst_addr))
print("数据包:已发送={0},接收={1},丢失={2}({3}%丢失),\n往返行程的估计时间(以毫秒为单位):\n\t最短={4}ms,最长={5}ms,平均={6}ms".format(i+1,accept,i+1-accept,(i+1-accept)/(i+1)*100,shorttime,longtime,sumtime))

def ping(host):
send, accept, lost = 0, 0, 0
sumtime, shorttime, longtime, avgtime = 0, 1000, 0, 0
#TODO icmp数据包的构建
data_type = 8 # ICMP Echo Request
data_code = 0 # must be zero
data_checksum = 0 # "...with value 0 substituted for this field..."
data_ID = 0 #Identifier
data_Sequence = 1 #Sequence number
payload_body = b'abcdefghijklmnopqrstuvwabcdefghi' #data

# 将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
dst_addr = socket.gethostbyname(host)

print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host,dst_addr))

resinfo = {'res': 'yes', 'sendMum': 0, 'receiveMum': 0, 'losRate': '0', 'shortTime': '0ms', 'longTime': '0ms', 'avgTime': '0ms'}

for i in range(0,2):
send=i+1
#请求ping数据包的二进制转换
icmp_packet = request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence + i,payload_body)
#连接套接字,并将数据发送到套接字
send_request_ping_time,rawsocket,addr = raw_socket(dst_addr,icmp_packet)
#数据包传输时间
times = reply_ping(send_request_ping_time,rawsocket,data_Sequence + i)
if times > 0:
print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))

accept+=1
return_time=int(times * 1000)
sumtime += return_time
if return_time > longtime:
longtime = return_time
if return_time < shorttime:
shorttime = return_time
time.sleep(0.7)
else:
lost+=1
print("请求超时。")
resinfo['res'] = 'no'

if send == 2:
print("{0}的Ping统计信息:".format(dst_addr))
print("\t数据包:已发送={0},接收={1},丢失={2}({3}%丢失),\n往返行程的估计时间(以毫秒为单位):\n\t最短={4}ms,最长={5}ms,平均={6}ms".format(
i + 1, accept, i + 1 - accept, (i + 1 - accept) / (i + 1) * 100, shorttime, longtime, sumtime/send))

resinfo['sendMum'] = i + 1
resinfo['receiveMum'] = accept
resinfo['losRate'] = (i + 1 - accept) / (i + 1) * 100
resinfo['shortTime'] = shorttime
resinfo['longTime'] = longtime
resinfo['avgTime'] = sumtime / send


return resinfo




if __name__ == "__main__":

r = open("./ipInfo.csv", mode='r')

f = open("./pingResult.csv", mode='w')
f.write("IP,已发送,接收,丢失率,最短耗时,最长耗时,平均耗时,结果,ping测时间")
f.write('\n')
f.flush()
os.fsync(f)

ip = r.readline()
while ip:
resinfo = ping(ip.replace('\n',''))
print(resinfo)
f.write("{0},{1},{2},{3}%,{4}ms,{5}ms,{6}ms,{7},{8}".format(ip.replace('\n',''), resinfo['sendMum'], resinfo['receiveMum'],resinfo['losRate'],
resinfo['shortTime'], resinfo['longTime'],resinfo['avgTime'],resinfo['res'],time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())))
f.write('\n')
f.flush()
os.fsync(f)
ip = r.readline()

f.close()
r.close()

十、文件名批量修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os

def rename_files(directory, old_str, new_str):
if not os.path.exists(directory):
raise FileNotFoundError(f"目录 {directory} 不存在")
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
new_filename = filename.replace(old_str, new_str)
new_file_path = os.path.join(directory, new_filename)
os.rename(file_path, new_file_path)


if __name__ == "__main__":
path = "E:/emp/"
rename_files(path, "-", " - ")