import math
import random
import torch as t

def load_data(x):
    # collapse newlines, build the char<->index vocabulary, and encode the corpus
    corpus_chars = x.replace("\n", " ")
    idx_to_char = list(set(corpus_chars))
    char_to_idx = {char: i for i, char in enumerate(idx_to_char)}
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    return corpus_indices, char_to_idx, idx_to_char, vocab_size
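# e.g. load_data("abcab") gives corpus_indices like [0, 1, 2, 0, 1] with
# vocab_size == 3; the exact indices vary per run because set() is unordered.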
def init_rnn_state(batch_size, num_hiddens):
    # fresh all-zero hidden state of shape (batch_size, num_hiddens);
    # no defaults here: batch_size/num_hiddens are only defined later in the script
    return t.zeros(batch_size, num_hiddens)
def data_iter_random(corpus_indices, batch_size, num_steps):
    # random sampling: each example is a num_steps slice, and adjacent batches
    # are not contiguous, so the hidden state is re-initialized per batch
    num_examples = (len(corpus_indices) - 1) // num_steps
    epoch_size = num_examples // batch_size
    example_indices = list(range(num_examples))
    random.shuffle(example_indices)

    def _data(pos):
        return corpus_indices[pos:pos + num_steps]

    for i in range(epoch_size):
        i = i * batch_size
        batch_indices = example_indices[i:i + batch_size]
        x = [_data(j * num_steps) for j in batch_indices]
        y = [_data(j * num_steps + 1) for j in batch_indices]
        yield t.tensor(x), t.tensor(y)
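# each yielded x, y is a (batch_size, num_steps) index tensor; y is x shifted
# one character to the right, i.e. the prediction target at every time step.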
def get_params():
    # num_inputs / num_hiddens / num_outputs are read from module scope at call
    # time (as defaults they would be evaluated at def time, before they exist)
    def _one(shape):
        # small random init keeps the tanh activations out of saturation
        return (t.randn(shape) * 0.01).requires_grad_(True)

    w_xh = _one((num_inputs, num_hiddens))   # input-to-hidden
    w_hh = _one((num_hiddens, num_hiddens))  # hidden-to-hidden
    b_h = t.zeros(num_hiddens, requires_grad=True)
    w_hq = _one((num_hiddens, num_outputs))  # hidden-to-output
    b_q = t.zeros(num_outputs, requires_grad=True)
    return [w_xh, w_hh, b_h, w_hq, b_q]
def rnn(inputs, state, params):
    # inputs: list of num_steps tensors, each of shape (batch_size, vocab_size);
    # unpack the passed-in params rather than re-creating fresh random weights
    w_xh, w_hh, b_h, w_hq, b_q = params
    h = state
    outputs = []
    for x in inputs:
        # h_t = tanh(x_t W_xh + h_{t-1} W_hh + b_h)
        h = t.tanh(t.mm(x, w_xh) + t.mm(h, w_hh) + b_h)
        # y_t = h_t W_hq + b_q (unnormalized scores over the vocabulary)
        y = t.mm(h, w_hq) + b_q
        outputs.append(y)
    return outputs, h
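# e.g. with batch_size=2 and num_steps=5, rnn(to_onehot(x, vocab_size), h0, params)
# returns a list of 5 (2, vocab_size) score tensors plus the final (2, num_hiddens) state.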
def onehot(x, n_class):
    # x: (batch_size,) integer indices -> (batch_size, n_class) one-hot rows
    x = x.long()
    res = t.zeros(x.shape[0], n_class)
    res.scatter_(1, x.view(-1, 1), 1)
    return res
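# e.g. onehot(t.tensor([1, 0]), 3) -> [[0., 1., 0.], [1., 0., 0.]]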
def to_onehot(x, n_class):
    # split a (batch_size, num_steps) batch into num_steps one-hot matrices
    return [onehot(x[:, i], n_class) for i in range(x.shape[1])]
def grad_clipping(params, theta):
    # clip by global norm: if ||g|| > theta, scale every gradient by theta / ||g||
    norm = t.tensor([0.0])
    for param in params:
        norm += (param.grad ** 2).sum()
    norm = norm.sqrt().item()
    if norm > theta:
        for param in params:
            param.grad.data *= theta / norm
def sgd(params, lr, batch_size):
    # plain minibatch SGD; updating .data keeps the step out of autograd
    for param in params:
        param.data -= lr * param.grad / batch_size
def predict_rnn(prefix, num_chars, rnn, params, init_rnn_state, num_hiddens,
                vocab_size, idx_to_char, char_to_idx):
    # the loop variable must not be named `t`, which would shadow the torch alias
    state = init_rnn_state(1, num_hiddens)
    outputs = [char_to_idx[prefix[0]]]
    for step in range(num_chars + len(prefix) - 1):
        # feed the previous character (a batch of one) back in as the next input
        x = to_onehot(t.tensor([[outputs[-1]]]), vocab_size)
        y, state = rnn(x, state, params)
        if step < len(prefix) - 1:
            # still inside the prefix: force the known next character
            outputs.append(char_to_idx[prefix[step + 1]])
        else:
            # past the prefix: greedily take the most likely character
            outputs.append(int(y[0].argmax(dim=1).item()))
    return "".join(idx_to_char[i] for i in outputs)
def train_and_predict_rnn(rnn, get_params, init_rnn_state, num_hiddens,
                          vocab_size, corpus_indices, idx_to_char, char_to_idx,
                          num_epochs, num_steps, lr, clipping_theta, batch_size,
                          pred_period, pred_len, prefixes):
    params = get_params()
    loss = t.nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        l_sum, n = 0.0, 0  # running loss total and character count
        for x, y in data_iter_random(corpus_indices, batch_size, num_steps):
            # random sampling: start each batch from a fresh hidden state
            state = init_rnn_state(batch_size, num_hiddens)
            inputs = to_onehot(x, vocab_size)
            outputs, state = rnn(inputs, state, params)
            # list of num_steps (batch, vocab) tensors -> (num_steps * batch, vocab)
            outputs = t.cat(outputs, 0)
            # match the target layout to the concatenated outputs
            y = t.transpose(y, 0, 1).contiguous().view(-1)
            l = loss(outputs, y.long())
            if params[0].grad is not None:
                for param in params:
                    param.grad.data.zero_()
            l.backward()
            grad_clipping(params, clipping_theta)
            # CrossEntropyLoss already averages, so no extra /batch_size here
            sgd(params, lr, 1)
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]
        if (epoch + 1) % pred_period == 0:
            # perplexity = exp(mean cross-entropy per character)
            print("epoch %d, perplexity %f" % (epoch + 1, math.exp(l_sum / n)))
            for prefix in prefixes:
                print(" -", predict_rnn(prefix, pred_len, rnn, params,
                                        init_rnn_state, num_hiddens, vocab_size,
                                        idx_to_char, char_to_idx))
# `data` is the raw training text (a single string), assumed loaded earlier
corpus_indices, char_to_idx, idx_to_char, vocab_size = load_data(data)
num_epochs, num_steps, batch_size, lr, clipping_theta = 250, 35, 32, 0.1, 0.01
pred_period, pred_len, prefixes = 50, 50, ["分开", "不分开"]
num_inputs = vocab_size
num_hiddens = 256
num_outputs = vocab_size
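# Optional sanity check (a sketch, not part of the original script): pull one
# batch and confirm the shapes the model expects before training starts.
x, y = next(data_iter_random(corpus_indices, batch_size, num_steps))
inputs = to_onehot(x, vocab_size)
assert len(inputs) == num_steps
assert inputs[0].shape == (batch_size, vocab_size)
assert y.shape == (batch_size, num_steps)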
train_and_predict_rnn(rnn, get_params, init_rnn_state, num_hiddens, vocab_size,
                      corpus_indices, idx_to_char, char_to_idx, num_epochs,
                      num_steps, lr, clipping_theta, batch_size, pred_period,
                      pred_len, prefixes)