当前位置: 首页 > article >正文

手写SOCKET进行HTTP通信

网络基础

我们电脑主板上都内置了多种网卡,一般主要有以下几类:

  • 虚拟网卡(loopback)
    注意,它是虚拟的,并不是物理网卡,也被称为是本地环回地址(或接口),一般将127.0.0.1作为本地环回地址。

  • 有线网卡/以太网卡(Ethernet)
    这是以太网(局域网)使用的,我们日常说的网卡指的就是这个,插入的就是网线。

  • 无线网卡(WLAN)
    这是无线局域网所使用的网卡,笔记本上常内置此网卡,它用的是无线电技术,不需要像以太网卡那样插网线。

以上这些网卡都会绑定一个本机IP。

127.0.0.1

  • 这是一个回环地址(loopback address),用于指代本地计算机自己。当你向这个地址发送数据时,数据实际上不会离开你的计算机,而是在同一台机器上被处理。这在测试网络服务或进行本地通信时非常有用。

本机IP(192.168.100.123)

当前电脑的IP地址,本机IP中的有线网IP和无线网IP都是需要联网后才能正常分配和访问的,它们是本机对外开放的IP地址。
在局域网中共享打印机或文件,本地网卡所拥有的地址对于别的电脑来说就是唯一的一个访问的IP地址!这个本机IP地址属于整个局域网甚至还能通过路由器NAT上网!同样对于本机来说使用这个IP也是可以访问本地的!

0.0.0.0

  • 这个地址通常用于表示“所有可用的网络接口”。在服务器编程中,如果你的服务端应用程序绑定到0.0.0.0,它将监听所有网络接口上的请求,包括局域网、无线网络以及可能的互联网连接。这使得服务器可以从任何网络接口接收客户端的连接请求。

Flask框架前言

默认情况下,Flask会将’Connection: close’头部设置为响应中,这意味着每个请求后都会关闭连接。这是因为Flask采用了短连接的方式,即每个请求都使用一个新的TCP连接。这样做的好处是可以更好地控制资源,并在每次请求之间隔离状态。 

所以我们在发送请求过程中发过去的数据是keep-alive但是接受到的是clsoe,所以每次发数据都需要重新建立连接发送数据,不能够复用之前的请求。

暂时没有找到如何设置Flask的Connection为keep-alive

请求头参数示例

参考资料

HTTP的两种请求:GET和POST的作用、区别与本质-CSDN博客

代码简单示例

flask_server.py(服务端)

使用flask框架写的服务端

# add_url_rule与装饰器的关系
from flask import Flask, render_template, Response, request,redirect,url_for
import json
# import setting #这是一个配置文件的py文件

app=Flask(__name__,template_folder='templates')
app.config['ENV']='development'
app.config['DEBUG']=True

# app.config.from_object(setting)

@app.route('/',endpoint='s')
def index():
    print("index 展示开始内容")
    return render_template('index.html')
users=[1,2,3,4,5,"test","fadsf"]


@app.route('/register',methods=['get','post'])
def register():
    print("register === method = ", request.method)

    print(request.method)
    if request.method == 'POST':  #注意这里大写。
        username = request.form.get('username')
        passwd = request.form.get('passwd')
        repasswd = request.form.get('repasswd')
        print(passwd,repasswd)
        #密码一致性验证。
        if passwd == repasswd and passwd != '':
            #保存用户
            user = {'用户名':username,"密码":passwd}
            users.append(user)
            #return '注册成功<a href="/">返回首页</a>'
            return redirect('/') #重定向 有两次响应:1 302状态码+location 2.返货localtion请求地址内容

        return '两次密码不一样'
    return render_template('test.html')

@app.route('/show')
def show():
    print("show")
    #users ->str
    return json.dumps(users)
#  app.add_url_rule('/test',view_func=test)
@app.route('/test')
def a():
    url = url_for('s') #路径反向解析
    print(url)
    return url
