/*
 * http://www.5a520.cn http://www.bt285.cn
 */
package instream;
import java.net.URL;
import java.net.URLDecoder;
import decode.Header;
import tag.MP3Tag;
import tag.TagThread;
/**
 * Buffered reader over a URL, backed by a fixed-size ring buffer that is
 * divided into {@link #UNIT_COUNT} units of {@link #UNIT_LENGTH} bytes.
 *
 * <p>The constructor creates a number of {@code Writer} objects which call back
 * into this class ({@link #tryWriting}, {@link #updateBuffer},
 * {@link #updateWriterCount}, {@link #terminateWriters}) to claim a free unit
 * and to report how many bytes they stored in it. The reading side
 * ({@link #read()}, {@link #read(byte[], int, int)}, {@link #dump}) consumes
 * the units in order and wakes waiting writers when a unit becomes empty.
 *
 * <p>Two monitors are used: {@code buf} is the lock on which the reader waits
 * for data, and {@code this} is the lock on which writers wait for a free unit.
 *
 * <p>All buffer state is per instance, so every stream starts from a clean
 * position and is not affected by a previously terminated stream.
 */
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {
    public static final int UNIT_LENGTH_BITS = 15; // 32K
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;
    public static final int BUF_LENGTH = UNIT_LENGTH << 4; // 16 units
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
    private static final int MAX_WRITER = 8;

    /** Number of bytes consumed so far, counted from the start of the resource. */
    private long file_pointer;
    /** Read offset inside {@code buf}; always kept within {@code [0, BUF_LENGTH)}. */
    private int read_pos;
    /** Total number of unread bytes currently held in {@code buf}. */
    private int fill_bytes;
    /** Ring buffer; also the monitor the reader waits on (buf.wait()/buf.notify()). */
    private final byte[] buf;
    /** Unread byte count of each unit; 0 marks a unit as free for writers. */
    private final int[] buf_bytes;
    /** Index of the next unit that will be handed to a writer. */
    private int buf_index;
    /** Offset in the resource of the next unit that will be handed to a writer. */
    private int alloc_pos;
    private final URL url;
    /** Cleared by {@link #terminateWriters()}; readers and writers stop once false. */
    private boolean isalive = true;
    /** Number of writers that have not yet reported completion. */
    private int writer_count;
    /** Bookkeeping counter used by {@link #tryWriting} to decide when to retire a writer. */
    private int await_count;
    private long file_length;
    /** Resource length minus the skipped ID3v2 tag; only written inside this class. */
    private long frame_bytes;

    /**
     * Opens {@code sURL} with the default number of writers ({@code MAX_WRITER}).
     *
     * @param sURL address of the resource to read
     * @throws Exception see {@link #BuffRandAcceURL(String, int)}
     */
    public BuffRandAcceURL(String sURL) throws Exception {
        this(sURL, MAX_WRITER);
    }

    /**
     * Opens {@code sURL}, creates the writers, waits for the initial buffering
     * and skips a leading ID3v2 tag if {@code MP3Tag.checkID3V2} reports one.
     *
     * @param sURL             address of the resource to read
     * @param download_threads number of {@code Writer} objects to create
     * @throws Exception if the URL is malformed, if the reported content length
     *                   is 0, if the wait for buffering is interrupted, or if
     *                   the ID3v2 tag is too large to be skipped inside the buffer
     */
    public BuffRandAcceURL(String sURL, int download_threads) throws Exception {
        buf = new byte[BUF_LENGTH];
        buf_bytes = new int[UNIT_COUNT];
        url = new URL(sURL);

        // Original author's note: creates a thread that parses the ID3 tag asynchronously.
        new TagThread(url);

        // Print the current file name (last path segment of the decoded URL);
        // fall back to the raw URL when decoding fails.
        try {
            String decoded = URLDecoder.decode(sURL, "GBK");
            System.out.println("start>> " + decoded.substring(decoded.lastIndexOf("/") + 1));
        } catch (Exception e) {
            System.out.println("start>> " + sURL);
        }

        // Create the "writer" objects. They are created before the content
        // length is queried, exactly as in the original statement order.
        for (int i = 0; i < download_threads; i++)
            new Writer(this, url, buf, i + 1);

        frame_bytes = file_length = HttpReader.getContentLength();
        if (file_length == 0) {
            Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。";
            throw new Exception("retry " + HttpReader.MAX_RETRY);
        }
        writer_count = download_threads;

        // Initial buffering.
        try_cache();

        // Skip the ID3v2 tag.
        // NOTE(review): try_cache() only waits on the total byte count; it does
        // not guarantee that unit 0 is among the filled units -- confirm with Writer.
        MP3Tag mP3Tag = new MP3Tag();
        int v2_size = mP3Tag.checkID3V2(buf, 0);
        if (v2_size > 0) {
            if (v2_size >= BUF_LENGTH) {
                // The in-buffer skip below can only release units that exist.
                throw new Exception("ID3v2 tag of " + v2_size
                        + " bytes does not fit in the " + BUF_LENGTH + "-byte buffer");
            }
            frame_bytes -= v2_size;
            // Equivalent of seek(v2_size):
            fill_bytes -= v2_size;
            file_pointer = v2_size;
            read_pos = v2_size & BUF_LENGTH_MASK;
            int units = v2_size >> UNIT_LENGTH_BITS;
            // Units lying completely inside the tag are released to the writers.
            for (int i = 0; i < units; i++) {
                buf_bytes[i] = 0;
                this.notifyWriter();
            }
            // Only the part of the tag that reaches into the last unit is
            // consumed from it (not the whole tag size).
            buf_bytes[units] -= v2_size & (UNIT_LENGTH - 1);
            this.notifyWriter();
        }
    }

    /** Number of bytes the reader waits for: the remaining window minus one unit. */
    private int cacheTarget() {
        return Math.min(BUF_LENGTH, (int) file_length - alloc_pos) - UNIT_LENGTH;
    }

    /**
     * Blocks until {@code fill_bytes} reaches the cache target, or returns early
     * when no writer is left or the stream has been terminated. Progress is
     * printed while waiting.
     *
     * <p>The condition is evaluated while holding the {@code buf} monitor so that
     * a {@code buf.notify()} issued between the check and the wait cannot be lost.
     */
    private void try_cache() throws InterruptedException {
        synchronized (buf) {
            int cache_size = cacheTarget();
            // Wait until the buffer is (almost) full.
            while (fill_bytes < cache_size) {
                if (writer_count == 0 || !isalive)
                    return;
                cache_size = cacheTarget();
                System.out.printf("\r[缓冲%1$6.2f%%] ", (float) fill_bytes / cache_size * 100);
                buf.wait();
            }
        }
        System.out.printf("\r");
    }

    /**
     * Waits until unit {@code i} and its successor together hold at least
     * {@code len} bytes.
     *
     * @return {@code len} on success, otherwise the smaller number of bytes that
     *         was available when the writers finished or the stream was terminated
     */
    private int try_reading(int i, int len) throws Exception {
        int next = (i == UNIT_COUNT - 1) ? 0 : (i + 1);
        while (true) {
            // Sample the "finished" state before the counters, so that data
            // stored by the last writer just before it retired is still seen.
            boolean finished = (writer_count == 0 || !isalive);
            int available = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[next]);
            if (available >= len)
                return len;
            if (finished)
                return available;
            try_cache();
        }
    }

    /** Copies {@code len} bytes starting at ring offset {@code pos}, wrapping at the end of {@code buf}. */
    private void copyFromRing(int pos, byte[] dst, int dstOff, int len) {
        int tail_len = BUF_LENGTH - pos;
        if (tail_len < len) {
            System.arraycopy(buf, pos, dst, dstOff, tail_len);
            System.arraycopy(buf, 0, dst, dstOff + tail_len, len - tail_len);
        } else {
            System.arraycopy(buf, pos, dst, dstOff, len);
        }
    }

    /**
     * Called by a writer to claim the next free unit; writers wait here, one at
     * a time, until the unit at {@code buf_index} is empty.
     *
     * <p>On success {@code w.start_pos} and {@code w.index} are set to the claimed
     * resource offset and unit index.
     *
     * @param w the calling writer
     * @return {@code false} when the writer should stop (it is being retired,
     *         nothing is left to allocate, or the stream was terminated)
     * @throws InterruptedException if interrupted while waiting for a free unit
     */
    public synchronized boolean tryWriting(Writer w) throws InterruptedException {
        await_count++;
        while (buf_bytes[buf_index] != 0 && isalive) {
            this.wait();
        }
        // Original author's note: when the download speed is sufficient, end one writer.
        // NOTE(review): await_count is not decremented on the two early returns
        // below; kept as in the original -- confirm this is intended.
        if (writer_count > 1 && w.await_count >= await_count &&
                w.await_count >= writer_count)
            return false;
        if (alloc_pos >= file_length)
            return false;
        w.await_count = await_count;
        await_count--;
        w.start_pos = alloc_pos;
        w.index = buf_index;
        alloc_pos += UNIT_LENGTH;
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;
        return isalive;
    }

    /**
     * Records that unit {@code i} now holds {@code len} bytes and wakes the reader.
     *
     * @param i   unit index
     * @param len number of bytes stored in the unit
     */
    public void updateBuffer(int i, int len) {
        synchronized (buf) {
            buf_bytes[i] = len;
            fill_bytes += len;
            buf.notify();
        }
    }

    /** Decrements the count of active writers and wakes the reader. */
    public void updateWriterCount() {
        synchronized (buf) {
            writer_count--;
            buf.notify();
        }
    }

    /** Wakes every writer waiting in {@link #tryWriting}. */
    public synchronized void notifyWriter() {
        this.notifyAll();
    }

    /**
     * Marks the stream as dead, records a timeout message in
     * {@code Header.strLastErr} (first call only) and wakes the reader and all
     * waiting writers.
     */
    public void terminateWriters() {
        synchronized (buf) {
            if (isalive) {
                isalive = false;
                Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY
                        + " 次后放弃,请您稍后再试。";
            }
            buf.notify();
        }
        notifyWriter();
    }

    /**
     * Reads one byte.
     *
     * @return the byte as a value in {@code 0..255}, or -1 when no byte is
     *         available and all writers have finished, or the stream was terminated
     * @throws Exception if interrupted while waiting for data
     */
    public int read() throws Exception {
        int i = read_pos >> UNIT_LENGTH_BITS;
        // 1. Wait until at least one byte is readable.
        while (buf_bytes[i] < 1) {
            try_cache();
            // Give up only if the writers are gone AND the unit is still empty;
            // the last writer may have filled it just before retiring.
            if (writer_count == 0 && buf_bytes[i] < 1)
                return -1;
        }
        if (!isalive)
            return -1;
        // 2. Read.
        int iret = buf[read_pos] & 0xff;
        fill_bytes--;
        file_pointer++;
        read_pos++;
        read_pos &= BUF_LENGTH_MASK;
        if (--buf_bytes[i] == 0)
            notifyWriter(); // 3. The unit is empty: wake the writers.
        return iret;
    }

    /**
     * Equivalent to {@code read(b, 0, b.length)}.
     *
     * @param b destination array
     * @return see {@link #read(byte[], int, int)}
     * @throws Exception if interrupted while waiting for data
     */
    public int read(byte[] b) throws Exception {
        return read(b, 0, b.length);
    }

    /**
     * Reads exactly {@code min(len, UNIT_LENGTH)} bytes into {@code b}.
     *
     * @param b   destination array
     * @param off start offset in {@code b}
     * @param len requested byte count; values above {@code UNIT_LENGTH} are clamped
     * @return the number of bytes copied, or -1 when that many bytes could not be
     *         obtained or the stream was terminated (no partial read is returned)
     * @throws Exception if interrupted while waiting for data
     */
    public int read(byte[] b, int off, int len) throws Exception {
        if (len > UNIT_LENGTH)
            len = UNIT_LENGTH;
        int i = read_pos >> UNIT_LENGTH_BITS;
        // 1. Wait until enough content is readable.
        if (try_reading(i, len) < len || !isalive)
            return -1;
        // 2. Read.
        copyFromRing(read_pos, b, off, len);
        fill_bytes -= len;
        file_pointer += len;
        read_pos += len;
        read_pos &= BUF_LENGTH_MASK;
        buf_bytes[i] -= len;
        if (buf_bytes[i] < 0) {
            // The read crossed into the next unit: charge the excess to it
            // and release the now empty unit.
            int ni = read_pos >> UNIT_LENGTH_BITS;
            buf_bytes[ni] += buf_bytes[i];
            buf_bytes[i] = 0;
            notifyWriter();
        } else if (buf_bytes[i] == 0) {
            notifyWriter();
        }
        return len;
    }

    /**
     * Copies {@code len} bytes starting {@code src_off} bytes after the current
     * read position, without moving the file "pointer" and without waking writers.
     *
     * @param src_off offset relative to the current read position
     * @param b       destination array
     * @param dst_off start offset in {@code b}
     * @param len     number of bytes to copy
     * @return {@code len}, or -1 when that many bytes could not be obtained or the
     *         stream was terminated
     * @throws Exception if interrupted while waiting for data
     */
    public int dump(int src_off, byte[] b, int dst_off, int len) throws Exception {
        // Wrap into the ring so that the unit index and the tail length stay valid.
        int rpos = (read_pos + src_off) & BUF_LENGTH_MASK;
        if (try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || !isalive)
            return -1;
        copyFromRing(rpos, b, dst_off, len);
        // No signal is sent.
        return len;
    }

    /** @return the content length obtained from {@code HttpReader} at construction */
    public long length() {
        return file_length;
    }

    /** @return the number of bytes consumed so far, including a skipped ID3v2 tag */
    public long getFilePointer() {
        return file_pointer;
    }

    /** Does nothing. */
    public void close() {
        // intentionally empty
    }

    /**
     * Does nothing; seeking is not implemented.
     *
     * @param pos ignored
     */
    public void seek(long pos) throws Exception {
        // intentionally empty
    }
}
|