溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python怎么定時從Mysql提取數據存入Redis

發布時間:2020-07-29 09:50:15 來源:億速云 閱讀:362 作者:小豬 欄目:開發技術

這篇文章主要講解了Python怎么定時從Mysql提取數據存入Redis,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。

設計思路:

1.程序一旦run起來,python會把mysql中最近一段時間的數據全部提取出來

2.然后實例化redis類,將數據簡單解析后逐條傳入redis隊列

3.定時器設計每天凌晨12點開始跑

ps:redis是個內存數據庫,做后臺消息隊列的緩存時有很大的用處,有興趣的小伙伴可以去查看相關的文檔。

 # -*- coding:utf-8 -*- 

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# get the data from mysql
class FromSql(object):
  def __init__(self, conn):
    self.conn = conn

  def acquire(self):
    cursor = self.conn.cursor()
    try:
      sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

      cursor.execute(sql)
      rs = cursor.fetchall()
      #print (rs)
      for eve in rs:

        print('%s, %s, %s, %s' % eve)
      copy_rs = rs
      cursor.close()

      return copy_rs 

    except Exception as e:
      print("The error: %s" % e)


class RedisQueue(object):

  def __init__(self, name, namespace='queue', **redis_kwargs):
    """The default connection parameters are: host='localhost', port=6379, db=0"""
    self.__db= redis.Redis(**redis_kwargs)
    self.key = '%s:%s' %(namespace, name)

  def qsize(self):
    return self.__db.llen(self.key)

  def put(self, item):
    self.__db.rpush(self.key, item)

  def get(self, block=True, timeout=None):

    if block:
      item = self.__db.blpop(self.key, timeout=timeout)
    else:
      item = self.__db.lpop(self.key)

    if item:
      item = item[1]
    return item

  def get_nowait(self):
    return self.get(False)


if __name__ == "__main__":
  # connect mysqldb
  conn_sql = MySQLdb.connect(
            host = '127.0.0.1',
            port = 3306,
            user = 'root',
            passwd = '',
            db = 'test',
            charset = 'utf8'
            )


def job_for_redis():
    get_data = FromSql(conn_sql)
    data = get_data.acquire()

    q = RedisQueue('test',host='localhost', port=6379, db=0)
    for single_data in data:
      for meta_data in single_data:
        q.put(meta_data)
        print(meta_data)
    print("All data had been inserted.") 

"""
  try:
    schedule.every().day.at("00:00").do(job_for_redis)
  except Exception as e:
    print('Error: %s'% e)
#  finally:
#    conn.close()

  while True:
    schedule.run_pending()
    time.sleep(1)
"""

補充知識:python定時獲取匯率存入數據庫

python定時任務:

我們可以使用 輕量級的第三方模塊schedule。首先先安裝:pip install schedule

定時任務的的小測試:

import schedule
import time
 
def job():
  print("I'm working...")
 
schedule.every(10).minutes.do(job)       # 每隔10分鐘執行一次任務
schedule.every().hour.do(job)          # 每隔一小時執行一次任務
schedule.every().day.at("10:30").do(job)    # 每天10:30執行一次任務
schedule.every(5).to(10).days.do(job)      # 每5-10天執行一次任務
schedule.every().monday.do(job)         # 每周一的這個時候執行一次任務
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15執行一次任務
 
while True:
  schedule.run_pending()

獲取數據存入數據庫:(格式可能不太對,還有一些符號。自己修改一下即可)

import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine

#獲取美元的所有外匯
def job():
  content = '美元'
  url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯數據地址
  html = requests.get(url).content.decode('utf-8')

  index = html.index('<td>' + content + '</td>')
  str = html[index:index+300]
  result = re.findall('<td>(.*&#63;)</td>',str)

  print("幣種:" + result[0])
  print("現匯買入價:" + result[1])
  print("現鈔買入價:" + result[2])
  print("現匯賣出價:" + result[3])
  print("現鈔賣出價:" + result[4])
  print("中行結算價:" + result[5])
  print("發布時間:" + result[6] + ' ' + result[7])
  
 #本地地址 數據庫賬號 密碼  數據庫名
  db = pymysql.connect('localhost','root','root','pinyougoudb')
  cursor = db.cursor()
  
 #sql語句
  sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])

  cursor.execute(sql)
  db.commit()
  print('success')

 # 查詢語句,將存入的數據查出來
  # sqlalchemy 進行數據庫初始化
  engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
  sql = '''select * from tb_money'''

  # pandas 進行數據庫讀寫
  df = pandas.read_sql_query(sql,engine)
  print(df)

  db.commit()


# 每隔幾分中刷新一次
#schedule.every(0.1).minutes.do(job)

#每天什么時候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

#一直循環 知道滿足條件執行
while True:
  schedule.run_pending()

看完上述內容,是不是對Python怎么定時從Mysql提取數據存入Redis有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女