python利用socket实现通讯
以前上网络安全课时,应老师要求,需要实现TCP或UDP通讯,支持远程操作,并对消息进行加密。当时刚入门python,就想着用python来实现,这里采用的是TCP。(好吧,现在回头看不得不说很……emm)
消息加密采用的是经典加密方式(凯撒密码)
纯属菜鸟操作,还请见谅
经典加密(不是自己写的,忘了当时是从哪个网站荡来的了-_-|||),需保存为 classicalencryption.py,供下面的Client端和Server端导入
import string
lowercase = string. ascii_lowercase
def substitution(text, key_table):
    """Apply a monoalphabetic substitution to *text*.

    The text is lower-cased first, so the original letter case is lost.
    Each letter ``a``-``z`` is replaced by the character found at the same
    alphabet position in *key_table*; every other character (digits,
    punctuation, whitespace, non-ASCII text) is copied through unchanged.

    Args:
        text: The string to transform.
        key_table: Replacement alphabet, indexed by position in ``a``-``z``.
            It needs an entry for every letter that occurs in *text*.

    Returns:
        The substituted string.

    Raises:
        IndexError: If *key_table* is too short for a letter found in *text*.
    """
    alphabet = string.ascii_lowercase

    def swap(char):
        position = alphabet.find(char)
        return char if position < 0 else key_table[position]

    # Join once instead of growing the result with ``+=`` for every character.
    return "".join(swap(char) for char in text.lower())
def caesar_cypher_encrypt(text, shift):
    """Encrypt *text* with a Caesar cipher that rotates letters by *shift*.

    Args:
        text: The string to encrypt; it is lower-cased by ``substitution``.
        shift: Number of alphabet positions to rotate by. Any integer is
            accepted (negative values rotate backwards) and is reduced
            modulo the alphabet length.

    Returns:
        The encrypted, lower-cased string.
    """
    # Without the modulo, slicing turns any shift beyond +/-26 into an empty
    # slice plus the full alphabet, i.e. silently no encryption at all.
    shift %= len(lowercase)
    key_table = lowercase[shift:] + lowercase[:shift]
    return substitution(text, key_table)
def caesar_cypher_decrypt(text, shift):
    """Undo ``caesar_cypher_encrypt(text, shift)``.

    Decryption is encryption with the opposite rotation, so this simply
    negates *shift* and delegates.
    """
    opposite_shift = -shift
    return caesar_cypher_encrypt(text, opposite_shift)
def crack_caesar_cypher(text):
    """Print a brute-force table of all 26 candidate decryptions of *text*.

    For every possible shift, the first 12 characters of the text decrypted
    with that shift are printed next to the shift value, so a human can spot
    the readable line.
    """
    preview_length = 12
    for shift in range(26):
        # Rotating the alphabet backwards by ``shift`` undoes an encryption
        # with that shift; for shift 0 the slices yield the plain alphabet.
        rotated_alphabet = lowercase[-shift:] + lowercase[:-shift]
        candidate = substitution(text, rotated_alphabet)
        print(candidate[:preview_length], '| shift is ', shift)
Client端
import re
import socket
import os
import sys
import hashlib
basic_path = os. path. dirname( __file__)
sys. path. append( basic_path)
import classicalencryption
def en_message(msg):
    """Return *msg* encrypted with the Caesar cipher (fixed shift of 15)."""
    return classicalencryption.caesar_cypher_encrypt(msg, 15)
def de_message(msg):
    """Return *msg* decrypted with the Caesar cipher (fixed shift of 15)."""
    return classicalencryption.caesar_cypher_decrypt(msg, 15)