if __name__ == '__main__':

    # app.run(host="0.0.0.0",port=5000) #指定ip  127.0.0.1通 本地ip192.168.0.103通
    # app.run(host="127.0.0.1",port=5000) #指定ip  127.0.0.1通 本地ip不通
    app.run(host="192.168.0.103",port=5000) #指定ip 本地ip通 127.0.0.1不通

request.py(请求端)

# import pandas as pd
import requests

def main():
    print("main start ===========================================")
    # url = "http://www.baidu.com"
    # url = "127.0.0.1:5000/show"
    url = "http://192.168.0.103:5000/show"

    reqponse = requests.request("GET",url)  #测试发现每次调用request.request都会进行TCP三次握手,如果想要发送多个请求,需要使用requests.session.
    print(reqponse.text)
    print("core = ", reqponse.status_code)
    print("session = ",requests.session())

    # reqponse = requests.request("GET",url)
    # print("core = ", reqponse.status_code)
    # print("session = ",requests.session())
    #
    # reqponse = requests.request("GET",url)
    # print("core = ", reqponse.status_code)
    # print("session = ",requests.session())
    # s = requests.session()
    # s.get(url)
    # s.get(url)


    print("main end ===========================================")

if __name__ == '__main__':
    main()

send_http.py(socket进行请求案例)

import socket

# 目标服务器的地址和端口
host = '127.0.0.1'
port = 5000

# 创建 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接到服务器
s.connect(("192.168.0.103", port))

# 构建 HTTP GET 请求
http_request = 'GET /show HTTP/1.1\r\nHost: 192.168.0.103:5000\r\nConnection: close\r\n\r\n'
# 发送请求
s.sendall(http_request.encode('utf-8'))

# 接收响应头
response = s.recv(4096)
print(response.decode('utf-8'))

# 接受真正的数据
response = s.recv(4096)
# 打印响应
print(response.decode('utf-8'))

# 关闭连接
s.close()

深入学习

flask_server.py(服务端)

# add_url_rule与装饰器的关系
import time

from flask import Flask, render_template, Response, request,redirect,url_for
import json
# import setting #这是一个配置文件的py文件

app=Flask(__name__,template_folder='templates')
app.config['ENV']='development'
app.config['DEBUG']=True

# app.config.from_object(setting)

@app.route('/',endpoint='s')
def index():
    print("index 展示开始内容")
    return render_template('index.html')

users=[1,2,3,4,5,"test","fadsf"]


@app.route('/register',methods=['get','post'])
def register():
    print("register === method = ", request.method)

    print(request.method)
    if request.method == 'POST':  #注意这里大写。
        username = request.form.get('username')
        passwd = request.form.get('passwd')
        repasswd = request.form.get('repasswd')
        print(passwd,repasswd)
        #密码一致性验证。
        if passwd == repasswd and passwd != '':
            #保存用户
            user = {'用户名':username,"密码":passwd}
            users.append(user)
            #return '注册成功<a href="/">返回首页</a>'
            return redirect('/') #重定向 有两次响应:1 302状态码+location 2.返货localtion请求地址内容

        return '两次密码不一样'
    return render_template('test.html')

@app.route('/show',methods=['get','post'])
def show():
    print("show = ",request.method, "content_type =", request.content_type)
    # 拿到发过来的数据
    if request.method == "GET":

        s = request.args  # 获取get请求参数 ,地址 后面跟 ? 后面的数据 多个字段用 && 进行分割
        print("args = ",s)
        s = request.values  # 获取所有参数
        print("vallues = ",s)
        s = request.args.get("content")
        print("args.get(content = )",s)
    elif request.method == "POST":

        print("============================================================================")

        s = request.form # 获取表单数据
        print("form = ",s)
        s = request.args  # 获取get请求参数 ,地址 后面跟 ? 后面的数据 多个字段用 && 进行分割 ,POST请求不用这个
        print("args = ",s)
        s = request.values  # 获取所有参数:form和args里都有
        print("values = ",s)
        try:
            s = request.json #这种获取方式只适用于JSON格式的数据
            print("json = ",s)
        except Exception as e:
            print("获取JSON格式出现了错误")

        print("============================================================================")
        if(request.content_type == None):
            print("没有内容类型")
        elif(request.content_type.startswith("text/html; charset=utf-8")):
            print("文本")
        elif(request.content_type.startswith("application/x-www-form-urlencoded")):
            s = request.values.get('key')
            print("values.get key = ",s)
        elif(request.content_type.startswith("application/json")  ):
            #获取JSON参数
            s = request.json.get('key')
            print("request.json.get key = ",s)

            #获取JSON原始参数
            s  = request.get_data()
            print("request.get_data = ",s)
        elif request.content_type.startswith('multipart/form-data'):
            s = request.form.get('key')
            print("form.get key = ",s)

        else:
            print("未找到匹配的类型头",request.content_type)

    #users ->str
    # Content-Length: 32\r\n
    return "{'name':'zs','key':{1,2{2}222{}2{}222,332,32,3,4} '}"
    # return json.dumps(users)
