本文介绍: 主要为了检测用户测试平台做了哪些操作,又不想写埋点上报,只能直接改底层了,good luck

结果展示

在这里插入图片描述

前端实现

<template>
  <div>
    <div style="height: 20%; padding-top: 5px; padding-right: 5px; padding-left: 5px">
      <el-card>
        <el-date-picker
          v-model="log_time"
          type="datetimerange"
          range-separator=""
          value-format="yyyy-MM-dd HH:mm:ss"
          :default-time="['00:00:00', '23:59:59']"
          start-placeholder="开始日期"
          end-placeholder="结束日期"
        />
        <el-input v-model="log_api_path" style="width: 400px; padding-right: 10px; padding-left: 10px" placeholder="输入请求地址关键字" />
        <el-button type="primary" @click="log_filter">搜索</el-button>
      </el-card>
    </div>
    <div style="padding: 5px">
      <el-card style="padding: 5px; overflow: auto; height: 800px">
        <div>
          <el-collapse v-model="activeName" accordion>
            <el-collapse-item
              v-for="(item, index) in log_list"
              :key="index"
              :title="item.username + ':' + item.req_method + '------' + item.api_path + ' ------操作时间:' + item.create_time"
            >
              <p>操作人:{{ item.username }}</p>
              <p>IP来源:{{ item.from_ip }}</p>
              <p>请求头:{{ item.req_headers }}</p>
              <p>请求体:{{ item.req_body }}</p>
              <p>结果返回请求头:{{ item.res_headers }}</p>
              <p>请求结果:{{ item.res_body }}</p>
              <p>请求响应码:{{ item.res_status_code }}</p>
            </el-collapse-item>
          </el-collapse>
          <div class="block">
            <el-pagination
              background
              :current-page="log_page.current_page"
              :page-sizes="[10, 30, 50]"
              :page-size="log_page.size_page"
              layout="total, sizes, prev, pager, next, jumper"
              :total="log_total"
              style="float: left; padding: 10px"
              @size-change="log_handleSizeChange"
              @current-change="log_handleCurrentChange"
            />
          </div>
        </div>
      </el-card>
    </div>
  </div>
</template>

<script>
import { get_user_action_log } from "@/api/user_act";
import moment from 'moment';


export default {
  name: "ActionLog",
  data() {
    return {
      log_api_path: "",
      log_time: [],
      log_total: null,
      activeName: "",
      log_list: [],
      log_page: {
        size_page: 10,
        current_page: 1
      }
    }
  },
  created() {
    const today = moment();
    const start = today.startOf('day');
    const end = today.endOf('day');

    this.log_time = [
      start.format('YYYY-MM-DD') + ' 00:00:00',
      end.format('YYYY-MM-DD') + ' 23:59:59',
    ];
    this.get_log_list()
  },
  methods: {
    // 页签-条/页 跳转
    log_handleSizeChange(val) {
      this.log_page.size_page = val
      this.get_log_list()
    },
    // 底部页签跳转
    log_handleCurrentChange(val) {
      this.log_page.current_page = val
      this.get_log_list()
    },
    get_log_list() {
      const req = this.log_page
      req.log_api_path = this.log_api_path
      req.user_id = Number(localStorage.getItem("user_id"))
      req.create_time = this.log_time
      get_user_action_log(req).then(res => {
        if (res.data.code === 200) {
          this.log_list = res.data.content
          this.log_total = res.data.log_total
          this.log_page.size_page = res.data.size_page
          this.log_page.current_page = res.data.current_page
        }
      })
    },
    log_filter() {
      this.get_log_list()
    }
  }
}
</script>

<style scoped>

</style>

后端实现

模型层:

class User_action(models.Model):
    api_path = models.TextField(verbose_name="操作接口", max_length=500, null=True)  # 操作接口
    req_ip = models.TextField(verbose_name="来源ip", max_length=500, null=True)  # 来源ip
    req_method = models.TextField(verbose_name="请求方法", max_length=500, null=True)  # 请求方法
    req_body = models.JSONField(default=dict, verbose_name='请求体', null=True)  # 请求体
    req_headers = models.JSONField(default=dict, verbose_name='请求头', null=True)  # 请求头
    res_status_code = models.TextField(verbose_name="响应码", max_length=500, null=True)  # 响应
    res_headers = models.JSONField(default=dict, verbose_name='响应头', null=True)  # 响应
    res_body = models.JSONField(default=dict, verbose_name='响应结果', null=True)  # 响应结果
    user = models.ForeignKey(Userinfo, on_delete=models.CASCADE)  # 操作
    create_time = models.DateTimeField("创建时间", auto_now=True, null=True)  # 调用时间