def get_info_server():
    """Prompt for the server's IPv4 address and TCP port until both are valid.

    The address is validated with ``ipaddress.IPv4Address`` rather than a
    hand-written regex, so every well-formed dotted-quad address is accepted
    (the old pattern rejected octets such as 208 or 249 and any second octet
    of 200 or above). The port must be an integer in 1024-65535.

    Returns:
        tuple: ``(server_ip, server_port)`` with the IP as ``str`` and the
        port as ``int``, ready to pass to ``socket.connect``.
    """
    # Function-scope import: the surrounding module does not import ipaddress.
    import ipaddress

    # Ask for the server IP until it parses as an IPv4 address.
    while True:
        server_ip = input("请输入设定的server IP:").strip()
        try:
            ipaddress.IPv4Address(server_ip)
        except ipaddress.AddressValueError:
            print("输入的IP地址不合法,请重新输入!")
            continue
        break

    # Ask for the server port until it is a number inside the allowed range.
    while True:
        try:
            server_port = int(input("请输入设定的server端口号:"))
        except ValueError:
            print("请输入数字来设定端口号")
            continue
        if 1024 <= server_port <= 65535:
            break
        print("建议范围值为【1024 - 65535】")

    return (server_ip, server_port)
def create_object_client():
    """Create and return a new, not yet connected IPv4 TCP socket."""
    return socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
def operation_cmd_client(object_client):
    """Run the interactive remote-command prompt on an authenticated socket.

    Each non-empty line typed at the ``command:`` prompt is encrypted and
    sent to the server, and the server's reply is decrypted and printed.
    Typing ``exit -cmd`` notifies the server and leaves the prompt.

    Args:
        object_client: Connected socket that has already passed the
            ``get -cmd`` authentication step.
    """
    while True:
        command = input("command:")
        if not command:
            continue
        object_client.sendall(en_message(command).encode())
        if command == "exit -cmd":
            break
        # Only up to 1024 bytes are read per reply, as in the original protocol.
        info_return_from_server = object_client.recv(1024)
        if not info_return_from_server:
            # An empty read means the server closed the connection.
            break
        # The cipher works on str, so decode the GBK payload *before*
        # decrypting; decrypting raw bytes raised TypeError and would also
        # corrupt multi-byte GBK characters.
        reply_text = info_return_from_server.decode("gbk", errors="replace")
        print(de_message(reply_text))
def create_account_dict():
    """Prompt for account and password and return them as a dict literal.

    Returns:
        str: The ``str()`` form of ``{"user": ..., "password": ...}``, which
        is the text sent to the server for authentication.
    """
    credentials = {}
    credentials["user"] = input("account:")
    credentials["password"] = input("password:")
    return str(credentials)
def main():
    """Connect to the server and relay user messages until ``exit`` is typed.

    Every non-empty line typed at the ``Message:`` prompt is encrypted and
    sent to the server. Two inputs get extra handling:

    * ``get -cmd`` - asks for credentials, sends them, and on success enters
      the remote-command prompt (``operation_cmd_client``).
    * ``exit`` - tells the server the session is over, prints its
      acknowledgement and closes the connection.
    """
    # Keep asking for an address until a connection succeeds.
    while True:
        info_receive = get_info_server()
        object_client = create_object_client()
        try:
            object_client.connect(info_receive)
        except OSError:
            # Covers refused connections as well as timeouts and unreachable
            # hosts; release the failed socket before trying again.
            object_client.close()
            print("连接失败,请确认IP/端口...")
            continue
        break

    try:
        while True:
            data = input('Message: ')
            if data == "":
                continue
            object_client.sendall(en_message(data).encode())
            if data == "exit":
                # The server answers "exit" before dropping the connection;
                # without this message it would never learn the client left.
                farewell = object_client.recv(1024).decode(errors="replace")
                if farewell:
                    print(de_message(farewell))
                break
            if data == "get -cmd":
                authentication_info = en_message(create_account_dict())
                object_client.sendall(authentication_info.encode())
                result = object_client.recv(1024).decode(errors="replace")
                # Decrypt exactly once and compare text. The cipher lower-cases
                # its input, so the server's "True" arrives as "true". The
                # reply is never passed to eval(): it is network data.
                if de_message(result) == "true":
                    # Nothing else may be sent here: the server is already
                    # waiting for the first shell command.
                    operation_cmd_client(object_client)
                else:
                    print("Sign in Failed!")
    finally:
        object_client.close()
