本篇內容主要講解“如何使用Flask搭建ES搜索引擎”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何使用Flask搭建ES搜索引擎”吧!
1 配置文件
Config.py
#coding:utf-8 import os DB_USERNAME = 'root' DB_PASSWORD = None # 如果沒有密碼的話 DB_HOST = '127.0.0.1' DB_PORT = '3306' DB_NAME = 'flask_es' class Config: SECRET_KEY ="隨機字符" # 隨機 SECRET_KEY SQLALCHEMY_COMMIT_ON_TEARDOWN = True # 自動提交 SQLALCHEMY_TRACK_MODIFICATIONS = True # 自動sql DEBUG = True # debug模式 SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://%s:%s@%s:%s/%s' % (DB_USERNAME, DB_PASSWORD,DB_HOST, DB_PORT, DB_NAME) #數據庫URL MAIL_SERVER = 'smtp.qq.com' MAIL_POST = 465 MAIL_USERNAME = '3417947630@qq.com' MAIL_PASSWORD = '郵箱授權碼' FLASK_MAIL_SUBJECT_PREFIX='M_KEPLER' FLASK_MAIL_SENDER=MAIL_USERNAME # 默認發送人 # MAIL_USE_SSL = True MAIL_USE_TLS = False
這是一份相對簡單的 Flask Config 文件,當然對于當前項目來說數據庫的連接不是必要的,我只是用 Mysql 來作為輔助用,小伙伴們沒有必要配置連接數據庫,有 ES 足以。然后郵箱通知這個看個人需求 .....
2 日志
Logger.py
日志模塊在工程應用中是必不可少的一環,根據不同的生產環境來輸出日志文件是非常有必要的。用句江湖上的話來說: "如果沒有日志文件,你死都不知道怎么死的 ....."
# coding=utf-8 import os import logging import logging.config as log_conf import datetime import coloredlogs coloredlogs.DEFAULT_FIELD_STYLES = {'asctime': {'color': 'green'}, 'hostname': {'color': 'magenta'}, 'levelname': {'color': 'magenta', 'bold': False}, 'name': {'color': 'green'}} log_dir = os.path.dirname(os.path.dirname(__file__)) + '/logs' if not os.path.exists(log_dir): os.mkdir(log_dir) today = datetime.datetime.now().strftime("%Y-%m-%d") log_path = os.path.join(log_dir, today + ".log") log_config = { 'version': 1.0, # 格式輸出 'formatters': { 'colored_console': { 'format': "%(asctime)s - %(name)s - %(levelname)s - %(message)s", 'datefmt': '%H:%M:%S' }, 'detail': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', 'datefmt': "%Y-%m-%d %H:%M:%S" #時間格式 }, }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'level': 'DEBUG', 'formatter': 'colored_console' }, 'file': { 'class': 'logging.handlers.RotatingFileHandler', 'maxBytes': 1024 * 1024 * 1024, 'backupCount': 1, 'filename': log_path, 'level': 'INFO', 'formatter': 'detail', # 'encoding': 'utf-8', # utf8 編碼 防止出現編碼錯誤 }, }, 'loggers': { 'logger': { 'handlers': ['console'], 'level': 'DEBUG', }, } } log_conf.dictConfig(log_config) log_v = logging.getLogger('log') coloredlogs.install(level='DEBUG', logger=log_v) # # Some examples. # logger.debug("this is a debugging message") # logger.info("this is an informational message") # logger.warning("this is a warning message") # logger.error("this is an error message")
這里準備好了一份我常用的日志配置文件,可作為常用的日志格式,直接調用即可,根據不同的等級來輸出到終端或 .log 文件,拿走不謝。
3 路由
對于 Flask 項目而言, 藍圖和路由會讓整個項目更具觀賞性(當然指的是代碼的閱讀)。
這里我采用兩個分支來作為數據支撐,一個是 Math 入口,另一個是 Baike 入口,數據的來源是基于上一篇的百度百科爬蟲所得,根據 深度優先 的爬取方式抓取后放入 ES 中。
# coding:utf8 from flask import Flask from flask_sqlalchemy import SQLAlchemy from app.config.config import Config from flask_mail import Mail from flask_wtf.csrf import CSRFProtect app = Flask(__name__,template_folder='templates',static_folder='static') app.config.from_object(Config) db = SQLAlchemy(app) db.init_app(app) csrf = CSRFProtect(app) mail = Mail(app) # 不要在生成db之前導入注冊藍圖。 from app.home.baike import baike as baike_blueprint from app.home.math import math as math_blueprint from app.home.home import home as home_blueprint app.register_blueprint(home_blueprint) app.register_blueprint(math_blueprint,url_prefix="/math") app.register_blueprint(baike_blueprint,url_prefix="/baike")
# -*- coding:utf-8 -*- from flask import Blueprint baike = Blueprint("baike", __name__) from app.home.baike import views
# -*- coding:utf-8 -*- from flask import Blueprint math = Blueprint("math", __name__) from app.home.math import views
聲明路由并在 __init__ 文件中初始化
下面來看看路由的實現(以Baike為例)
# -*- coding:utf-8 -*- import os from flask_paginate import Pagination, get_page_parameter from app.Logger.logger import log_v from app.elasticsearchClass import elasticSearch from app.home.forms import SearchForm from app.home.baike import baike from flask import request, jsonify, render_template, redirect baike_es = elasticSearch(index_type="baike_data",index_name="baike") @baike.route("/") def index(): searchForm = SearchForm() return render_template('baike/index.html', searchForm=searchForm) @baike.route("/search", methods=['GET', 'POST']) def baikeSearch(): search_key = request.args.get("b", default=None) if search_key: searchForm = SearchForm() log_v.error("[+] Search Keyword: " + search_key) match_data = baike_es.search(search_key,count=30) # 翻頁 PER_PAGE = 10 page = request.args.get(get_page_parameter(), type=int, default=1) start = (page - 1) * PER_PAGE end = start + PER_PAGE total = 30 print("最大數據總量:", total) pagination = Pagination(page=page, start=start, end=end, total=total) context = { 'match_data': match_data["hits"]["hits"][start:end], 'pagination': pagination, 'uid_link': "/baike/" } return render_template('data.html', q=search_key, searchForm=searchForm, **context) return redirect('home.index') @baike.route('/<uid>') def baikeSd(uid): base_path = os.path.abspath('app/templates/s_d/') old_file = os.listdir(base_path)[0] old_path = os.path.join(base_path, old_file) file_path = os.path.abspath('app/templates/s_d/{}.html'.format(uid)) if not os.path.exists(file_path): log_v.debug("[-] File does not exist, renaming !!!") os.rename(old_path, file_path) match_data = baike_es.id_get_doc(uid=uid) return render_template('s_d/{}.html'.format(uid), match_data=match_data)
可以看到我們成功的將 elasticSearch 類初始化并且進行了數據搜索。
我們使用了 Flask 的分頁插件進行分頁并進行了單頁數量的限制,根據 Uid 來跳轉到詳情頁中。
細心的小伙伴會發現我這里用了個小技巧
@baike.route('/<uid>') def baikeSd(uid): base_path = os.path.abspath('app/templates/s_d/') old_file = os.listdir(base_path)[0] old_path = os.path.join(base_path, old_file) file_path = os.path.abspath('app/templates/s_d/{}.html'.format(uid)) if not os.path.exists(file_path): log_v.debug("[-] File does not exist, renaming !!!") os.rename(old_path, file_path) match_data = baike_es.id_get_doc(uid=uid) return render_template('s_d/{}.html'.format(uid), match_data=match_data)
以此來保證存放詳情頁面的模板中始終只保留一個 html 文件。
4 項目啟動
一如既往的采用 flask_script 作為項目的啟動方案,確實方便。
# coding:utf8 from app import app from flask_script import Manager, Server manage = Manager(app) # 啟動命令 manage.add_command("runserver", Server(use_debugger=True)) if __name__ == "__main__": manage.run()
黑窗口鍵入
python manage.py runserver
就可以啟動項目,默認端口 5000,訪問 http://127.0.0.1:5000
使用gunicorn啟動
gunicorn -c gconfig.py manage:app
#encoding:utf-8 import multiprocessing from gevent import monkey monkey.patch_all() # 并行工作進程數 workers = multiprocessing.cpu_count() * 2 + 1 debug = True reload = True # 自動重新加載 loglevel = 'debug' # 指定每個工作者的線程數 threads = 2 # 轉發為監聽端口8000 bind = '0.0.0.0:5001' # 設置守護進程,將進程交給supervisor管理 daemon = 'false' # 工作模式協程 worker_class = 'gevent' # 設置最大并發量 worker_connections = 2000 # 設置進程文件目錄 pidfile = 'log/gunicorn.pid' logfile = 'log/debug.log' # 設置訪問日志和錯誤信息日志路徑 accesslog = 'log/gunicorn_acess.log' errorlog = 'log/gunicorn_error.log'
項目截圖
到此,相信大家對“如何使用Flask搭建ES搜索引擎”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。