#  app.add_url_rule('/test',view_func=test)
@app.route('/test')
def a():
    url = url_for('s') #路径反向解析
    print(url)
    return url
if __name__ == '__main__':

    app.run(host="127.0.0.1",port=5000) #指定ip  127.0.0.1通 本地ip不通
    # app.run(host="192.168.0.103",port=5000) #指定ip 本地ip通 127.0.0.1不通

request.py(请求端)

# import pandas as pd
import json

import requests

# 目标服务器的地址和端口
host = '127.0.0.1'
port = 5000


def main():
    print("main start ===========================================")

    # url = "https://www.baidu.com"
    url = f"http://{host}:{port}/show"
    # url = " http://192.168.0.103:5000/show?key=value1&key2=value2"
    # GET发送请求的第一种方式:路径后面拼接命令:   http://192.168.0.103:5000/show?key1=value1&key2=value2
    # reqponse = requests.request("GET",url)
    print("get request ================================================================")
    header = {"Connection":"close"} # keep-alive
    reqponse = requests.request("GET",url,params={'key1': 'value1','key2':'value2'},timeout=10,headers=header)
    print("txt = ",reqponse.text)
    print("code = ", reqponse.status_code)
    print("session = ",requests.session())
    print("headers = ", reqponse.headers)  # 访问百度界面 Connection不显示 访问自己的服务 connection 为 clsoe


    print("post request =============================================================")
    # header = {"Content-Type": "application/json"} #可以指定头的内容类型进行发送,但是使用data进行传参,传递头为json会请求失败。。
    # 使用POST 请求时,发送参数为 json ={} 时, 发送的请求头类型为:application/json
    # 发送参数为 data = [] 时,发送的请求头类型为:application/x-www-form-urlencoded
    #用 data 参数提交数据时, request.body 的内容则为 a=1&b=2 的这种形式,
    #用 json 参数提交数据时, request.body 的内容则为’{“a”: 1, “b”: 2}’ 的这种形式

    paylod = {'key': 'value', 'Source': 'Python'}

    reqponse = requests.request("POST", url, data = (paylod) ,headers=header ) # ,headers = header
    print("txt = ", reqponse.text)
    print("code = ", reqponse.status_code)
    print("headers = ", reqponse.headers)  # 访问百度界面 Connection为 keep-alive   访问自己的服务


    print("main end ===========================================")


if __name__ == '__main__':
    main()

send_post_http.py(POST发送请求)

import socket
import json
import   select

# 目标服务器的地址和端口
import time

host = '127.0.0.1'
port = 5000
MAX_BUFFER_SIZE = 1024