# Start the interactive client only when this file is run as a script.
if __name__ == "__main__":
    main()
Server端
import socket
import sys
import os
import re
import logging
import subprocess
import json
import hashlib
basic_path = os. path. dirname( __file__)
sys. path. append( basic_path)
import classicalencryption
def de_message(msg):
    """Return *msg* decrypted with the Caesar cipher (fixed shift of 15)."""
    return classicalencryption.caesar_cypher_decrypt(msg, 15)
def en_message(msg):
    """Return *msg* encrypted with the Caesar cipher (fixed shift of 15).

    The previous version computed the ciphertext but discarded it and
    returned the plaintext, so nothing the server sent was encrypted.

    Args:
        msg: Text to encrypt. It must be ``str``, not ``bytes``.

    Returns:
        The encrypted text.
    """
    return classicalencryption.caesar_cypher_encrypt(msg, 15)
def logging_txt(command):
    """Append *command* to ``log_client_operation.log`` with a timestamp.

    The entry is written through the root logger at DEBUG level.
    """
    log_settings = {
        "level": logging.DEBUG,
        "format": '%(asctime)s - %(message)s',
        "datefmt": '%Y-%m-%d %H:%M:%S',
        "filename": 'log_client_operation.log',
        "filemode": 'a',
    }
    # basicConfig does nothing once the root logger has handlers, so calling
    # it on every invocation only has an effect the first time.
    logging.basicConfig(**log_settings)
    logging.debug(command)
def get_info_server():
    """Prompt for the IPv4 address and TCP port the server should listen on.

    The address is validated with ``ipaddress.IPv4Address`` rather than a
    hand-written regex, so every well-formed dotted-quad address is accepted,
    including ``0.0.0.0`` for "all interfaces" (the old pattern rejected it,
    as well as octets such as 208 or 249). The port must be an integer in
    1024-65535.

    Returns:
        tuple: ``(server_ip, server_port)`` with the IP as ``str`` and the
        port as ``int``, ready to pass to ``socket.bind``.
    """
    # Function-scope import: the surrounding module does not import ipaddress.
    import ipaddress

    # Ask for the server IP until it parses as an IPv4 address.
    while True:
        server_ip = input("请输入设定的server IP:").strip()
        try:
            ipaddress.IPv4Address(server_ip)
        except ipaddress.AddressValueError:
            print("输入的IP地址不合法,请重新输入!")
            continue
        break

    # Ask for the server port until it is a number inside the allowed range.
    while True:
        try:
            server_port = int(input("请输入设定的server端口号:"))
        except ValueError:
            print("请输入数字来设定端口号")
            continue
        if 1024 <= server_port <= 65535:
            break
        print("建议范围值为【1024 - 65535】")

    return (server_ip, server_port)
def create_object_sever(info_server_offer):
    """Create the listening TCP socket for the server.

    Args:
        info_server_offer: ``(ip, port)`` tuple to bind to.

    Returns:
        socket.socket: An IPv4 TCP socket with ``SO_REUSEADDR`` set, bound to
        *info_server_offer* and listening with a backlog of 3.

    Raises:
        OSError: If binding or listening fails (for example the port is
            already in use). The socket is closed before the error
            propagates.
    """
    object_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        object_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        object_server.bind(info_server_offer)
        object_server.listen(3)
    except Exception:
        # Do not leak the descriptor when setup fails half-way.
        object_server.close()
        raise
    return object_server