逻辑层:

def get_user_action_log(request):
    try:
        data = json.loads(request.body)

        if data["log_api_path"] == "":
            logs = User_action.objects.filter(create_time__range=(
                data["create_time"][0],
                data["create_time"][1])).order_by("-id")
        else:
            logs = User_action.objects.filter(create_time__range=(
                data["create_time"][0],
                data["create_time"][1])).order_by("-id")
        size_page = data["size_page"]
        current_pages = data["current_page"]
        log_total = len(logs)
        result = []
        for i in logs:
            user_dict = {
                "id": i.id,
                "api_path": i.api_path,
                "req_method": i.req_method,
                "req_headers": i.req_headers,
                "req_body": i.req_body,
                "res_headers": i.res_headers,
                "res_body": i.res_body,
                "res_status_code": i.res_status_code,
                "create_time": datetime.datetime.strftime(i.create_time, "%Y-%m-%d %H:%M:%S"),
                "username": i.user.username,
                "from_ip": i.req_ip
            }
            result.append(user_dict)
        p = Paginator(result, size_page)
        page1 = p.page(current_pages)
        current_page = page1.object_list

        return JsonResponse({
            "code": 200,
            "message": "获取用户操作日志成功",
            "content": current_page,
            "log_total": log_total,
            "size_page": size_page,
            "current_page": current_pages
        })
    except Exception as e:
        return JsonResponse({
            "code": 100,
            "message": f"获取用户操作日志失败{str(e)}"
        })

修改django底层代码

# 文件地址django/core/handlers/wsgi.py
# 这是django接口底层接口请求文件

class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()

    def __call__(self, environ, start_response):
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)
        request = self.request_class(environ)
        response = self.get_response(request)

        response._handler_class = self.__class__

        status = "%d %s" % (response.status_code, response.reason_phrase)
        response_headers = [
            *response.items(),
            *(("Set-Cookie", c.output(header="")) for c in response.cookies.values()),
        ]
        start_response(status, response_headers)
        if getattr(response, "file_to_stream", None) is not None and environ.get(
            "wsgi.file_wrapper"
        ):
            # If `wsgi.file_wrapper` is used the WSGI server does not call
            # .close on the response, but on the file wrapper. Patch it to use
            # response.close instead which takes care of closing all files.
            response.file_to_stream.close = response.close
            response = environ["wsgi.file_wrapper"](
                response.file_to_stream, response.block_size
            )
        
        # 增加以下代码
        req_method = request.method
        if req_method == "GET" or req_method == "get":
            req_body = {}
            res_code = 200
            user_id = 1
            # 上传文件接口接口,直接赋默认值,懒得写
            if getattr(response, "file_to_stream", None) is not None and environ.get("wsgi.file_wrapper"):
                res_body = {}
        elif req_method == "OPTIONS" or req_method == "":
            return response
        else:
            # 上传文件接口接口,直接赋默认值,懒得写
            if getattr(response, "file_to_stream", None) is not None and environ.get("wsgi.file_wrapper"):
                req_body = {}
                res_body = json.loads(response._container[0])
                res_code = response.status_code
                user_id = 1
            else:
                req_body = json.loads(request.body)
                # 不需要入库的接口,直接返回
                if request.path_info == "/api/user/login" or request.path_info == "/api/user/get_user_action_log":
                    return response
                else:
                    user_id = req_body["user_id"]
                res_body = json.loads(response._container[0])
                res_code = response.status_code


        response_dict = {key: value for key, value in response_headers}
        api_path = request.path_info
        req_headers = request.headers.__dict__
        form_ip = request.environ["REMOTE_ADDR"]
        User_action.objects.create(user_id=user_id, req_headers=req_headers, req_method=req_method,
                                   req_body=req_body, req_ip=form_ip, res_headers=response_dict,
                                   res_body=res_body, res_status_code=res_code, api_path=api_path)
        return response

总结

主要为了更加直观的查看用户测试平台做了哪些操作,又不想写埋点上报,只能直接改底层了,good luck!

原文地址:https://blog.csdn.net/ljyljyyeah/article/details/134600077

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_24654.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注