# post请求第一种用法:先接受头,然后在根据头中的内容大小接受实际数据。
def post1():
    print("===========================================post1()=================================")

    my_data=""

    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    s.connect((host, port))


    content = "source=python&key=value"
    # 构建 HTTP POST 请求
    http_request = f'POST /show?get=true HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                   f'Content-Length:{ len(content) }\r\n' \
                   f'Connection: close\r\n' \
                   f'\r\n'  #格式化字符串输出

    # POST请求在发送过程中必须要先发送请求头在发送请求数据信息。
    #发送请求头
    s.sendall(http_request.encode('utf-8'))
    #发送数据体
    s.sendall(content.encode("utf-8"))

    # 如果发送数据之后使用了sleep,,那么响应的头和内容可能会连到一起发送过来。出现TCP粘包的现象

    time.sleep(1)

    # 接收响应头
    response = s.recv(1024)

    # 找到Content-Length字段拿到真正的数据长度
    hs = response.decode("utf-8")
    print("header = ",hs,"len = ",len(hs))
    # print(f"test sub = {hs[147:]}")

    temp_len = hs.find("Content-Length")
    print("temp_len =",temp_len)
    if temp_len != -1:
        start_pos = temp_len + len("Content-Length")+2
        end_pos = hs.find("\r",start_pos)
        print(f"start_pos = {start_pos},end_pos = {end_pos}")
        content_len_str = hs[start_pos:end_pos]
        print("content_len_str = ",content_len_str)

        method = True
        if method:
            # 接受数据方式一:
            response = s.recv( int(content_len_str) )
            # 打印响应
            my_data = response.decode("utf-8")
            print("接受数据方式一:",my_data)
        else:
            # 接受数据方式二:
            count = 0
            data = b""
            while(count != int(content_len_str)):
                response = s.recv(12)
                data+=response
                count += len(response)
                print(f"单次接受的数据 {response},len = {len(response)}")
                print(f"缓存的数据{data},len = {len(data)}")

                if len(response)==0:
                    print("接受到数据为0")
                    break

                elif len(response)<0:
                    s.close()
                    return "网络断开"

            print("接受数据完成 ",data)
            print("body = ",data.decode('utf-8'))
            my_data = data


    # 关闭连接
    s.close()
    return my_data

# post请求第二种用法:直接用while循环一直接受数据,直到结束
def post2():
    print("===========================================post2()=================================")

    my_data=""

    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


    # 连接到服务器
    s.connect((host, port))
    content = "source=python&key=value"
    # 构建 HTTP POST 请求
    http_request = f'POST /show?get=true HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                   f'Content-Length:{ len(content) }\r\n' \
                   f'Connection: keep-alive\r\n' \
                   f'\r\n'  #格式化字符串输出

    # POST请求在发送过程中必须要先发送请求头在发送请求数据信息。
    #发送请求头
    s.sendall(http_request.encode('utf-8'))
    #发送数据体
    s.sendall(content.encode("utf-8"))


    # s.setblocking(False)  # 非阻塞模式
    # s.settimeout(0)  #
    # s.setblocking(True)  # 阻塞模式
    # s.settimeout(None)

    result=""

    # 死循环来接受数据
    num_stop = 1
    while num_stop:

        num_stop = 1  # 不加这个数据会结算数据错误
        response = s.recv(5)

        # 找到Content-Length字段拿到真正的数据长度
        recive_data = response.decode("utf-8")
        print("recive_data = ",recive_data,"len = ",len(recive_data))

        result += recive_data  # 将请求拼接在result中
        # print("result = ",result)


        temp_len = result.find("Content-Length")
        print("temp_len =",temp_len)
        if temp_len != -1:
            start_pos = temp_len + len("Content-Length")+2
            end_pos = result.find("\r",start_pos)
            # 找不到结束的位置就跳过继续接受数据
            if end_pos == -1:
                continue

            print(f"start_pos = {start_pos},end_pos = {end_pos}")
            content_len_str = result[start_pos:end_pos]
            print("content_len_str = ",content_len_str)


            # 一般\r\n\r\n是请求的结束标记。
            text_ind = result.find("\r\n\r\n", start_pos) + len("\r\n\r\n")
            if text_ind != -1:
                # 参数不正确
                response_str = result[text_ind:]
                print(f"response_str = {response_str} ")

                end_len = 1

                #从数据的第一个位置开始,少计算一个{ 所以最后带上} 正好num_strop = 0
                for i in range(end_len,len(response_str)):
                    end_len+=1
                    if response_str[i] =="{":
                        num_stop+=1
                    elif response_str[i] == "}":
                        num_stop-=1
                    if(num_stop ==0):
                        if int(content_len_str) != end_len:
                            print("请求头长度和实际内容长度不一致")
                        resp_data = response_str[:end_len]
                        print("resp_data = ",resp_data)
                        my_data = resp_data
                        break
                print("===============num_stop = ",num_stop,"==================")

        if(len(recive_data)< 0):
            s.close()
        elif len(recive_data)==0:
            break


    # 关闭连接
    s.close()
    return my_data


