按照公式实现的两种实现方式R/S和MF-DFA。经测试MD_DFA要比R/S算法的估计准些。
import numpy as np
import pandas as pd
import math
def HurstRS(X,smin=4,step=1):
X = np.array(X)
N=len(X)
s=range(smin,int(np.floor(N/2)),1)
#print s
if not len(s):
print "the sub step is invalid"
return
fs = []
for step in s:
ns = np.floor(N/step)
fns = []
for i in range(int(ns)):
subX = X[step*i:step*(i+1)]
sub_mean = np.mean(subX)
sub_S = np.std(subX)
subY = np.cumsum(subX-sub_mean)
sub_R = np.max(subY)-np.min(subY)
sub_RR = sub_R/sub_S
fns.append(sub_RR)
fs.append(np.mean(fns))
p = np.polyfit(np.log(s),np.log(fs),1)
return p
def HurstMF_DFA(X,m,q,smin=4,step=1):
X = np.array(X)
Fluct=np.cumsum(X-np.mean(X))#/np.std(X)
#print Fluct
FluctRev=np.flipud(Fluct)
#print FluctRev
N=len(Fluct)
#ScaleNumb=np.linspace(np.log2(4),np.log2(N),40)
#s=np.round(np.power(2,ScaleNumb))
s=range(smin,int(np.floor(N/2)),step)
if not len(s):
print "the sub step is invalid"
return
#print s
#s = [2,3,5,6,7,8,9,10,11,12,13,14,15,16,17,18,20]
fqs = []
for step in s:
ns = np.floor(N/step)
fs = []
fsvar = []
fsvarr = []
for i in range(int(ns)):
subFluct = Fluct[step*i:step*(i+1)]
subFluctRev = FluctRev[step*i:step*(i+1)]
subSeg = range(int(step)*i+1,int(step)*(i+1)+1)
#poly fit
poly = np.polyfit(subSeg,subFluct,m)
polyr = np.polyfit(subSeg,subFluctRev,m)
#poly val
fit = np.polyval(poly,subSeg)
fitr = np.polyval(polyr,subSeg)
#
var = np.mean(np.power(subFluct -fit, 2))
var = np.power(var,q/2.)
fsvar.append(var)
varr = np.mean(np.power(subFluctRev -fitr, 2))
varr = np.power(varr,q/2.)
fsvarr.append(varr)
fs.append(var + varr)
value = np.power(np.sum(fs)/(2.*ns),1./q)
fqs.append(value)
p = np.polyfit(np.log(s),np.log(fqs),m)
return p
测试20次
hrs=[]
hh=[]
testnum = 20
for i in range(testnum):
dataarray = pd.Series(np.random.randint(1,10,100))
#for i in range(120):
# dataarray.append(i-np.random.randn(1)*i/2)
a = HurstFunc.HurstMF_DFA(dataarray,1,2)
hmf.append(a[0])
b = HurstFunc.HurstRS(dataarray)
hrs.append(b[0])
#c = hurst(dataarray)
#hh.append(c)
print np.mean(hmf)
print np.mean([x for x in hrs if str(x) != 'nan' and str(x)!= '-inf'])
plt.plot(range(testnum),hmf,label='mf')
plt.plot(range(testnum),hrs,label='rs')
#plt.plot(range(testnum),hh,label='h')
plt.legend(loc = 'upper left')
plt.show()
![捕获.PNG][1]
[1]: https://joinquant-image.b0.upaiyun.com/f067b51a0c9d7d28ce18ba63fc9f9438