def operation_cmd(object_client):
    """Serve the remote shell for one authenticated client.

    Each received message is decrypted, logged, and run as a shell command
    on this machine. The command's stderr (or stdout when stderr is empty)
    is encrypted and sent back. ``exit -cmd`` ends the shell session; a bare
    ``exit`` is answered with a hint that ``-cmd`` is missing.

    NOTE(security): commands come from the network and are run with
    ``shell=True`` on purpose (this *is* the remote shell), protected only by
    the login check performed by the caller.

    Args:
        object_client: Connected socket of the authenticated client.

    Raises:
        ConnectionError: If the client closes the connection mid-session.
    """
    while True:
        received = object_client.recv(1024)
        if not received:
            # recv() returns b"" forever once the peer has closed; raise so
            # the caller's ConnectionError handling cleans up, rather than
            # spinning in this loop.
            raise ConnectionError("client closed the connection")
        command = de_message(received.decode(errors="replace"))
        logging_txt(command)
        if command == "exit":
            feedback = en_message("缺少参数 -cmd")
            object_client.sendall(feedback.encode("gbk"))
            continue
        if command == "exit -cmd":
            print("Client退出了命令行操作...")
            break
        # run() drains stdout and stderr together (reading stderr to EOF
        # first could deadlock on a full stdout pipe), and DEVNULL gives the
        # child an immediate EOF on stdin instead of a pipe nobody closes.
        completed = subprocess.run(command,
                                   shell=True,
                                   stdin=subprocess.DEVNULL,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        info_get_from_cmd = completed.stderr or completed.stdout
        # The cipher works on str, so decode before encrypting. GBK matches
        # what the client uses to decode the reply.
        # NOTE(review): assumes a GBK console (Chinese Windows) - confirm.
        output_text = info_get_from_cmd.decode("gbk", errors="replace")
        if not output_text:
            # Sending zero bytes transmits nothing and would leave the client
            # blocked in recv(); send a lone newline for silent commands.
            output_text = "\n"
        object_client.sendall(en_message(output_text).encode("gbk", errors="replace"))
def judge_account(data):
    """Check client-supplied credentials against ``info_authentication.json``.

    Args:
        data: Text of a dict literal such as
            ``"{'user': 'root', 'password': '1973'}"`` as sent by the client.

    Returns:
        bool: ``True`` when the MD5 digests of both fields equal the stored
        ones; ``False`` otherwise, including for malformed *data*.

    Raises:
        OSError: If ``info_authentication.json`` cannot be read.
    """
    # Function-scope import: the surrounding module does not import ast.
    import ast

    try:
        # SECURITY: *data* comes straight from the network. It used to be
        # passed to eval(), which lets a client run arbitrary code on the
        # server; literal_eval only accepts plain Python literals.
        submitted = ast.literal_eval(data)
        digests = {
            field: hashlib.md5(submitted[field].encode("utf8")).hexdigest()
            for field in ("user", "password")
        }
    except (ValueError, SyntaxError, TypeError, KeyError, AttributeError):
        # Not a literal, not a dict, a field is missing or not a string.
        return False

    with open("info_authentication.json", "r") as operation_r:
        # json.load parses the stored dict. The previous json.dumps call
        # re-encoded the file text as a JSON *string*, which can never equal
        # a dict, so every login was rejected.
        source = json.load(operation_r)
    return source == digests
def judge_info_get_from_client(info, object_get_from_client=None):
    """Handle one decrypted client message and say what the caller does next.

    The previous body used ``continue``/``break`` outside any loop, which is
    a SyntaxError for the whole module, and read the undefined names
    ``object_get_from_client`` and ``data``. The socket is now a parameter
    and the loop control is reported through the return value.

    Args:
        info: The decrypted message text received from the client.
        object_get_from_client: Connected socket of that client.

    Returns:
        str: ``"continue"`` to keep serving this client, ``"break"`` to drop
        this client, or ``"close"`` to drop the client and stop the server.

    Raises:
        ValueError: If *object_get_from_client* is not supplied.
    """
    if object_get_from_client is None:
        raise ValueError("object_get_from_client (the client socket) is required")

    if info == "get -cmd":
        info_account_get = object_get_from_client.recv(1024).decode()
        info_account_get = de_message(info_account_get)
        if judge_account(info_account_get):
            result_ch = en_message(str(True))
            object_get_from_client.send(result_ch.encode())
            print("用户进入了命令行操作...")
            operation_cmd(object_get_from_client)
        else:
            object_get_from_client.send(en_message("Sign in Failed!").encode())
        return "continue"
    if info == "exit":
        object_get_from_client.send(en_message("退出成功!").encode())
        return "break"
    if info == "close -server":
        return "close"
    print("Client-Message:", info)
    return "continue"
def process_main(object_server):
    """Accept clients one at a time and serve each until it leaves.

    For every received message the decrypted text selects the action:

    * ``get -cmd`` - read credentials, check them with ``judge_account`` and
      on success hand the socket to ``operation_cmd``.
    * ``exit`` - acknowledge and drop this client.
    * ``close -server`` - drop this client and stop accepting new ones.
    * anything else - print it as a chat message.

    NOTE(review): ``close -server`` is honoured without authentication, so
    any client that can connect can stop the server - confirm this is wanted.

    Args:
        object_server: Bound, listening server socket. It is closed before
            this function returns or raises.
    """
    status_server = True
    try:
        while status_server:
            object_get_from_client, address_client = object_server.accept()
            print("[+]当前client:%s:%d" % (address_client[0], address_client[1]))
            try:
                while True:
                    received = object_get_from_client.recv(1024)
                    if not received:
                        # recv() keeps returning b"" after the peer closed;
                        # treat it as a disconnect instead of looping forever.
                        raise ConnectionError("client closed the connection")
                    data = received.decode(errors="replace")
                    print("msg-encrypted:", data)
                    data = de_message(data)
                    if data == "get -cmd":
                        received = object_get_from_client.recv(1024)
                        if not received:
                            raise ConnectionError("client closed the connection")
                        info_account_get = de_message(received.decode(errors="replace"))
                        if judge_account(info_account_get):
                            result_ch = en_message(str(True))
                            object_get_from_client.sendall(result_ch.encode())
                            print("用户进入了命令行操作...")
                            operation_cmd(object_get_from_client)
                        else:
                            # Replies are encrypted like every other message so
                            # the client can decrypt all traffic uniformly.
                            object_get_from_client.sendall(
                                en_message("Sign in Failed!").encode())
                    elif data == "exit":
                        object_get_from_client.sendall(en_message("退出成功!").encode())
                        break
                    elif data == "close -server":
                        status_server = False
                        break
                    else:
                        print("Client-Message:", data)
            except ConnectionError:
                print("一个Client断开连接...")
            finally:
                # Runs on normal exit, on disconnect and on unexpected errors.
                object_get_from_client.close()
    finally:
        object_server.close()
def main():
    """Start the service.

    Asks the operator for the listen address, opens the listening socket and
    then serves clients until one of them sends ``close -server``.
    """
    info_receive = get_info_server()
    object_server = create_object_sever(info_receive)
    print("等待客户端连接...")
    # The accept/dispatch loop used to be copy-pasted here from process_main;
    # delegate to it instead. process_main also closes object_server when the
    # loop ends.
    process_main(object_server)
# Start the server only when this file is run as a script.
if __name__ == "__main__":
    main()
这是管理员登陆账户信息的一个加密存储
# One-off script that stores the administrator's login credentials.
#
# The account name and the password are each hashed with MD5 and the two hex
# digests are written as a JSON object to info_authentication.json, the file
# the server reads when a client tries to log in.
#
# NOTE(security): unsalted MD5 and credentials hard-coded in source are weak;
# the format is kept only because the server compares against these digests.
import hashlib
import json  # was missing: json.dumps below raised NameError

account = "root"
passwd = "1973"

handle_account = hashlib.md5()
handle_passwd = hashlib.md5()
handle_account.update(account.encode("utf8"))
handle_passwd.update(passwd.encode("utf8"))

info_usr = {"user": handle_account.hexdigest(), "password": handle_passwd.hexdigest()}

with open("info_authentication.json", "w") as operation_w:
    operation_w.write(json.dumps(info_usr))
很多地方可能存在漏洞,很久以前写的了,这里只是个大概思路,很多功能并不完善,当时还没学线程也就没采用线程了,如果加入线程会好很多。代码量有点多还写得有点乱,不怎么想去改善了