def post3():
    print("===========================================post3()=================================")

    my_data = ""

    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    if s.getblocking() == True:
        # 连接到服务器
        s.connect((host, port))
    else:
        # 尝试连接到服务器
        try:
            s.connect((host, port))
        except BlockingIOError:
            # 等待连接完成
            r, _, _ = select.select([s], [], [], 10)  # 等待最多10秒
            if s in r:
                print("连接成功")
            else:
                print("连接超时")

        # 非阻塞模式
    # s.setblocking(False)
    # s.settimeout(0)
    # 阻塞模式
    # s.setblocking(True)
    # s.settimeout(None)

    content = "source=python&key=value"
    # 构建 HTTP POST 请求
    http_request = f'POST /show?get=true HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                   f'Content-Length:{len(content)}\r\n' \
                   f'Connection: close\r\n' \
                   f'\r\n'  # 格式化字符串输出

    # POST请求在发送过程中必须要先发送请求头在发送请求数据信息。
    # 发送请求头
    s.sendall(http_request.encode('utf-8'))
    # 发送数据体
    s.sendall(content.encode("utf-8"))



    # time.sleep(1)  # 睡眠1秒钟,模拟粘包的现场

    result = ""

    # 死循环来接受数据

    while 1:

        response = s.recv(MAX_BUFFER_SIZE)

        # 找到Content-Length字段拿到真正的数据长度
        recive_data = response.decode("utf-8")
        print("recive_data = ", recive_data, "len = ", len(recive_data))

        result += recive_data  # 将请求拼接在result中
        # print("result = ",result)

        temp_len = result.find("Content-Length")
        print("temp_len =", temp_len)
        if temp_len != -1:
            start_pos = temp_len + len("Content-Length") + 2
            end_pos = result.find("\r", start_pos)
            # 找不到结束的位置就跳过继续接受数据
            if end_pos == -1:
                continue

            print(f"start_pos = {start_pos},end_pos = {end_pos}")
            content_len_str = result[start_pos:end_pos]
            print("content_len_str = ", content_len_str)

            if int(content_len_str)>= MAX_BUFFER_SIZE:
                print("超出最大的content_len_str")
                # return  "ERROR"

            # 一般\r\n\r\n是请求的结束标记。
            text_ind = result.find("\r\n\r\n", start_pos) + len("\r\n\r\n")
            if text_ind != -1:
                # 肯定一直都是成立的条件
                response_str = result[text_ind:]
                print(f"response_str = {response_str} ")

                if len(response_str) >= len(content_len_str):
                    my_data = response_str
                    s.close()
                    return my_data

        if (len(recive_data) < 0):
            s.close()
        elif len(recive_data) == 0:
            break

    # 关闭连接
    s.close()
    return my_data




if __name__ == '__main__':
    # s = post1()  # 方式一可能出现粘包的现场
    s = post2()  # 方式二没有问题
    # s = post3() # 方式三不知道在哪找到的版本有问题
    print("s = ",s)

send_get_http.py(GET发送请求)

import select
import socket

# 目标服务器的地址和端口
import time

host = '127.0.0.1'
port = 5000
MAX_BUFFER_SIZE = 1024

def get1():
    print("===========================================get1()=================================")
    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接到服务器
    s.connect((host, port))
    # 构建 HTTP GET 请求
    # keep-alive 保持连接   close 关闭连接
    # 构建 HTTP GET 请求 get请求数据一般都在头里,不需要再次发送数据体
    http_request = f'GET /show?aaa=111 HTTP/1.1\r\nHost: {host}:{port}\r\nConnection: keep-alive\r\n\r\n'  #格式化字符串输出
    # 发送请求
    s.sendall(http_request.encode('utf-8'))

    # 接收响应头
    response = s.recv(4096)
    print(response.decode('utf-8'))

    # 接受真正的数据
    response = s.recv(4096)
    # 打印响应
    print(response.decode('utf-8'))

    # 关闭连接
    s.close()


