Tushare 日线数据的获取

通过以下代码你可以获取著名的金融数据平台Tushare的股票日线交易数据,存入数据表

代码的优势在于:

  • 1.可以自定义数据的获取的范围,主要通过Define time period的日期参数可调整实现
  • 2.可以获取自己想要的股票数据,主要通过数据表stock_basic来确定ts_code范围(这里附加的前提就是你从Tushare获取基础信息时要按照自己的需求过滤不需要的数据,例如:不考虑ST、和自己没有交易资格的北交所数据等)
  • 3.Rate limiting和time.sleep进一步增强了再数据获取时的请求限制,并避免了因代码逻辑不严谨导致的数据请求丢失
  • 4.同时代码还具有batch_size的分批请求和tqdm的数据请求进度,以及数据是否已经存在的校验判断Function to check if data already exists
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm
import concurrent.futures
import tushare as ts
from threading import Semaphore, Timer
import time

# Initialize tushare
ts.set_token('your_token')
pro = ts.pro_api()

# Create database connection
engine = create_engine('mysql+pymysql://username:password@localhost:3306/stock')
Session = sessionmaker(bind=engine)
session = Session()

# Get stock list with list_date
query = "SELECT ts_code, list_date FROM stock.stock_basic"
stock_list = pd.read_sql(query, engine)

# Define time period
default_start_date = '20240802'
end_date = '20240802'

# Rate limiting
CALLS = 1500 # 每分钟最大请求次数
RATE_LIMIT = 60 # seconds
BATCH_SIZE = 1000 # 每次请求的股票代码数量

# Create a semaphore to limit the number of API calls
semaphore = Semaphore(CALLS)

# Function to periodically release the semaphore
def release_semaphore():
    for _ in range(CALLS):
        semaphore.release()

# Start timer to release semaphore every minute
Timer(RATE_LIMIT, release_semaphore, []).start()

# Function to check if data already exists
def data_exists(ts_code, trade_date):
    query = f"SELECT 1 FROM daily WHERE ts_code='{ts_code}' AND trade_date='{trade_date}' LIMIT 1"
    result = pd.read_sql(query, engine)
    return not result.empty

# Define retry decorator
def retry():
    def decorator(func):
        def wrapper(*args, **kwargs):
            for _ in range(10):
                try:
                    result = func(*args, **kwargs)
                    return result
                except Exception as e:
                    print(f"Error occurred: {e}. Retrying...")
                    time.sleep(30)
            raise Exception("Maximum retries exceeded. Function failed.")
        return wrapper
    return decorator

# Define function to fetch and insert stock data
@retry()
def fetch_and_insert_stock_data(ts_code, list_date, end_date, engine):
    # Determine start_date based on list_date
    start_date = max(list_date.replace('-', ''), default_start_date)

    # Check if data already exists
    if data_exists(ts_code, end_date):
        return f"Data already exists for {ts_code} on {end_date}"

    # Wait on semaphore to respect rate limiting
    semaphore.acquire()
    try:
        # Fetch data from tushare
        stock_data = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)

        # If data is not empty, insert it into the database
        if not stock_data.empty:
            # Map API fields to database fields
            stock_data.rename(columns={'vol': 'volume'}, inplace=True)
            stock_data.to_sql('daily', engine, if_exists='append', index=False, method='multi')
        return ts_code
    finally:
        # Release the semaphore after the operation
        semaphore.release()

# Initialize progress bar
total_stocks = len(stock_list)
pbar = tqdm(total=total_stocks, desc="Fetching stock data")

def process_batch(batch):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(fetch_and_insert_stock_data, row['ts_code'], row['list_date'], end_date, engine): row['ts_code']
            for index, row in batch.iterrows()
        }
        for future in concurrent.futures.as_completed(futures):
            ts_code = futures[future]
            try:
                data = future.result()
                pbar.update(1)
            except Exception as exc:
                print(f'{ts_code} generated an exception: {exc}')

# Process stocks in batches
batch_size = BATCH_SIZE
for i in range(0, total_stocks, batch_size):
    batch = stock_list.iloc[i:i + batch_size]
    process_batch(batch)
    time.sleep(60) # Wait to respect rate limits

# Close progress bar
pbar.close()

股市交易,学会使用数据进行市场行情研究和是否买入/卖出判断,要比通过有心人捏造并通过媒体报道的消息更具有参考意义。

前提是你要能看得懂“数据的内涵意义”!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注