def get2():

    print("===========================================get2()=================================")

    my_data = ""

    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 连接到服务器
    s.connect((host, port))

    # 非阻塞模式
    # s.setblocking(False)
    # s.settimeout(0)
    # 阻塞模式
    # s.setblocking(True)
    # s.settimeout(None)

    content = "source=python&key=value"
    # 构建 HTTP GET 请求 也可以发送数据体,但是一般不建议这样
    http_request = f'GET /show HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                    f'Content-Length:{len(content)}\r\n' \
                   f'Connection: close\r\n' \
                   f'\r\n'  # 格式化字符串输出

    # GET请求发送数据可以直接拼接在路径头中。
    # 发送请求头
    s.sendall(http_request.encode('utf-8'))
    # 发送数据体
    s.sendall(content.encode("utf-8"))

    # time.sleep(1)  # 睡眠1秒钟,模拟粘包的现场

    result = ""

    # 死循环来接受数据

    while 1:

        response = s.recv(MAX_BUFFER_SIZE)

        # 找到Content-Length字段拿到真正的数据长度
        recive_data = response.decode("utf-8")
        print("recive_data = ", recive_data, "len = ", len(recive_data))

        result += recive_data  # 将请求拼接在result中
        # print("result = ",result)

        temp_len = result.find("Content-Length")
        print("temp_len =", temp_len)
        if temp_len != -1:
            start_pos = temp_len + len("Content-Length") + 2
            end_pos = result.find("\r", start_pos)
            # 找不到结束的位置就跳过继续接受数据
            if end_pos == -1:
                continue

            print(f"start_pos = {start_pos},end_pos = {end_pos}")
            content_len_str = result[start_pos:end_pos]
            print("content_len_str = ", content_len_str)

            if int(content_len_str) >= MAX_BUFFER_SIZE:
                print("超出最大的content_len_str")
                return  "ERROR"

            # 一般\r\n\r\n是请求的结束标记。
            text_ind = result.find("\r\n\r\n", start_pos) + len("\r\n\r\n")
            if text_ind != -1:
                # 肯定一直都是成立的条件
                response_str = result[text_ind:]
                print(f"response_str = {response_str} ")

                if len(response_str) >= len(content_len_str):
                    my_data = response_str
                    s.close()
                    return my_data

        if (len(recive_data) < 0):
            s.close()
        elif len(recive_data) == 0:
            break

    # 关闭连接
    s.close()
    return my_data


def get3():

    print("===========================================get2()=================================")
    """
    my_data = ""

    # 创建 socket 对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 连接到服务器
    s.connect((host, port))
    
    # 非阻塞模式
    # s.setblocking(False)
    # s.settimeout(0)
    # 阻塞模式
    # s.setblocking(True)
    # s.settimeout(None)

    content = "source=python&key=value"
    # 构建 HTTP GET 请求
    http_request = f'GET /show HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                   f'Content-Length:{len(content)}\r\n' \
                   f'Connection: close\r\n' \
                   f'\r\n'  # 格式化字符串输出

    # GET请求发送数据可以直接拼接在路径头中。
    # 发送请求头
    s.sendall(http_request.encode('utf-8'))
    # 发送数据体
    s.sendall(content.encode("utf-8"))

    # time.sleep(1)  # 睡眠1秒钟,模拟粘包的现场

    result = ""

    # 死循环来接受数据

    # 等待 socket 变为可读

    while 1:
        ready = select.select([s], [], [], None)
        if ready[0]:
            response = s.recv(MAX_BUFFER_SIZE)
            print("response = ",response)

    # 关闭连接
    s.close()
    return my_data

    """

    # 假设我们有一个 socket 对象 s
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 连接到服务器(假设 host 和 port 已经定义)
    s.connect((host, port))


    content = "source=python&key=value"
    # 构建 HTTP GET 请求
    http_request = f'GET /show HTTP/1.1\r\n' \
                   f'Host: {host}:{port}\r\n' \
                   f'Content-Type: application/x-www-form-urlencoded\r\n' \
                   f'Content-Length:{len(content)}\r\n' \
                   f'Connection: close\r\n' \
                   f'\r\n'  # 格式化字符串输出

    # GET请求发送数据可以直接拼接在路径头中。
    # 发送请求头
    s.sendall(http_request.encode('utf-8'))
    # 发送数据体
    s.sendall(content.encode("utf-8"))

    while 1:
        print("Waiting for data...")
        r, w, x = select.select([s], [], [])

        if s in r:
            print("Socket is readable.")
            # 读取数据
            data = s.recv(1024)
            print("Received:", data)
            if not data:
                print("The connection has been closed by the other party.")
                break
    return


if __name__ == '__main__':
    # get1()
    # get2()
    get3()

使用QT的HTTP进行发送请求

#include <QCoreApplication>
#include <QDebug>

#include <QDebug>
#include <QtNetwork>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QNetworkRequest>
#include <QTextCodec>
#include <QFile>
#include <QTextStream>
#include <QObject>

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    qDebug()<< "start ============================";

    QUrl url = QUrl("http://www.baidu.com");
    url = QUrl("http://127.0.0.1:5000/show?source=qt");
    QNetworkAccessManager* manage = new QNetworkAccessManager;
    QObject::connect(manage, &QNetworkAccessManager::finished,[&](QNetworkReply *reply){
        qDebug()<<"manage finisned"<<reply;
    });

    QNetworkRequest req;
    req.setUrl(url);
    req.setHeader(QNetworkRequest::ContentTypeHeader,QVariant("application/x-www-form-urlencoded"));

//    QNetworkReply*reply =  manage->get(req);
    QNetworkReply*reply =manage->post(req,"name=zs&method=qtpost");
    qDebug()<<" reply = "<<reply;
    qDebug()<<"readall = "<<reply->readAll();

    QEventLoop loop;
    QObject::connect(reply,&QNetworkReply::finished,&loop,&QEventLoop::quit);
    QObject::connect(reply,&QNetworkReply::finished,[&](){
        int statusCode  = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
        qDebug() << "statusCode:" << statusCode;
        qDebug()<<reply->readAll(); //使用http直接发数据的话直接relayAll就可以了
        qDebug()<<"header = "<<reply->header(QNetworkRequest::ContentTypeHeader);

        qDebug()<<reply->rawHeaderPairs(); //可以拿到所有的头部数据

    });


    loop.exec();
    qDebug()<<"end================================";

    return 0;
//    return a.exec();
}


http://www.kler.cn/a/514972.html

相关文章:

  • 【云网】云网络基础概念(华为云)
  • 大模型 | AI驱动的数据分析:利用自然语言实现数据查询到可视化呈现
  • 基于STM32的智能空气质量监测与净化系统设计
  • 如何将办公室固定电话设置呼叫转接(或呼叫转移)到手机 -远程高效办公
  • DeepSeek R1发布综述:开源大语言模型的推理能力新标杆
  • Docker核心命令与Yocto项目的高效应用
  • Sklearn机器学习第十五天|机器学习算法原理
  • rocketmq-product-DefaultMQProducer启动流程
  • Dify:低代码 AI 应用开发平台详解与实战指南
  • 【氮化镓】集成ESD提高GaN HEMT稳定性
  • 在elasticsearch中,document数据的写入流程如何?
  • 【贪心算法】洛谷P1106 - 删数问题
  • oracle好用的decode語法
  • Frida+Objection
  • 科技护航:分布式 IO 模块与大型 PLC,稳筑地铁安全防线
  • jupyter配置说明
  • 【2024年华为OD机试】 (E卷,200分) - 跳格子3 (JavaScriptJava PythonC/C++)
  • Spring注解篇:@GetMapping详解
  • 金融波动率的多模型建模研究:GARCH族与HAR模型的Python实现与对比分析
  • Springboot3 自动装配之核心文件:imports文件